OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Catalog/Catalog.h"
26 
27 #include <algorithm>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/range/adaptor/map.hpp>
31 #include <boost/version.hpp>
32 #include <cassert>
33 #include <cerrno>
34 #include <cstdio>
35 #include <cstring>
36 #include <exception>
37 #include <fstream>
38 #include <list>
39 #include <memory>
40 #include <random>
41 #include <regex>
42 #include <sstream>
43 
44 #if BOOST_VERSION >= 106600
45 #include <boost/uuid/detail/sha1.hpp>
46 #else
47 #include <boost/uuid/sha1.hpp>
48 #endif
49 #include <rapidjson/document.h>
50 #include <rapidjson/istreamwrapper.h>
51 #include <rapidjson/ostreamwrapper.h>
52 #include <rapidjson/writer.h>
53 
54 #include "Catalog/SysCatalog.h"
55 
56 #include "QueryEngine/Execute.h"
58 
64 #include "Fragmenter/Fragmenter.h"
66 #include "LockMgr/LockMgr.h"
69 #include "Parser/ParserNode.h"
70 #include "QueryEngine/Execute.h"
72 #include "RefreshTimeCalculator.h"
73 #include "Shared/DateTimeParser.h"
74 #include "Shared/File.h"
75 #include "Shared/StringTransform.h"
76 #include "Shared/SysDefinitions.h"
77 #include "Shared/measure.h"
78 #include "Shared/misc.h"
80 
81 #include "MapDRelease.h"
82 #include "RWLocks.h"
84 
85 #include "Shared/distributed.h"
86 
87 using Chunk_NS::Chunk;
90 using std::list;
91 using std::map;
92 using std::pair;
93 using std::runtime_error;
94 using std::string;
95 using std::vector;
96 
97 bool g_enable_fsi{true};
98 bool g_enable_s3_fsi{false};
101 extern bool g_cache_string_hash;
102 extern bool g_enable_system_tables;
103 
104 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
105 // under unit testing.
107 
108 namespace Catalog_Namespace {
109 
110 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
112  1073741824; // 2^30, give room for over a billion non-temp tables
114  1073741824; // 2^30, give room for over a billion non-temp dictionaries
115 
116 const std::string Catalog::physicalTableNameTag_("_shard_#");
117 
118 thread_local bool Catalog::thread_holds_read_lock = false;
119 
124 
125 // The migration is done as a two-step process: this release
126 // creates and uses the new table;
127 // the next release will remove the old table. This keeps a fallback path
128 // in case of migration failure.
131  sqliteConnector_.query("BEGIN TRANSACTION");
132  try {
134  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
135  if (sqliteConnector_.getNumRows() != 0) {
136  // already done
137  sqliteConnector_.query("END TRANSACTION");
138  return;
139  }
141  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
142  "userid integer references mapd_users, state text, image_hash text, update_time "
143  "timestamp, "
144  "metadata text, UNIQUE(userid, name) )");
145  // now copy content from old table to new table
147  "insert into mapd_dashboards (id, name , "
148  "userid, state, image_hash, update_time , "
149  "metadata) "
150  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
151  "view_metadata "
152  "from mapd_frontend_views");
153  } catch (const std::exception& e) {
154  sqliteConnector_.query("ROLLBACK TRANSACTION");
155  throw;
156  }
157  sqliteConnector_.query("END TRANSACTION");
158 }
159 
160 namespace {
161 
162 inline auto table_json_filepath(const std::string& base_path,
163  const std::string& db_name) {
164  return boost::filesystem::path(base_path + "/" + shared::kCatalogDirectoryName + "/" +
165  db_name + "_temp_tables.json");
166 }
167 
168 } // namespace
169 
170 Catalog::Catalog(const string& basePath,
171  const DBMetadata& curDB,
172  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
173  const std::vector<LeafHostInfo>& string_dict_hosts,
174  std::shared_ptr<Calcite> calcite,
175  bool is_new_db)
176  : basePath_(basePath)
177  , sqliteConnector_(curDB.dbName, basePath + "/" + shared::kCatalogDirectoryName + "/")
178  , currentDB_(curDB)
179  , dataMgr_(dataMgr)
180  , string_dict_hosts_(string_dict_hosts)
181  , calciteMgr_(calcite)
182  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
183  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
184  , sqliteMutex_()
185  , sharedMutex_()
186  , thread_holding_sqlite_lock()
187  , thread_holding_write_lock() {
188  if (!g_enable_fsi) {
189  CHECK(!g_enable_system_tables) << "System tables require FSI to be enabled";
190  CHECK(!g_enable_s3_fsi) << "S3 FSI requires FSI to be enabled";
191  }
192 
193  if (!is_new_db) {
194  CheckAndExecuteMigrations();
195  }
196 
197  buildMaps();
198 
199  if (g_enable_fsi) {
200  createDefaultServersIfNotExists();
201  }
202  if (!is_new_db) {
203  CheckAndExecuteMigrationsPostBuildMaps();
204  }
206  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
207  }
208  conditionallyInitializeSystemObjects();
209  // once all initialized use real object
210  initialized_ = true;
211 }
212 
214 
217  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
218  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
219  tableDescIt != tableDescriptorMap_.end();
220  ++tableDescIt) {
221  tableDescIt->second->fragmenter = nullptr;
222  delete tableDescIt->second;
223  }
224 
225  // TableDescriptorMapById points to the same descriptors. No need to delete
226 
227  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
228  columnDescIt != columnDescriptorMap_.end();
229  ++columnDescIt) {
230  delete columnDescIt->second;
231  }
232 
233  // ColumnDescriptorMapById points to the same descriptors. No need to delete
234 
236  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
237  }
238 }
239 
241  if (initialized_) {
242  return this;
243  } else {
244  return SysCatalog::instance().getDummyCatalog().get();
245  }
246 }
247 
250  sqliteConnector_.query("BEGIN TRANSACTION");
251  try {
252  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
253  std::vector<std::string> cols;
254  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
255  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
256  }
257  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
258  cols.end()) {
259  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
261  sqliteConnector_.query(queryString);
262  }
263  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
264  cols.end()) {
265  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
266  std::to_string(0));
267  sqliteConnector_.query(queryString);
268  }
269  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
270  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
271  std::to_string(-1));
272  sqliteConnector_.query(queryString);
273  }
274  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
275  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
276  std::to_string(0));
277  sqliteConnector_.query(queryString);
278  }
279  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
280  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
281  sqliteConnector_.query(queryString);
282  }
283  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
284  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
286  sqliteConnector_.query(queryString);
287  }
288  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
289  cols.end()) {
291  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
292  }
293  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
294  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
295  sqliteConnector_.query(queryString);
296  }
297  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
298  cols.end()) {
299  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
300  std::to_string(-1));
301  sqliteConnector_.query(queryString);
302  }
303  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
304  cols.end()) {
305  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
306  sqliteConnector_.query(queryString);
307  }
308  } catch (std::exception& e) {
309  sqliteConnector_.query("ROLLBACK TRANSACTION");
310  throw;
311  }
312  sqliteConnector_.query("END TRANSACTION");
313 }
314 
317  sqliteConnector_.query("BEGIN TRANSACTION");
318  try {
320  "select name from sqlite_master WHERE type='table' AND "
321  "name='mapd_version_history'");
322  if (sqliteConnector_.getNumRows() == 0) {
324  "CREATE TABLE mapd_version_history(version integer, migration_history text "
325  "unique)");
326  } else {
328  "select * from mapd_version_history where migration_history = "
329  "'notnull_fixlen_arrays'");
330  if (sqliteConnector_.getNumRows() != 0) {
331  // legacy fixlen arrays had migrated
332  // no need for further execution
333  sqliteConnector_.query("END TRANSACTION");
334  return;
335  }
336  }
337  // Insert check for migration
339  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
340  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
341  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
342  // Upating all fixlen array columns
343  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
344  std::to_string(kARRAY) + " AND size>0;");
345  sqliteConnector_.query(queryString);
346  } catch (std::exception& e) {
347  sqliteConnector_.query("ROLLBACK TRANSACTION");
348  throw;
349  }
350  sqliteConnector_.query("END TRANSACTION");
351 }
352 
355  sqliteConnector_.query("BEGIN TRANSACTION");
356  try {
358  "select name from sqlite_master WHERE type='table' AND "
359  "name='mapd_version_history'");
360  if (sqliteConnector_.getNumRows() == 0) {
362  "CREATE TABLE mapd_version_history(version integer, migration_history text "
363  "unique)");
364  } else {
366  "select * from mapd_version_history where migration_history = "
367  "'notnull_geo_columns'");
368  if (sqliteConnector_.getNumRows() != 0) {
369  // legacy geo columns had migrated
370  // no need for further execution
371  sqliteConnector_.query("END TRANSACTION");
372  return;
373  }
374  }
375  // Insert check for migration
377  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
378  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
379  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
380  // Upating all geo columns
381  string queryString(
382  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
383  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
384  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
385  sqliteConnector_.query(queryString);
386  } catch (std::exception& e) {
387  sqliteConnector_.query("ROLLBACK TRANSACTION");
388  throw;
389  }
390  sqliteConnector_.query("END TRANSACTION");
391 }
392 
395  sqliteConnector_.query("BEGIN TRANSACTION");
396  try {
397  // check table still exists
399  "SELECT name FROM sqlite_master WHERE type='table' AND "
400  "name='mapd_frontend_views'");
401  if (sqliteConnector_.getNumRows() == 0) {
402  // table does not exists
403  // no need to migrate
404  sqliteConnector_.query("END TRANSACTION");
405  return;
406  }
407  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
408  std::vector<std::string> cols;
409  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
410  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
411  }
412  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
413  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
414  }
415  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
416  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
417  }
418  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
419  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
420  }
421  } catch (std::exception& e) {
422  sqliteConnector_.query("ROLLBACK TRANSACTION");
423  throw;
424  }
425  sqliteConnector_.query("END TRANSACTION");
426 }
427 
430  sqliteConnector_.query("BEGIN TRANSACTION");
431  try {
433  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
434  "integer references mapd_users, "
435  "link text unique, view_state text, update_time timestamp, view_metadata text)");
436  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
437  std::vector<std::string> cols;
438  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
439  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
440  }
441  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
442  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
443  }
444  } catch (const std::exception& e) {
445  sqliteConnector_.query("ROLLBACK TRANSACTION");
446  throw;
447  }
448  sqliteConnector_.query("END TRANSACTION");
449 }
450 
453  sqliteConnector_.query("BEGIN TRANSACTION");
454  try {
455  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
456  // check table still exists
458  "SELECT name FROM sqlite_master WHERE type='table' AND "
459  "name='mapd_frontend_views'");
460  if (sqliteConnector_.getNumRows() == 0) {
461  // table does not exists
462  // no need to migrate
463  sqliteConnector_.query("END TRANSACTION");
464  return;
465  }
467  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
468  } catch (const std::exception& e) {
469  sqliteConnector_.query("ROLLBACK TRANSACTION");
470  throw;
471  }
472  sqliteConnector_.query("END TRANSACTION");
473 }
474 
475 // Introduce a DB version into the mapd_tables table.
476 // If the DB does not have a version, reset all page sizes to 2097152 to be compatible
477 // with the old value.
478 
481  if (currentDB_.dbName.length() == 0) {
482  // updateDictionaryNames dbName length is zero nothing to do here
483  return;
484  }
485  sqliteConnector_.query("BEGIN TRANSACTION");
486  try {
487  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
488  std::vector<std::string> cols;
489  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
490  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
491  }
492  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
493  LOG(INFO) << "Updating mapd_tables updatePageSize";
494  // No version number
495  // need to update the defaul tpagesize to old correct value
496  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
497  // need to add new version info
498  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
500  sqliteConnector_.query(queryString);
501  }
502  } catch (std::exception& e) {
503  sqliteConnector_.query("ROLLBACK TRANSACTION");
504  throw;
505  }
506  sqliteConnector_.query("END TRANSACTION");
507 }
508 
511  sqliteConnector_.query("BEGIN TRANSACTION");
512  try {
513  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
514  std::vector<std::string> cols;
515  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
516  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
517  }
518  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
519  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
520  // need to add new version info
521  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
523  sqliteConnector_.query(queryString);
524  // need to add new column to table defintion to indicate deleted column, column used
525  // as bitmap for deleted rows.
527  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
528  }
529  } catch (std::exception& e) {
530  sqliteConnector_.query("ROLLBACK TRANSACTION");
531  throw;
532  }
533  sqliteConnector_.query("END TRANSACTION");
534 }
535 
538  sqliteConnector_.query("BEGIN TRANSACTION");
539  try {
540  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
541  std::vector<std::string> cols;
542  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
543  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
544  }
545  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
546  LOG(INFO) << "Adding support for default values to mapd_columns";
547  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
548  }
549  } catch (std::exception& e) {
550  LOG(ERROR) << "Failed to make metadata update for default values` support";
551  sqliteConnector_.query("ROLLBACK TRANSACTION");
552  throw;
553  }
554  sqliteConnector_.query("END TRANSACTION");
555 }
556 
557 // introduce DB version into the dictionary tables
558 // if the DB does not have a version rename all dictionary tables
559 
562  if (currentDB_.dbName.length() == 0) {
563  // updateDictionaryNames dbName length is zero nothing to do here
564  return;
565  }
566  sqliteConnector_.query("BEGIN TRANSACTION");
567  try {
568  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
569  std::vector<std::string> cols;
570  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
571  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
572  }
573  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
574  // No version number
575  // need to rename dictionaries
576  string dictQuery("SELECT dictid, name from mapd_dictionaries");
577  sqliteConnector_.query(dictQuery);
578  size_t numRows = sqliteConnector_.getNumRows();
579  for (size_t r = 0; r < numRows; ++r) {
580  int dictId = sqliteConnector_.getData<int>(r, 0);
581  std::string dictName = sqliteConnector_.getData<string>(r, 1);
582 
583  std::string oldName = g_base_path + "/" + shared::kDataDirectoryName + "/" +
584  currentDB_.dbName + "_" + dictName;
585  std::string newName = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
586  std::to_string(currentDB_.dbId) + "_DICT_" +
587  std::to_string(dictId);
588 
589  int result = rename(oldName.c_str(), newName.c_str());
590 
591  if (result == 0) {
592  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
593  << newName;
594  } else {
595  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
596  << newName + " dbname '" << currentDB_.dbName << "' error code "
597  << std::to_string(result);
598  }
599  }
600  // need to add new version info
601  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
603  sqliteConnector_.query(queryString);
604  }
605  } catch (std::exception& e) {
606  sqliteConnector_.query("ROLLBACK TRANSACTION");
607  throw;
608  }
609  sqliteConnector_.query("END TRANSACTION");
610 }
611 
614  sqliteConnector_.query("BEGIN TRANSACTION");
615  try {
617  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
618  "logical_table_id integer, physical_table_id integer)");
619  } catch (const std::exception& e) {
620  sqliteConnector_.query("ROLLBACK TRANSACTION");
621  throw;
622  }
623  sqliteConnector_.query("END TRANSACTION");
624 }
625 
626 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
627  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
628  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
629  */
630 
632  sqliteConnector_.query("BEGIN TRANSACTION");
633  try {
634  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
635  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
636  const auto physicalTables = physicalTableIt->second;
637  CHECK(!physicalTables.empty());
638  for (size_t i = 0; i < physicalTables.size(); i++) {
639  int32_t physical_tb_id = physicalTables[i];
641  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
642  "physical_table_id) VALUES (?1, ?2)",
643  std::vector<std::string>{std::to_string(logical_tb_id),
644  std::to_string(physical_tb_id)});
645  }
646  }
647  } catch (std::exception& e) {
648  sqliteConnector_.query("ROLLBACK TRANSACTION");
649  throw;
650  }
651  sqliteConnector_.query("END TRANSACTION");
652 }
653 
656  sqliteConnector_.query("BEGIN TRANSACTION");
657  try {
658  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
659  std::vector<std::string> cols;
660  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
661  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
662  }
663  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
664  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
665  }
666  } catch (std::exception& e) {
667  sqliteConnector_.query("ROLLBACK TRANSACTION");
668  throw;
669  }
670  sqliteConnector_.query("END TRANSACTION");
671 }
672 
675  sqliteConnector_.query("BEGIN TRANSACTION");
676  try {
679  } catch (std::exception& e) {
680  sqliteConnector_.query("ROLLBACK TRANSACTION");
681  throw;
682  }
683  sqliteConnector_.query("END TRANSACTION");
684 }
685 
687  // TODO: Move common migration logic to a shared function.
689  sqliteConnector_.query("BEGIN TRANSACTION");
690  try {
692  "select name from sqlite_master WHERE type='table' AND "
693  "name='mapd_version_history'");
694  static const std::string migration_name{"rename_legacy_data_wrappers"};
695  if (sqliteConnector_.getNumRows() == 0) {
697  "CREATE TABLE mapd_version_history(version integer, migration_history text "
698  "unique)");
699  } else {
701  "select * from mapd_version_history where migration_history = "
702  "'" +
703  migration_name + "'");
704  if (sqliteConnector_.getNumRows() != 0) {
705  // Migration already done.
706  sqliteConnector_.query("END TRANSACTION");
707  return;
708  }
709  }
710  LOG(INFO) << "Executing " << migration_name << " migration.";
711 
712  // Update legacy data wrapper names
714  // clang-format off
715  std::map<std::string, std::string> old_to_new_wrapper_names{
716  {"OMNISCI_CSV", DataWrapperType::CSV},
717  {"OMNISCI_PARQUET", DataWrapperType::PARQUET},
718  {"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
719  {"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
720  {"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
721  {"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
722  };
723  // clang-format on
724 
725  for (const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
727  "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
728  "data_wrapper_type = ?",
729  std::vector<std::string>{new_wrapper_name, old_wrapper_name});
730  }
731 
732  // Record migration.
734  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
735  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
736  LOG(INFO) << migration_name << " migration completed.";
737  } catch (std::exception& e) {
738  sqliteConnector_.query("ROLLBACK TRANSACTION");
739  throw;
740  }
741  sqliteConnector_.query("END TRANSACTION");
742 }
743 
746  sqliteConnector_.query("BEGIN TRANSACTION");
747  try {
749  } catch (const std::exception& e) {
750  sqliteConnector_.query("ROLLBACK TRANSACTION");
751  throw;
752  }
753  sqliteConnector_.query("END TRANSACTION");
754 }
755 
756 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
757  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
758  "omnisci_foreign_servers(id integer primary key, name text unique, " +
759  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
760  "options text)";
761 }
762 
763 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
764  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
765  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
766  "options text, last_refresh_time integer, next_refresh_time integer, " +
767  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
768  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
769 }
770 
771 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
772  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
773  "omnisci_custom_expressions(id integer primary key, name text, " +
774  "expression_json text, data_source_type text, " +
775  "data_source_id integer, is_deleted boolean)";
776 }
777 
780  sqliteConnector_.query("BEGIN TRANSACTION");
781  std::vector<DBObject> objects;
782  try {
784  "SELECT name FROM sqlite_master WHERE type='table' AND "
785  "name='mapd_record_ownership_marker'");
786  // check if mapd catalog - marker exists
787  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
788  // already done
789  sqliteConnector_.query("END TRANSACTION");
790  return;
791  }
792  // check if different catalog - marker exists
793  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
794  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
795  // Check if migration is being performed on existing non mapd catalogs
796  // Older non mapd dbs will have table but no record in them
797  if (sqliteConnector_.getNumRows() != 0) {
798  // already done
799  sqliteConnector_.query("END TRANSACTION");
800  return;
801  }
802  }
803  // marker not exists - create one
804  else {
805  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
806  }
807 
808  DBMetadata db;
809  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
810  // place dbId as a refernce for migration being performed
812  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
813  std::vector<std::string>{std::to_string(db.dbOwner)});
814 
815  static const std::map<const DBObjectType, const AccessPrivileges>
816  object_level_all_privs_lookup{
822 
823  // grant owner all permissions on DB
824  DBObjectKey key;
825  key.dbId = currentDB_.dbId;
826  auto _key_place = [&key](auto type) {
827  key.permissionType = type;
828  return key;
829  };
830  for (auto& it : object_level_all_privs_lookup) {
831  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
832  objects.back().setName(currentDB_.dbName);
833  }
834 
835  {
836  // other users tables and views
837  string tableQuery(
838  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
839  sqliteConnector_.query(tableQuery);
840  size_t numRows = sqliteConnector_.getNumRows();
841  for (size_t r = 0; r < numRows; ++r) {
842  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
843  std::string tableName = sqliteConnector_.getData<string>(r, 1);
844  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
845  bool isview = sqliteConnector_.getData<bool>(r, 3);
846 
849  DBObjectKey key;
850  key.dbId = currentDB_.dbId;
851  key.objectId = tableid;
852  key.permissionType = type;
853 
854  DBObject obj(tableName, type);
855  obj.setObjectKey(key);
856  obj.setOwner(ownerid);
859 
860  objects.push_back(obj);
861  }
862  }
863 
864  {
865  // other users dashboards
866  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
867  sqliteConnector_.query(tableQuery);
868  size_t numRows = sqliteConnector_.getNumRows();
869  for (size_t r = 0; r < numRows; ++r) {
870  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
871  std::string dashName = sqliteConnector_.getData<string>(r, 1);
872  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
873 
875  DBObjectKey key;
876  key.dbId = currentDB_.dbId;
877  key.objectId = dashId;
878  key.permissionType = type;
879 
880  DBObject obj(dashName, type);
881  obj.setObjectKey(key);
882  obj.setOwner(ownerid);
884 
885  objects.push_back(obj);
886  }
887  }
888  } catch (const std::exception& e) {
889  sqliteConnector_.query("ROLLBACK TRANSACTION");
890  throw;
891  }
892  sqliteConnector_.query("END TRANSACTION");
893 
894  // now apply the objects to the syscat to track the permisisons
895  // moved outside transaction to avoid lock in sqlite
896  try {
898  } catch (const std::exception& e) {
899  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
900  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
901  e.what());
902  // will need to remove the mapd_record_ownership_marker table and retry
903  }
904 }
905 
910 }
911 
913  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
914  std::vector<std::string> dashboard_ids;
915  static const std::string migration_name{"dashboard_roles_migration"};
916  {
918  sqliteConnector_.query("BEGIN TRANSACTION");
919  try {
920  // migration_history should be present in all catalogs by now
921  // if not then would be created before this migration
923  "select * from mapd_version_history where migration_history = '" +
924  migration_name + "'");
925  if (sqliteConnector_.getNumRows() != 0) {
926  // no need for further execution
927  sqliteConnector_.query("END TRANSACTION");
928  return;
929  }
930  LOG(INFO) << "Performing dashboard internal roles Migration.";
931  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
932  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
935  sqliteConnector_.getData<string>(i, 0)))) {
936  // Successfully created roles during previous migration/crash
937  // No need to include them
938  continue;
939  }
940  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
941  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
942  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
943  }
944  } catch (const std::exception& e) {
945  sqliteConnector_.query("ROLLBACK TRANSACTION");
946  throw;
947  }
948  sqliteConnector_.query("END TRANSACTION");
949  }
950  // All current grantees with shared dashboards.
951  const auto active_grantees =
953 
954  try {
955  // NOTE(wamsi): Transactionally unsafe
956  for (auto dash : dashboards) {
957  createOrUpdateDashboardSystemRole(dash.second.second,
958  dash.second.first,
960  std::to_string(currentDB_.dbId), dash.first));
961  auto result = active_grantees.find(dash.first);
962  if (result != active_grantees.end()) {
965  dash.first)},
966  result->second);
967  }
968  }
970  // check if this has already been completed
972  "select * from mapd_version_history where migration_history = '" +
973  migration_name + "'");
974  if (sqliteConnector_.getNumRows() != 0) {
975  return;
976  }
978  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
979  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
980  } catch (const std::exception& e) {
981  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
982  << e.what();
983  throw;
984  }
985  LOG(INFO) << "Successfully created dashboard system roles during migration.";
986 }
987 
998  updatePageSize();
1002  if (g_enable_fsi) {
1003  updateFsiSchemas();
1005  }
1008 }
1009 
1013 }
1014 
1015 namespace {
1016 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
1017  auto users = SysCatalog::instance().getAllUserMetadata();
1018  std::map<int32_t, std::string> user_name_by_user_id;
1019  for (const auto& user : users) {
1020  user_name_by_user_id[user.userId] = user.userName;
1021  }
1022  return user_name_by_user_id;
1023 }
1024 
/**
 * Resolves a user id to the corresponding user name.
 *
 * NOTE(review): the line carrying the return type and function name was absent from
 * this listing. It is restored from the call
 * `get_user_name_from_id(vd->userId, user_name_by_user_id)` and from the two return
 * statements below (a mapped std::string and a string literal) -- confirm against the
 * upstream file.
 *
 * @param id user id to look up
 * @param user_name_by_user_id lookup table from user id to user name
 * @return the mapped user name, or "Unknown" when the id has no entry
 */
std::string get_user_name_from_id(
    int32_t id,
    const std::map<int32_t, std::string>& user_name_by_user_id) {
  if (const auto entry = user_name_by_user_id.find(id);
      entry != user_name_by_user_id.end()) {
    return entry->second;
  }
  // a user could be deleted and a dashboard still exist?
  return "Unknown";
}
1035 } // namespace
1036 
// Populates dictDescriptorMapByRef_ from the mapd_dictionaries table: one
// DictDescriptor per row, keyed by DictRef(currentDB_.dbId, dictid).
// NOTE(review): the line that opens this definition is not visible in this listing.
1038  std::string dictQuery(
1039  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1040  sqliteConnector_.query(dictQuery);
1041  auto numRows = sqliteConnector_.getNumRows();
1042  for (size_t r = 0; r < numRows; ++r) {
// Result columns are read by position, in the order listed in the SELECT above.
1043  auto dictId = sqliteConnector_.getData<int>(r, 0);
1044  auto dictName = sqliteConnector_.getData<string>(r, 1);
1045  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1046  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1047  auto refcount = sqliteConnector_.getData<int>(r, 4);
// Path passed to the descriptor: <g_base_path>/<kDataDirectoryName>/DB_<dbId>_DICT_<dictId>.
1048  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1049  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1050  DictRef dict_ref(currentDB_.dbId, dictId);
1051  auto dd = new DictDescriptor(
1052  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
// The map entry adopts the newly allocated descriptor through reset().
1053  dictDescriptorMapByRef_[dict_ref].reset(dd);
1054  }
1055 }
1056 
// Reads every row of mapd_tables and creates a TableDescriptor (or a
// foreign_storage::ForeignTable when the storage type is FOREIGN_TABLE) for it,
// registering the descriptor in tableDescriptorMapById_.
// NOTE(review): the line that opens this definition is not visible in this listing.
1058  std::string tableQuery(
1059  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1060  "max_chunk_size, frag_page_size, "
1061  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1062  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1063  "from mapd_tables");
1064  sqliteConnector_.query(tableQuery);
1065  auto numRows = sqliteConnector_.getNumRows();
1066  for (size_t r = 0; r < numRows; ++r) {
1067  TableDescriptor* td;
// Column 17 is storage_type; any non-empty value other than FOREIGN_TABLE is logged
// at FATAL severity.
1068  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
1069  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1070  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1071  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1072  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1073  "supported table option (table "
1074  << table_name << " [" << table_id << "] in database "
1075  << currentDB_.dbName << ").";
1076  }
1077 
// Pick the concrete descriptor type based on the storage type.
1078  if (storage_type == StorageType::FOREIGN_TABLE) {
1079  td = new foreign_storage::ForeignTable();
1080  } else {
1081  td = new TableDescriptor();
1082  }
1083 
// Remaining fields are copied from the result row by column position (see SELECT).
1084  td->storageType = storage_type;
1085  td->tableId = sqliteConnector_.getData<int>(r, 0);
1086  td->tableName = sqliteConnector_.getData<string>(r, 1);
1087  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1088  td->isView = sqliteConnector_.getData<bool>(r, 3);
1089  td->fragments = sqliteConnector_.getData<string>(r, 4);
// NOTE(review): the right-hand side of this assignment is not visible in this
// listing; presumably it is derived from column 5 (frag_type) -- confirm.
1090  td->fragType =
1092  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1093  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1094  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1095  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1096  td->partitions = sqliteConnector_.getData<string>(r, 10);
1097  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1098  td->shard = sqliteConnector_.getData<int>(r, 12);
1099  td->nShards = sqliteConnector_.getData<int>(r, 13);
1100  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1101  td->userId = sqliteConnector_.getData<int>(r, 15);
// A NULL sort_column_id is stored as 0.
1102  td->sortedColumnId =
1103  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1104  if (!td->isView) {
1105  td->fragmenter = nullptr;
1106  }
1107  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1108  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
// Starts out false; the column map build sets it when a deleted column is found.
1109  td->hasDeletedCol = false;
1110 
1112  tableDescriptorMapById_[td->tableId] = td;
1113  }
1114 }
1115 
// Reads every row of mapd_columns (ordered by tableid, columnid), creates a
// ColumnDescriptor per row, registers it through addToColumnMap(), and maintains
// each table's columnIdBySpi_ list and deleted-column bookkeeping.
// NOTE(review): the line that opens this definition, and several columnType setter
// lines in the loop, are not visible in this listing.
1117  std::string columnQuery(
1118  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1119  "is_notnull, compression, comp_param, "
1120  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1121  "default_value from "
1122  "mapd_columns ORDER BY tableid, "
1123  "columnid");
1124  sqliteConnector_.query(columnQuery);
1125  auto numRows = sqliteConnector_.getNumRows();
// Number of upcoming rows still to be treated as physical columns of a preceding
// column; relies on the ORDER BY above.
1126  int32_t skip_physical_cols = 0;
1127  for (size_t r = 0; r < numRows; ++r) {
1128  ColumnDescriptor* cd = new ColumnDescriptor();
1129  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1130  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1131  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1135  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1139  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1140  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1141  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1142  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1143  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1144  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
// A NULL default_value column maps to an empty optional.
1145  if (sqliteConnector_.isNull(r, 16)) {
1146  cd->default_value = std::nullopt;
1147  } else {
1148  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1149  }
// The column is flagged as a geo physical column while the counter is positive.
1150  cd->isGeoPhyCol = skip_physical_cols > 0;
1151  addToColumnMap(cd);
1152 
// Once the counter has run out, reload it from this column's physical column count.
1153  if (skip_physical_cols <= 0) {
1154  skip_physical_cols = cd->columnType.get_physical_cols();
1155  }
1156 
1157  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1158  CHECK(td_itr != tableDescriptorMapById_.end());
1159 
// Deleted columns are recorded on the table; otherwise the column id is appended to
// columnIdBySpi_ when the column is a geometry column or the counter is not positive.
// The post-decrement only runs when is_geometry() is false (short-circuit).
1160  if (cd->isDeletedCol) {
1161  td_itr->second->hasDeletedCol = true;
1162  setDeletedColumnUnlocked(td_itr->second, cd);
1163  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1164  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1165  }
1166  }
1167 
1168  // sort columnIdBySpi_ based on columnId
1169  for (auto& tit : tableDescriptorMapById_) {
1170  std::sort(tit.second->columnIdBySpi_.begin(),
1171  tit.second->columnIdBySpi_.end(),
1172  [](const size_t a, const size_t b) -> bool { return a < b; });
1173  }
1174 } // closes this function; further Catalog definitions follow, so this is not the end of the namespace
1175 
// Loads the SQL text of every view from mapd_views into the matching table
// descriptor and clears that descriptor's fragmenter.
// NOTE(review): the line that opens this definition is not visible in this listing.
1177  std::string viewQuery("SELECT tableid, sql FROM mapd_views");
1178  sqliteConnector_.query(viewQuery);
1179  auto numRows = sqliteConnector_.getNumRows();
1180  for (size_t r = 0; r < numRows; ++r) {
1181  auto tableId = sqliteConnector_.getData<int>(r, 0);
// NOTE(review): operator[] inserts a null entry when tableId is absent from the map,
// and td is dereferenced unconditionally below -- presumably every view row has a
// matching table row; confirm.
1182  auto td = tableDescriptorMapById_[tableId];
1183  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1184  td->fragmenter = nullptr;
1185  }
1186 }
1187 
1189  const std::map<int32_t, std::string>& user_name_by_user_id) {
// Loads every row of mapd_dashboards into a DashboardDescriptor and stores it in
// dashboardDescriptorMap_ under the key "<userid>:<dashboard name>".
// user_name_by_user_id supplies the owner name for each dashboard's userid.
1190  std::string frontendViewQuery(
1191  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1192  "userid, "
1193  "metadata "
1194  "FROM mapd_dashboards");
1195  sqliteConnector_.query(frontendViewQuery);
1196  auto numRows = sqliteConnector_.getNumRows();
1197  for (size_t r = 0; r < numRows; ++r) {
1198  auto vd = std::make_shared<DashboardDescriptor>();
// Fields are read by position, in the order listed in the SELECT above.
1199  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1200  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1201  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1202  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1203  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1204  vd->userId = sqliteConnector_.getData<int>(r, 5);
1205  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
// Falls back to "Unknown" when the owner id has no entry in the lookup table.
1206  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
// NOTE(review): the argument line of this call is not visible in this listing.
1207  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1209  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1210  }
1211 }
1212 
// Loads every row of mapd_links into a LinkDescriptor and registers it in
// linkDescriptorMapById_ under its link id.
// NOTE(review): the line that opens this definition and one statement near the end
// of the loop body are not visible in this listing.
1214  std::string linkQuery(
1215  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1216  "update_time), view_metadata "
1217  "FROM mapd_links");
1218  sqliteConnector_.query(linkQuery);
1219  auto numRows = sqliteConnector_.getNumRows();
1220  for (size_t r = 0; r < numRows; ++r) {
1221  auto ld = new LinkDescriptor();
// Fields are read by position, in the order listed in the SELECT above.
1222  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1223  ld->userId = sqliteConnector_.getData<int>(r, 1);
1224  ld->link = sqliteConnector_.getData<string>(r, 2);
1225  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1226  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1227  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1229  linkDescriptorMapById_[ld->linkId] = ld;
1230  }
1231 }
1232 
// Rebuilds logicalToPhysicalTableMapById_ from mapd_logical_to_physical: each logical
// table id maps to the list of physical table ids found for it, in result order.
// NOTE(review): the line that opens this definition is not visible in this listing.
1234  /* rebuild map linking logical tables to corresponding physical ones */
1235  std::string logicalToPhysicalTableMapQuery(
1236  "SELECT logical_table_id, physical_table_id "
1237  "FROM mapd_logical_to_physical");
1238  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1239  auto numRows = sqliteConnector_.getNumRows();
1240  for (size_t r = 0; r < numRows; ++r) {
1241  auto logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1242  auto physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1243  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1244  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1245  /* add new entity to the map logicalToPhysicalTableMapById_ */
1246  std::vector<int32_t> physicalTables{physical_tb_id};
1247  const auto it_ok =
1248  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
// emplace must have inserted, since find() just reported the key as absent.
1249  CHECK(it_ok.second);
1250  } else {
1251  /* update map logicalToPhysicalTableMapById_ */
1252  physicalTableIt->second.push_back(physical_tb_id);
1253  }
1254  }
1255 }
1256 
1257 // The catalog uses a series of maps to cache data that have been read from the sqlite
1258 // tables. Usually we update these maps whenever we write using sqlite, so this function
1259 // is responsible for initializing all of them based on the sqlite db state.
// NOTE(review): the signature line and most of the individual build calls are not
// visible in this listing; only the statements below can be confirmed.
1261  // Get all user id to username mapping here in order to avoid making a call to
1262  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
1263  // catalog.
1264  const auto user_name_by_user_id = get_user_id_to_user_name_map();
1265 
// The catalog write lock is taken only after the user lookup above has completed.
1266  cat_write_lock write_lock(this);
1268 
1271 
1272  if (g_enable_fsi) {
1275  }
1276 
1279  buildDashboardsMapUnlocked(user_name_by_user_id);
1283 }
1284 
// Fills custom_expr_map_by_id_ with one CustomExpression per result row, keyed by the
// expression id.
// NOTE(review): the signature and the start of the query statement are not visible in
// this listing; the string below is presumably the query text passed to
// sqliteConnector_ -- confirm.
1287  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1288  "is_deleted "
1289  "FROM omnisci_custom_expressions");
1290  auto num_rows = sqliteConnector_.getNumRows();
1291  for (size_t row = 0; row < num_rows; row++) {
1292  auto custom_expr = getCustomExpressionFromConnector(row);
// The id is read before std::move() is applied, because the subscript expression and
// the right-hand side are separate full operands of the assignment.
1293  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1294  }
1295 }
1296 
1297 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1298  auto id = sqliteConnector_.getData<int>(row, 0);
1299  auto name = sqliteConnector_.getData<string>(row, 1);
1300  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1301  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1302  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1303  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1304  return std::make_unique<CustomExpression>(
1305  id,
1306  name,
1307  expression_json,
1308  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1309  data_source_id,
1310  is_deleted);
1311 }
1312 
1314  const list<ColumnDescriptor>& columns,
1315  const list<DictDescriptor>& dicts) {
// Registers copies of a table descriptor, its column descriptors and its dictionary
// descriptors in the in-memory maps, under the catalog write lock.
// NOTE(review): the first line of the signature (which declares `td`) is not visible
// in this listing.
1316  cat_write_lock write_lock(this);
1317  TableDescriptor* new_td;
1318 
// Copy into the matching concrete type so foreign-table state is carried over.
1319  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1320  if (foreign_table) {
1321  auto new_foreign_table = new foreign_storage::ForeignTable();
1322  *new_foreign_table = *foreign_table;
1323  new_td = new_foreign_table;
1324  } else {
1325  new_td = new TableDescriptor();
1326  *new_td = *td;
1327  }
1328 
// The copy gets its own mutex instead of keeping the one copied from td.
1329  new_td->mutex_ = std::make_shared<std::mutex>();
1330  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1331  tableDescriptorMapById_[td->tableId] = new_td;
// NOTE(review): `auto cd` copies each ColumnDescriptor per iteration before it is
// copied again into new_cd.
1332  for (auto cd : columns) {
1333  ColumnDescriptor* new_cd = new ColumnDescriptor();
1334  *new_cd = cd;
1335  addToColumnMap(new_cd);
1336 
1337  // Add deleted column to the map
1338  if (cd.isDeletedCol) {
1339  CHECK(new_td->hasDeletedCol);
1340  setDeletedColumnUnlocked(new_td, new_cd);
1341  }
1342  }
1343 
1344  std::sort(new_td->columnIdBySpi_.begin(),
1345  new_td->columnIdBySpi_.end(),
1346  [](const size_t a, const size_t b) -> bool { return a < b; });
1347 
// A StringDictionaryClient is only created when string dictionary hosts are configured.
1348  std::unique_ptr<StringDictionaryClient> client;
1349  DictRef dict_ref(currentDB_.dbId, -1);
1350  if (!string_dict_hosts_.empty()) {
1351  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1352  }
1353  for (auto dd : dicts) {
1354  if (!dd.dictRef.dictId) {
1355  // Dummy entry created for a shard of a logical table, nothing to do.
1356  continue;
1357  }
1358  dict_ref.dictId = dd.dictRef.dictId;
1359  if (client) {
1360  client->create(dict_ref, dd.dictIsTemp);
1361  }
1362  DictDescriptor* new_dd = new DictDescriptor(dd);
1363  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
// Only non-temporary dictionaries get a folder created on disk.
1364  if (!dd.dictIsTemp) {
1365  boost::filesystem::create_directory(new_dd->dictFolderPath);
1366  }
1367  }
1368 }
1369 
// Removes a table and everything hanging off it from the in-memory maps: the table
// descriptor, its deleted-column entry, its column descriptors, and any dictionary
// whose reference count drops to zero.
//
// tableName    name used for the by-name map and for the error message
// tableId      id used for the by-id map and for matching column descriptors
// is_on_error  when true, a column whose dictionary is missing from
//              dictDescriptorMapByRef_ is tolerated instead of failing a CHECK
//
// Throws runtime_error when tableId is not present in tableDescriptorMapById_.
1370 void Catalog::removeTableFromMap(const string& tableName,
1371  const int tableId,
1372  const bool is_on_error) {
1373  cat_write_lock write_lock(this);
1374  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1375  if (tableDescIt == tableDescriptorMapById_.end()) {
1376  throw runtime_error("Table " + tableName + " does not exist.");
1377  }
1378 
1379  TableDescriptor* td = tableDescIt->second;
1380 
1381  if (td->hasDeletedCol) {
1382  const auto ret = deletedColumnPerTable_.erase(td);
1383  CHECK_EQ(ret, size_t(1));
1384  }
1385 
1386  tableDescriptorMapById_.erase(tableDescIt);
1387  tableDescriptorMap_.erase(to_upper(tableName));
1388  td->fragmenter = nullptr;
1389  dict_columns_by_table_id_.erase(tableId);
1390 
// NOTE(review): `isTemp`, used further down, is not declared in any line visible in
// this listing; presumably it is computed from td on a line just before the delete --
// confirm.
1392  delete td;
1393 
1394  std::unique_ptr<StringDictionaryClient> client;
1395  if (SysCatalog::instance().isAggregator()) {
1396  CHECK(!string_dict_hosts_.empty());
1397  DictRef dict_ref(currentDB_.dbId, -1);
1398  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1399  }
1400 
1401  // delete all column descriptors for the table
1402  // no more link columnIds to sequential indexes!
1403  for (auto cit = columnDescriptorMapById_.begin();
1404  cit != columnDescriptorMapById_.end();) {
1405  if (tableId != std::get<0>(cit->first)) {
1406  ++cit;
1407  } else {
// The iterator is advanced here, before the entry it pointed at is erased below.
1408  int i = std::get<1>(cit++->first);
1409  ColumnIdKey cidKey(tableId, i);
1410  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1411  ColumnDescriptor* cd = colDescIt->second;
1412  columnDescriptorMapById_.erase(colDescIt);
1413  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1414  columnDescriptorMap_.erase(cnameKey);
1415  const int dictId = cd->columnType.get_comp_param();
1416  // Dummy dictionaries created for a shard of a logical table have the id set to
1417  // zero.
1418  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1419  INJECT_TIMER(removingDicts);
1420  DictRef dict_ref(currentDB_.dbId, dictId);
1421  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1422  // If we're removing this table due to an error, it is possible that the string
1423  // dictionary reference was never populated. Don't crash, just continue cleaning
1424  // up the TableDescriptor and ColumnDescriptors
1425  if (!is_on_error) {
1426  CHECK(dictIt != dictDescriptorMapByRef_.end());
1427  } else {
1428  if (dictIt == dictDescriptorMapByRef_.end()) {
// NOTE(review): this `continue` skips the `delete cd` at the end of the loop body,
// although cd has already been erased from both column maps above -- the descriptor
// appears to leak on this path.
1429  continue;
1430  }
1431  }
1432  const auto& dd = dictIt->second;
1433  CHECK_GE(dd->refcount, 1);
1434  --dd->refcount;
// Last reference gone: release the dictionary, rename its folder for deletion unless
// isTemp is set, drop it on the client when one exists, and remove the map entry.
1435  if (!dd->refcount) {
1436  dd->stringDict.reset();
1437  if (!isTemp) {
1438  File_Namespace::renameForDelete(dd->dictFolderPath);
1439  }
1440  if (client) {
1441  client->drop(dict_ref);
1442  }
1443  dictDescriptorMapByRef_.erase(dictIt);
1444  }
1445  }
1446 
1447  delete cd;
1448  }
1449  }
1450 }
1451 
1453  cat_write_lock write_lock(this);
1455 }
1456 
1458  cat_write_lock write_lock(this);
1460  std::make_shared<DashboardDescriptor>(vd);
1461 }
1462 
// Builds the list of DBObjects for the tables and views that a dashboard's metadata
// refers to.
//
// view_meta  dashboard metadata, handed to parse_underlying_dashboard_objects()
// user_id    passed through as the third constructor argument of each DBObject
//
// Returns one DBObject per referenced source that exists in the current database;
// sources that cannot be found are logged at INFO level and skipped.
1463 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1464  const int& user_id) {
1465  std::vector<DBObject> objects;
1466  DBObjectKey key;
1467  key.dbId = currentDB_.dbId;
// Helper that stamps the shared key with a permission type and object id and returns
// a copy of it (key is captured by reference, the return is by value).
1468  auto _key_place = [&key](auto type, auto id) {
1469  key.permissionType = type;
1470  key.objectId = id;
1471  return key;
1472  };
1473  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
// Lookup is done with populateFragmenter == false; only metadata is needed here.
1474  auto td = getMetadataForTable(object_name, false);
1475  if (!td) {
1476  // Parsed object source is not present in current database
1477  // LOG the info and ignore
1478  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1479  << " no longer exists in current DB.";
1480  continue;
1481  }
1482  // Dashboard source can be Table or View
1483  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
// NOTE(review): the second arm of this conditional expression is not visible in this
// listing.
1484  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1486  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1487  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1488  objects.back().setName(td->tableName);
1489  }
1490  return objects;
1491 }
1492 
// Makes the privileges held by a dashboard's system role match the tables and views
// the dashboard currently refers to.
//
// view_meta       dashboard metadata, parsed via parseDashboardObjects()
// user_id         forwarded to parseDashboardObjects()
// dash_role_name  name of the dashboard system role to create or update
//
// When the role does not exist, privileges on the parsed objects are granted to the
// given role name. When it exists, table/view keys the role holds that are no longer
// among the parsed objects are collected for revocation, then privileges on the
// parsed objects are granted again.
// NOTE(review): the statements that create the role and that perform the revoke are
// not fully visible in this listing (only their trailing argument lines are).
1493 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1494  const int32_t& user_id,
1495  const std::string& dash_role_name) {
1496  auto objects = parseDashboardObjects(view_meta, user_id);
1497  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1498  if (!rl) {
1499  // Dashboard role does not exist
1500  // create role and grant privileges
1501  // NOTE(wamsi): Transactionally unsafe
1503  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1504  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1505  } else {
1506  // Dashboard system role already exists
1507  // Add/remove privileges on objects
1508  std::set<DBObjectKey> revoke_keys;
1509  auto ex_objects = rl->getDbObjects(true);
1510  for (auto key : *ex_objects | boost::adaptors::map_keys) {
// Only table and view privileges are considered for revocation.
1511  if (key.permissionType != TableDBObjectType &&
1512  key.permissionType != ViewDBObjectType) {
1513  continue;
1514  }
// Linear search of the parsed objects for this key; `auto obj` copies each DBObject.
1515  bool found = false;
1516  for (auto obj : objects) {
1517  found = key == obj.getObjectKey() ? true : false;
1518  if (found) {
1519  break;
1520  }
1521  }
1522  if (!found) {
1523  revoke_keys.insert(key);
1524  }
1525  }
1526  for (auto& key : revoke_keys) {
1527  // revoke privs on object since the object is no
1528  // longer used by the dashboard as source
1529  // NOTE(wamsi): Transactionally unsafe
1531  dash_role_name, *rl->findDbObject(key, true), *this);
1532  }
1533  // Update privileges on remaining objects
1534  // NOTE(wamsi): Transactionally unsafe
1535  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1536  }
1537 }
1538 
// Stores a heap-allocated copy of the link descriptor `ld` in linkDescriptorMapById_
// under ld.linkId, while holding the catalog write lock.
// NOTE(review): the signature line and one statement between the copy and the by-id
// insertion are not visible in this listing.
1540  cat_write_lock write_lock(this);
1541  LinkDescriptor* new_ld = new LinkDescriptor();
1542  *new_ld = ld;
1544  linkDescriptorMapById_[ld.linkId] = new_ld;
1545 }
1546 
// Creates the fragmenter for table descriptor `td` and logs how long that took.
// A SortedOrderFragmenter is used when td->sortedColumnId > 0, otherwise an
// InsertOrderFragmenter.
// NOTE(review): the signature line and the statement following the first two
// comments inside the lambda are not visible in this listing.
1548  auto time_ms = measure<>::execution([&]() {
1549  // instantiate table fragmenter upon first use
1550  // assume only insert order fragmenter is supported
1552  vector<Chunk> chunkVec;
// All columns are requested: system, virtual (excluded) and physical flags are
// passed as (true, false, true).
1553  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1554  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
// Chunk keys for this table start with {database id, table id}.
1555  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1556  if (td->sortedColumnId > 0) {
1557  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1558  chunkVec,
1559  dataMgr_.get(),
1560  const_cast<Catalog*>(this),
1561  td->tableId,
1562  td->shard,
1563  td->maxFragRows,
1564  td->maxChunkSize,
1565  td->fragPageSize,
1566  td->maxRows,
1567  td->persistenceLevel);
1568  } else {
// The insert-order variant takes one extra argument: whether storageType is non-empty.
1569  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1570  chunkVec,
1571  dataMgr_.get(),
1572  const_cast<Catalog*>(this),
1573  td->tableId,
1574  td->shard,
1575  td->maxFragRows,
1576  td->maxChunkSize,
1577  td->fragPageSize,
1578  td->maxRows,
1579  td->persistenceLevel,
1580  !td->storageType.empty());
1581  }
1582  });
1583  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1584  << time_ms << "ms";
1585 }
1586 
1588  const std::string& tableName) const {
// Looks the table up by upper-cased name in tableDescriptorMap_ without taking a
// catalog lock here. Returns nullptr when the name is unknown; otherwise the result
// of the dynamic_cast to ForeignTable, which is also nullptr for a table that is not
// a foreign table.
1589  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1590  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1591  return nullptr;
1592  }
1593  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1594 }
1595 
1597  const std::string& tableName) const {
// Locked wrapper: takes the catalog read lock, then delegates to
// getForeignTableUnlocked().
1598  cat_read_lock read_lock(this);
1599  return getForeignTableUnlocked(tableName);
1600 }
1601 
// Returns the descriptor of the table with the given name, or nullptr when no such
// table is registered.
//
// tableName           table name, resolved via getMutableMetadataForTableUnlocked()
// populateFragmenter  when true, the fragmenter of a non-view table is set up on
//                     first access, guarded by the table descriptor's own mutex
1602 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1603  const bool populateFragmenter) const {
1604  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1605  // pure metadata calls
1606  cat_read_lock read_lock(this);
1607  auto td = getMutableMetadataForTableUnlocked(tableName);
1608  if (!td) {
1609  return nullptr;
1610  }
// The catalog read lock is released before the per-table mutex is taken.
1611  read_lock.unlock();
1612  if (populateFragmenter) {
1613  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
// NOTE(review): the statement inside this branch is not visible in this listing.
1614  if (td->fragmenter == nullptr && !td->isView) {
1616  }
1617  }
1618  return td; // returns pointer to table descriptor
1619 }
1620 
1622  bool populateFragmenter) const {
// Id-based lookup of a table descriptor; returns nullptr when table_id is unknown.
// When populateFragmenter is true, the fragmenter of a non-view table is set up on
// first access under the table descriptor's own mutex.
1623  cat_read_lock read_lock(this);
1624  auto td = getMutableMetadataForTableUnlocked(table_id);
1625  if (!td) {
1626  return nullptr;
1627  }
// The catalog read lock is released before the per-table mutex is taken.
1628  read_lock.unlock();
1629  if (populateFragmenter) {
1630  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
// NOTE(review): the statement inside this branch is not visible in this listing.
1631  if (td->fragmenter == nullptr && !td->isView) {
1633  }
1634  }
1635  return td;
1636 }
1637 
1638 std::optional<std::string> Catalog::getTableName(int32_t table_id) const {
1639  cat_read_lock read_lock(this);
1640  auto td = getMutableMetadataForTableUnlocked(table_id);
1641  if (!td) {
1642  return {};
1643  }
1644  return td->tableName;
1645 }
1646 
1647 std::optional<int32_t> Catalog::getTableId(const std::string& table_name) const {
1648  cat_read_lock read_lock(this);
1649  auto td = getMutableMetadataForTableUnlocked(table_name);
1650  if (!td) {
1651  return {};
1652  }
1653  return td->tableId;
1654 }
1655 
1657  const std::string& table_name) const {
// Name-based lookup in tableDescriptorMap_ (keys are upper-cased names); takes no
// catalog lock here. Returns the stored descriptor, or nullptr when the name is
// unknown.
1658  auto it = tableDescriptorMap_.find(to_upper(table_name));
1659  if (it == tableDescriptorMap_.end()) {
1660  return nullptr;
1661  }
1662  return it->second;
1663 }
1664 
// Id-based lookup in tableDescriptorMapById_; takes no catalog lock here. Returns the
// stored descriptor, or nullptr when the id is unknown.
// NOTE(review): the signature line is not visible in this listing.
1666  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1667  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1668  return nullptr;
1669  }
1670  return tableDescIt->second;
1671 }
1672 
1674  const bool load_dict) const {
// Returns the dictionary descriptor registered for (currentDB_.dbId, dict_id), or
// nullptr when there is none. When load_dict is true and the descriptor has no
// StringDictionary yet, one is created under the descriptor's string_dict_mutex and
// the time taken is logged.
1675  cat_read_lock read_lock(this);
1676  const DictRef dictRef(currentDB_.dbId, dict_id);
1677  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1678  if (dictDescIt ==
1679  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1680  return nullptr;
1681  }
1682  auto& dd = dictDescIt->second;
1683 
1684  if (load_dict) {
// Serializes creation so the dictionary is constructed at most once per descriptor.
1685  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1686  if (!dd->stringDict) {
1687  auto time_ms = measure<>::execution([&]() {
// Without configured hosts the dictionary is built from dictFolderPath; the two
// branches differ only in the third constructor argument, which follows dictIsTemp.
1688  if (string_dict_hosts_.empty()) {
1689  if (dd->dictIsTemp) {
1690  dd->stringDict = std::make_shared<StringDictionary>(
1691  dd->dictRef, dd->dictFolderPath, true, true, g_cache_string_hash);
1692  } else {
1693  dd->stringDict = std::make_shared<StringDictionary>(
1694  dd->dictRef, dd->dictFolderPath, false, true, g_cache_string_hash);
1695  }
1696  } else {
// With hosts configured the dictionary is constructed from the first host entry.
1697  dd->stringDict =
1698  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1699  }
1700  });
1701  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1702  << dd->dictRef.dictId << " was " << time_ms << "ms";
1703  }
1704  }
1705 
1706  return dd.get();
1707 }
1708 
// Returns a reference to this catalog's list of string dictionary hosts.
1709 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1710  return string_dict_hosts_;
1711 }
1712 
1714  const string& columnName) const {
// Looks up a column descriptor by (tableId, upper-cased column name) under the
// catalog read lock. Returns nullptr when the table has no such column.
1715  cat_read_lock read_lock(this);
1716 
1717  ColumnKey columnKey(tableId, to_upper(columnName));
1718  auto colDescIt = columnDescriptorMap_.find(columnKey);
1719  if (colDescIt ==
1720  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1721  return nullptr;
1722  }
1723  return colDescIt->second;
1724 }
1725 
1726 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1727  cat_read_lock read_lock(this);
1728  ColumnIdKey columnIdKey(table_id, column_id);
1729  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1730  if (colDescIt == columnDescriptorMapById_
1731  .end()) { // need to check to make sure column exists for table
1732  return nullptr;
1733  }
1734  return colDescIt->second;
1735 }
1736 
1737 const std::optional<std::string> Catalog::getColumnName(int table_id,
1738  int column_id) const {
1739  cat_read_lock read_lock(this);
1740  auto it = columnDescriptorMapById_.find(ColumnIdKey{table_id, column_id});
1741  if (it == columnDescriptorMapById_.end()) {
1742  return {};
1743  }
1744  return it->second->columnName;
1745 }
1746 
1747 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1748  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1749  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1750  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1751 
1752  auto spx = spi;
1753  int phi = 0;
1754  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1755  {
1756  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1757  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1758  }
1759 
1760  CHECK(0 < spx && spx <= columnIdBySpi.size())
1761  << "spx = " << spx << ", size = " << columnIdBySpi.size();
1762  return columnIdBySpi[spx - 1] + phi;
1763 }
1764 
1765 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
1766  cat_read_lock read_lock(this);
1767  return getColumnIdBySpiUnlocked(table_id, spi);
1768 }
1769 
1771  const size_t spi) const {
// Resolves spi to a column id via getColumnIdBySpiUnlocked(), then returns the
// descriptor registered for (tableId, column id), or nullptr when there is none.
// Runs under the catalog read lock.
1772  cat_read_lock read_lock(this);
1773 
1774  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
1775  ColumnIdKey columnIdKey(tableId, columnId);
1776  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1777  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
1778 }
1779 
// Deletes a batch of dashboards on behalf of `user`.
//
// dashboard_ids  ids of the dashboards to delete (taken by value)
// user           user whose DELETE_DASHBOARD privilege is checked for each id
//
// Phase 1 validates every id: it must exist and the user must pass the privilege
// check. If any id fails, a std::runtime_error listing the invalid and the restricted
// ids is thrown and nothing is deleted. Phase 2 removes each dashboard from
// dashboardDescriptorMap_ and from mapd_dashboards inside one sqlite transaction,
// under the catalog write lock.
// NOTE(review): three statements are not visible in this listing: one after the
// BE-5245 comment, one right after the write lock is taken, and the head of the
// DELETE call.
1780 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
1781  const UserMetadata& user) {
1782  std::stringstream invalid_ids, restricted_ids;
1783 
1784  for (int32_t dashboard_id : dashboard_ids) {
1785  if (!getMetadataForDashboard(dashboard_id)) {
1786  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
1787  continue;
1788  }
1789  DBObject object(dashboard_id, DashboardDBObjectType);
1790  object.loadKey(*this);
1791  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
1792  std::vector<DBObject> privs = {object};
1793  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
1794  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
1795  }
1796  }
1797 
// Report all problems found in the validation pass in a single exception.
1798  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
1799  std::stringstream error_message;
1800  error_message << "Delete dashboard(s) failed with error(s):";
1801  if (invalid_ids.str().size() > 0) {
1802  error_message << "\nDashboard id: " << invalid_ids.str()
1803  << " - Dashboard id does not exist";
1804  }
1805  if (restricted_ids.str().size() > 0) {
1806  error_message
1807  << "\nDashboard id: " << restricted_ids.str()
1808  << " - User should be either owner of dashboard or super user to delete it";
1809  }
1810  throw std::runtime_error(error_message.str());
1811  }
1812  std::vector<DBObject> dash_objs;
1813 
1814  for (int32_t dashboard_id : dashboard_ids) {
1815  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
1816  }
1817  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
1819  {
1820  cat_write_lock write_lock(this);
1822 
1823  sqliteConnector_.query("BEGIN TRANSACTION");
1824  try {
1825  for (int32_t dashboard_id : dashboard_ids) {
1826  auto dash = getMetadataForDashboard(dashboard_id);
1827  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
1828  // rollback if already deleted
1829  if (!dash) {
1830  throw std::runtime_error(
1831  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
1832  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
1833  }
// Name and owner id are copied out first: erasing the map entry below may release
// the descriptor that `dash` points to.
1834  std::string user_id = std::to_string(dash->userId);
1835  std::string dash_name = dash->dashboardName;
1836  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
// NOTE(review): the in-memory entry is erased before the transaction commits, and the
// catch block below only rolls back sqlite -- entries erased earlier in this loop are
// not restored in the visible code.
1837  dashboardDescriptorMap_.erase(viewDescIt);
1839  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
1840  std::vector<std::string>{dash_name, user_id});
1841  }
1842  } catch (std::exception& e) {
1843  sqliteConnector_.query("ROLLBACK TRANSACTION");
1844  throw;
1845  }
1846  sqliteConnector_.query("END TRANSACTION");
1847  }
1848 }
1849 
1851  const string& userId,
1852  const string& dashName) const {
// Looks up a dashboard descriptor by the composite key "<userId>:<dashName>" under
// the catalog read lock. Returns nullptr when no such dashboard is registered. The
// returned raw pointer comes from the map entry's smart pointer via get().
1853  cat_read_lock read_lock(this);
1854 
1855  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
1856  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
1857  return nullptr;
1858  }
1859  return viewDescIt->second.get(); // returns pointer to view descriptor
1860 }
1861 
// Finds a dashboard by numeric id: scans dashboardDescriptorMap_ for the first entry
// whose dashboardId equals `id`, then re-resolves it through the (user id, name)
// overload. Returns nullptr when no entry matches.
// NOTE(review): the signature line is not visible in this listing.
1863  cat_read_lock read_lock(this);
1864  std::string userId;
1865  std::string name;
1866  bool found{false};
1867  {
// NOTE(review): `auto descp` copies each map entry (key string and smart pointer) on
// every iteration.
1868  for (auto descp : dashboardDescriptorMap_) {
1869  auto dash = descp.second.get();
1870  if (dash->dashboardId == id) {
1871  userId = std::to_string(dash->userId);
1872  name = dash->dashboardName;
1873  found = true;
1874  break;
1875  }
1876  }
1877  }
1878  if (found) {
1879  return getMetadataForDashboard(userId, name);
1880  }
1881  return nullptr;
1882 }
1883 
1884 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
1885  cat_read_lock read_lock(this);
1886  auto linkDescIt = linkDescriptorMap_.find(link);
1887  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
1888  return nullptr;
1889  }
1890  return linkDescIt->second; // returns pointer to view descriptor
1891 }
1892 
// NOTE(review): the signature line of this definition is missing from this listing;
// `linkId` used below is presumably its parameter — confirm against the header.
// Looks up linkDescriptorMapById_ by `linkId` under a catalog read lock and returns the
// stored value, or nullptr when the id is not present.
1894  cat_read_lock read_lock(this);
1895  auto linkDescIt = linkDescriptorMapById_.find(linkId);
1896  if (linkDescIt == linkDescriptorMapById_.end()) { // no link registered under this id
1897  return nullptr;
1898  }
1899  return linkDescIt->second;
1900 }
1901 
// NOTE(review): the signature line of this definition is missing from this listing;
// `table_id` used below is presumably its parameter — confirm against the header.
// Resolves `table_id` through getMutableMetadataForTableUnlocked() under a catalog read
// lock and downcasts the result to foreign_storage::ForeignTable.
// Both the lookup result and the dynamic_cast result are CHECKed, so neither an unknown
// id nor a non-foreign table yields a null return from here.
1903  cat_read_lock read_lock(this);
1904  const auto table = getMutableMetadataForTableUnlocked(table_id);
1905  CHECK(table);
1906  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
1907  CHECK(foreign_table);
1908  return foreign_table;
1909 }
1910 
// NOTE(review): the opening line of this definition (return type and function name)
// is missing from this listing; only the parameter list and the body are visible.
// Appends to `columnDescriptors` every entry of columnDescriptorMapById_ whose tableId
// matches td->tableId, in the iteration order of that map, subject to the three flags:
//   fetchSystemColumns   - when false, entries with isSystemCol set are skipped
//   fetchVirtualColumns  - when false, entries with isVirtualCol set are skipped
//   fetchPhysicalColumns - when false, after a column is accepted the next
//                          columnType.get_physical_cols() map entries are skipped
// No lock is taken in the visible body.
1912  const TableDescriptor* td,
1913  list<const ColumnDescriptor*>& columnDescriptors,
1914  const bool fetchSystemColumns,
1915  const bool fetchVirtualColumns,
1916  const bool fetchPhysicalColumns) const {
1917  int32_t skip_physical_cols = 0;
1918  for (const auto& columnDescriptor : columnDescriptorMapById_) {
// The skip counter is consumed before the table-id test, i.e. it counts raw map entries.
// NOTE(review): this presumably relies on physical columns directly following their
// parent column in the map's ordering — confirm.
1919  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
1920  --skip_physical_cols;
1921  continue;
1922  }
1923  auto cd = columnDescriptor.second;
1924  if (cd->tableId != td->tableId) {
1925  continue;
1926  }
1927  if (!fetchSystemColumns && cd->isSystemCol) {
1928  continue;
1929  }
1930  if (!fetchVirtualColumns && cd->isVirtualCol) {
1931  continue;
1932  }
1933  if (!fetchPhysicalColumns) {
// Remember how many following entries belong to this column and must be skipped.
1934  const auto& col_ti = cd->columnType;
1935  skip_physical_cols = col_ti.get_physical_cols();
1936  }
1937  columnDescriptors.push_back(cd);
1938  }
1939 }
1940 
// Returns the column descriptors of table `tableId`, filtered by the three fetch flags,
// as a freshly built list. Holds a catalog read lock for the duration of the call.
1941 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
1942  const int tableId,
1943  const bool fetchSystemColumns,
1944  const bool fetchVirtualColumns,
1945  const bool fetchPhysicalColumns) const {
1946  cat_read_lock read_lock(this);
1947  std::list<const ColumnDescriptor*> columnDescriptors;
// NOTE(review): the line(s) that begin the call receiving the arguments below are
// missing from this listing; the callee and its first argument cannot be read here.
1950  columnDescriptors,
1951  fetchSystemColumns,
1952  fetchVirtualColumns,
1953  fetchPhysicalColumns);
1954  return columnDescriptors;
1955 }
1956 
1957 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
1958  cat_read_lock read_lock(this);
1959  list<const TableDescriptor*> table_list;
1960  for (auto p : tableDescriptorMapById_) {
1961  table_list.push_back(p.second);
1962  }
1963  return table_list;
1964 }
1965 
1966 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
1967  cat_read_lock read_lock(this);
1968  std::vector<TableDescriptor> tables;
1969  tables.reserve(tableDescriptorMapById_.size());
1970  for (auto table_entry : tableDescriptorMapById_) {
1971  tables.emplace_back(*table_entry.second);
1972  tables.back().fragmenter = nullptr;
1973  }
1974  return tables;
1975 }
1976 
1977 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
1978  cat_read_lock read_lock(this);
1979  list<const DashboardDescriptor*> dashboards;
1980  for (auto dashboard_entry : dashboardDescriptorMap_) {
1981  dashboards.push_back(dashboard_entry.second.get());
1982  }
1983  return dashboards;
1984 }
1985 
1986 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
1987  cat_read_lock read_lock(this);
1988  std::vector<DashboardDescriptor> dashboards;
1989  dashboards.reserve(dashboardDescriptorMap_.size());
1990  for (auto dashboard_entry : dashboardDescriptorMap_) {
1991  dashboards.emplace_back(*dashboard_entry.second);
1992  }
1993  return dashboards;
1994 }
1995 
// NOTE(review): the signature line of this definition is missing from this listing;
// `cd` used below is presumably its column-descriptor parameter — confirm.
// Creates a dictionary descriptor for column `cd` via setColumnDictionary(), registers a
// heap-allocated copy of it in dictDescriptorMapByRef_, and returns its DictRef.
// Runs under a catalog write lock.
1997  cat_write_lock write_lock(this);
// NOTE(review): operator[] is used for the lookup; if tableDescriptorMapById_ is a
// std::map, an unknown tableId would insert a null entry that is then dereferenced —
// confirm callers guarantee the table exists.
1998  const auto& td = *tableDescriptorMapById_[cd.tableId];
1999  list<DictDescriptor> dds;
2000  setColumnDictionary(cd, dds, td, true);
// The descriptor of interest is the last element of `dds` after the call above.
2001  auto& dd = dds.back();
2002  CHECK(dd.dictRef.dictId);
2003 
// A remote string-dictionary client is only created when string_dict_hosts_ is non-empty;
// in that case the dictionary is also created through the client.
2004  std::unique_ptr<StringDictionaryClient> client;
2005  if (!string_dict_hosts_.empty()) {
2006  client.reset(new StringDictionaryClient(
2007  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
2008  }
2009  if (client) {
2010  client->create(dd.dictRef, dd.dictIsTemp);
2011  }
2012 
// Ownership of new_dd is handed to the map entry through reset().
2013  DictDescriptor* new_dd = new DictDescriptor(dd);
2014  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
// Only non-temporary dictionaries get a directory created at dictFolderPath.
2015  if (!dd.dictIsTemp) {
2016  boost::filesystem::create_directory(new_dd->dictFolderPath);
2017  }
2018  return dd.dictRef;
2019 }
2020 
// NOTE(review): the signature line and several interior lines of this definition are
// missing from this listing (each gap is flagged below); `cd` is presumably the
// column-descriptor parameter.
// Visible behavior: under a catalog write lock, for a dictionary-encoded string (or
// string array) column, decrement the dictionary's refcount in mapd_dictionaries and,
// once it is no longer positive, delete the dictionary row, drop it through the remote
// client when one is configured, and erase it from dictDescriptorMapByRef_.
2022  cat_write_lock write_lock(this);
// NOTE(review): one line is missing from the listing here.
// Nothing to do for columns that are not dictionary-encoded strings / string arrays.
2024  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
2025  return;
2026  }
2027  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
2028  return;
2029  }
2030  const auto dictId = cd.columnType.get_comp_param();
2031  CHECK_GT(dictId, 0);
2032  // decrement and zero check dict ref count
2033  const auto td = getMetadataForTable(cd.tableId, false);
2034  CHECK(td);
// NOTE(review): the line that begins the call taking the UPDATE statement below is
// missing from this listing.
2036  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2037  std::to_string(dictId));
// NOTE(review): the line that begins the call taking the SELECT statement below is
// missing from this listing.
2039  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
2040  const auto refcount = sqliteConnector_.getData<int>(0, 0);
2041  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
2042  << refcount;
// Another reference still exists: keep the dictionary.
2043  if (refcount > 0) {
2044  return;
2045  }
2046  const DictRef dictRef(currentDB_.dbId, dictId);
2047  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
2048  std::to_string(dictId));
// NOTE(review): the line that begins the expression continued below is missing from
// this listing; the fragment builds a "/DB_<dbId>_DICT_<dictId>" path suffix,
// presumably for removing the dictionary's directory — confirm.
2050  "/DB_" + std::to_string(currentDB_.dbId) + "_DICT_" +
2051  std::to_string(dictId));
2052 
// Drop the dictionary remotely as well when string dictionary hosts are configured.
2053  std::unique_ptr<StringDictionaryClient> client;
2054  if (!string_dict_hosts_.empty()) {
2055  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
2056  }
2057  if (client) {
2058  client->drop(dictRef);
2059  }
2060 
2061  dictDescriptorMapByRef_.erase(dictRef);
2062 }
2063 
// NOTE(review): the opening line of this definition (return type, function name and the
// first parameter, presumably the column descriptor `cd`) is missing from this listing.
// For a dictionary-encoded string (or string array) column with a positive comp_param,
// stores the column's StringDictionary pointer in `stringDicts`, keyed by the column id
// of the descriptor found in columnDescriptorMap_. Other columns leave `stringDicts`
// untouched.
2065  std::map<int, StringDictionary*>& stringDicts) {
2066  // learn 'committed' ColumnDescriptor of this column
2067  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2068  CHECK(cit != columnDescriptorMap_.end());
2069  auto& ccd = *cit->second;
2070 
2071  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2072  return;
2073  }
2074  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
2075  return;
2076  }
2077  if (!(ccd.columnType.get_comp_param() > 0)) {
2078  return;
2079  }
2080 
2081  auto dictId = ccd.columnType.get_comp_param();
// The return value is discarded; NOTE(review): presumably called for its side effect of
// loading the dictionary so that the lookup below succeeds — confirm.
2082  getMetadataForDict(dictId);
2083 
2084  const DictRef dictRef(currentDB_.dbId, dictId);
2085  auto dit = dictDescriptorMapByRef_.find(dictRef);
2086  CHECK(dit != dictDescriptorMapByRef_.end());
2087  CHECK(dit->second);
2088  CHECK(dit->second.get()->stringDict);
// The map receives a non-owning pointer obtained from stringDict.get().
2089  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2090 }
2091 
// NOTE(review): the signature line and several interior lines of this definition are
// missing from this listing (each gap is flagged below); `td` and `cd` are presumably the
// table- and column-descriptor parameters.
// Visible behavior: recurses into each physical shard for a sharded logical table,
// inserts a row into mapd_columns, bumps ncolumns in mapd_tables, reads back the assigned
// column id, and records a heap-allocated copy of the descriptor both in the column map
// and in columnDescriptorsForRoll for a later roll().
2093  // caller must handle sqlite/chunk transaction TOGETHER
2094  cd.tableId = td.tableId;
// Logical table of a sharded table: add the column to every physical shard first.
2095  if (td.nShards > 0 && td.shard < 0) {
2096  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2097  auto shard_cd = cd;
2098  addColumn(*shard, shard_cd);
2099  }
2100  }
// NOTE(review): the line opening the block that guards addDictionary(cd) is missing
// from this listing.
2102  addDictionary(cd);
2103  }
2104 
2105  using BindType = SqliteConnector::BindType;
// All 17 bound parameters are TEXT, except the last one (default_value), which is bound
// as NULL when the column has no default.
2106  std::vector<BindType> types(17, BindType::TEXT);
2107  if (!cd.default_value.has_value()) {
2108  types[16] = BindType::NULL_TYPE;
2109  }
// NOTE(review): the line that begins the call taking the INSERT statement below is
// missing from this listing.
2111  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2112  "colscale, is_notnull, "
2113  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2114  "is_deletedcol, default_value) "
2115  "VALUES (?, "
2116  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2117  "?, ?, ?, "
2118  "?, "
2119  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2120  std::vector<std::string>{std::to_string(td.tableId),
2121  std::to_string(td.tableId),
2122  cd.columnName,
// NOTE(review): most of the bound values of this initializer list are missing from this
// listing; only a few entries are visible.
2131  "",
2134  cd.virtualExpr,
2136  cd.default_value.value_or("NULL")},
2137  types);
2138 
// NOTE(review): the line that begins the call taking the UPDATE statement below is
// missing from this listing.
2140  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2141  std::vector<std::string>{std::to_string(td.tableId)});
2142 
// NOTE(review): the line that begins the call taking the SELECT statement below is
// missing from this listing.
2144  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2145  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
// Read back the column id that the INSERT's sub-select assigned.
2146  cd.columnId = sqliteConnector_.getData<int>(0, 0);
2147 
2148  ++tableDescriptorMapById_[td.tableId]->nColumns;
// The (nullptr, ncd) pair marks a newly added column for roll().
2149  auto ncd = new ColumnDescriptor(cd);
2150  addToColumnMap(ncd);
2151  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2152 }
2153 
// NOTE(review): the signature line and several interior lines of this definition are
// missing from this listing (each gap is flagged below); `td` and `cd` are presumably the
// table- and column-descriptor parameters.
// Visible behavior: under a catalog write lock, deletes the column's row from
// mapd_columns, decrements ncolumns in mapd_tables, removes the descriptor from the
// column map and records it in columnDescriptorsForRoll; afterwards, outside that locked
// scope, repeats the drop for every physical shard of a sharded logical table.
2155  {
2156  cat_write_lock write_lock(this);
// NOTE(review): one line is missing from the listing here.
2158  // caller must handle sqlite/chunk transaction TOGETHER
// NOTE(review): the line that begins the call taking the DELETE statement below is
// missing from this listing.
2160  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2161  std::vector<std::string>{std::to_string(td.tableId),
2162  std::to_string(cd.columnId)});
2163 
// NOTE(review): the line that begins the call taking the UPDATE statement below is
// missing from this listing.
2165  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2166  std::vector<std::string>{std::to_string(td.tableId)});
2167 
// NOTE(review): the initializer of columnDescIt (the lookup expression) is missing from
// this listing.
2168  ColumnDescriptorMap::iterator columnDescIt =
2170  CHECK(columnDescIt != columnDescriptorMap_.end());
2171 
// The (old, nullptr) pair marks a dropped column for roll(); the descriptor itself is
// not deleted here.
2172  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2173  removeFromColumnMap(columnDescIt->second);
2174  --tableDescriptorMapById_[td.tableId]->nColumns;
2175  }
2176 
2177  // for each shard
2178  if (td.nShards > 0 && td.shard < 0) {
2179  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2180  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2181  CHECK(shard_cd);
2182  dropColumn(*shard, *shard_cd);
2183  }
2184  }
2185 }
2186 
2187 void Catalog::roll(const bool forward) {
2188  cat_write_lock write_lock(this);
2189  std::set<const TableDescriptor*> tds;
2190 
2191  for (const auto& cdr : columnDescriptorsForRoll) {
2192  auto ocd = cdr.first;
2193  auto ncd = cdr.second;
2194  CHECK(ocd || ncd);
2195  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2196  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2197  auto td = tabDescIt->second;
2198  auto& vc = td->columnIdBySpi_;
2199  if (forward) {
2200  if (ocd) {
2201  if (nullptr == ncd ||
2202  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2203  delDictionary(*ocd);
2204  }
2205 
2206  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2207 
2208  delete ocd;
2209  }
2210  if (ncd) {
2211  // append columnId if its new and not phy geo
2212  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2213  if (!ncd->isGeoPhyCol) {
2214  vc.push_back(ncd->columnId);
2215  }
2216  }
2217  }
2218  tds.insert(td);
2219  } else {
2220  if (ocd) {
2221  addToColumnMap(ocd);
2222  }
2223  // roll back the dict of new column
2224  if (ncd) {
2225  removeFromColumnMap(ncd);
2226  if (nullptr == ocd ||
2227  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2228  delDictionary(*ncd);
2229  }
2230  delete ncd;
2231  }
2232  }
2233  }
2234  columnDescriptorsForRoll.clear();
2235 
2236  if (forward) {
2237  for (const auto td : tds) {
2238  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2239  }
2240  }
2241 }
2242 
// NOTE(review): the opening line of this definition (return type, function name and the
// first parameter, presumably the geo column descriptor `cd`) is missing from this listing.
// Appends to `columns` the physical columns that back a geometry column `cd`. Each
// physical column is named "<cd.columnName>_<suffix>" and inherits cd's not-null flag:
//   POINT        -> _coords
//   LINESTRING   -> _coords, _bounds
//   POLYGON      -> _coords, _ring_sizes, _bounds, _render_group
//   MULTIPOLYGON -> _coords, _ring_sizes, _poly_rings, _bounds, _render_group
// Non-geo column types add nothing; an unhandled geo type throws runtime_error.
// NOTE(review): the `true` passed to each ColumnDescriptor constructor presumably marks
// the column as a physical geo column — confirm against ColumnDescriptor.
2244  list<ColumnDescriptor>& columns) {
2245  const auto& col_ti = cd.columnType;
2246  if (IS_GEO(col_ti.get_type())) {
2247  switch (col_ti.get_type()) {
2248  case kPOINT: {
2249  ColumnDescriptor physical_cd_coords(true);
2250  physical_cd_coords.columnName = cd.columnName + "_coords";
2251  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2252  // Raw data: compressed/uncompressed coords
2253  coords_ti.set_subtype(kTINYINT);
// Point coords get a fixed size of two units: 4 bytes per unit with GEOINT/32
// compression, 8 bytes per unit when uncompressed. Any other compression fails the CHECK.
2254  size_t unit_size;
2255  if (col_ti.get_compression() == kENCODING_GEOINT &&
2256  col_ti.get_comp_param() == 32) {
2257  unit_size = 4 * sizeof(int8_t);
2258  } else {
2259  CHECK(col_ti.get_compression() == kENCODING_NONE);
2260  unit_size = 8 * sizeof(int8_t);
2261  }
2262  coords_ti.set_size(2 * unit_size);
2263  physical_cd_coords.columnType = coords_ti;
2264  columns.push_back(physical_cd_coords);
2265 
2266  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2267 
2268  break;
2269  }
2270  case kLINESTRING: {
// Unlike POINT, no size is set on the coords array here.
2271  ColumnDescriptor physical_cd_coords(true);
2272  physical_cd_coords.columnName = cd.columnName + "_coords";
2273  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2274  // Raw data: compressed/uncompressed coords
2275  coords_ti.set_subtype(kTINYINT);
2276  physical_cd_coords.columnType = coords_ti;
2277  columns.push_back(physical_cd_coords);
2278 
// Bounds: an array of doubles sized for exactly four values.
2279  ColumnDescriptor physical_cd_bounds(true);
2280  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2281  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2282  bounds_ti.set_subtype(kDOUBLE);
2283  bounds_ti.set_size(4 * sizeof(double));
2284  physical_cd_bounds.columnType = bounds_ti;
2285  columns.push_back(physical_cd_bounds);
2286 
2287  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2288 
2289  break;
2290  }
2291  case kPOLYGON: {
2292  ColumnDescriptor physical_cd_coords(true);
2293  physical_cd_coords.columnName = cd.columnName + "_coords";
2294  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2295  // Raw data: compressed/uncompressed coords
2296  coords_ti.set_subtype(kTINYINT);
2297  physical_cd_coords.columnType = coords_ti;
2298  columns.push_back(physical_cd_coords);
2299 
// Ring sizes: an array of INT with no size set.
2300  ColumnDescriptor physical_cd_ring_sizes(true);
2301  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2302  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2303  ring_sizes_ti.set_subtype(kINT);
2304  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2305  columns.push_back(physical_cd_ring_sizes);
2306 
2307  ColumnDescriptor physical_cd_bounds(true);
2308  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2309  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2310  bounds_ti.set_subtype(kDOUBLE);
2311  bounds_ti.set_size(4 * sizeof(double));
2312  physical_cd_bounds.columnType = bounds_ti;
2313  columns.push_back(physical_cd_bounds);
2314 
// Render group: a scalar INT column (not an array).
2315  ColumnDescriptor physical_cd_render_group(true);
2316  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2317  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2318  physical_cd_render_group.columnType = render_group_ti;
2319  columns.push_back(physical_cd_render_group);
2320 
2321  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2322 
2323  break;
2324  }
2325  case kMULTIPOLYGON: {
// Same layout as POLYGON plus a _poly_rings INT array between _ring_sizes and _bounds.
2326  ColumnDescriptor physical_cd_coords(true);
2327  physical_cd_coords.columnName = cd.columnName + "_coords";
2328  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2329  // Raw data: compressed/uncompressed coords
2330  coords_ti.set_subtype(kTINYINT);
2331  physical_cd_coords.columnType = coords_ti;
2332  columns.push_back(physical_cd_coords);
2333 
2334  ColumnDescriptor physical_cd_ring_sizes(true);
2335  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2336  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2337  ring_sizes_ti.set_subtype(kINT);
2338  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2339  columns.push_back(physical_cd_ring_sizes);
2340 
2341  ColumnDescriptor physical_cd_poly_rings(true);
2342  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2343  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2344  poly_rings_ti.set_subtype(kINT);
2345  physical_cd_poly_rings.columnType = poly_rings_ti;
2346  columns.push_back(physical_cd_poly_rings);
2347 
2348  ColumnDescriptor physical_cd_bounds(true);
2349  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2350  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2351  bounds_ti.set_subtype(kDOUBLE);
2352  bounds_ti.set_size(4 * sizeof(double));
2353  physical_cd_bounds.columnType = bounds_ti;
2354  columns.push_back(physical_cd_bounds);
2355 
2356  ColumnDescriptor physical_cd_render_group(true);
2357  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2358  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2359  physical_cd_render_group.columnType = render_group_ti;
2360  columns.push_back(physical_cd_render_group);
2361 
2362  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2363 
2364  break;
2365  }
2366  default:
2367  throw runtime_error("Unrecognized geometry type.");
2368  break;
2369  }
2370  }
2371 }
2372 
// File-local helper(s).
// NOTE(review): the helper defined in this namespace is only partially present in this
// listing: its signature line, the lookup expression, the comparison operand and both
// return statements are missing. From the fragments that remain it looks up a
// timing-type entry in foreign_table.options (CHECKed to exist) and branches on its
// value; createTable assigns the result of get_next_refresh_time(foreign_table) to
// next_refresh_time, so this is presumably that function — confirm against the original
// source.
2373 namespace {
2375  auto timing_type_entry =
2377  CHECK(timing_type_entry != foreign_table.options.end());
2378  if (timing_type_entry->second ==
2381  foreign_table.options);
2382  }
2384 }
2385 } // namespace
2386 
// NOTE(review): the opening line of this definition (return type and function name) and
// many interior lines are missing from this listing; the gaps that affect control flow
// are flagged below. In particular the condition that selects between the persisted
// (sqlite) path and the temporary-table path is not visible, nor is the declaration of
// `sqlite_lock`, which is unlocked near the end.
// Visible behavior: builds the full column list for the new table (user columns,
// expanded geo physical columns, the system "rowid" column and, when td.hasDeletedCol
// is set, "$deleted$"), assigns table and column ids, creates or links string
// dictionaries, persists the metadata (sqlite rows on one path, a JSON file on the
// other), registers the table in the in-memory maps, and notifies Calcite.
// Throws std::runtime_error for a user column named "rowid".
2388  TableDescriptor& td,
2389  const list<ColumnDescriptor>& cols,
2390  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2391  bool isLogicalTable) {
2392  cat_write_lock write_lock(this);
2393  list<ColumnDescriptor> cds = cols;
2394  list<DictDescriptor> dds;
2395  std::set<std::string> toplevel_column_names;
2396  list<ColumnDescriptor> columns;
2397 
// NOTE(review): the remainder of this condition and the nested check that guards the
// throw below are missing from this listing.
2398  if (!td.storageType.empty() &&
2401  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2402  }
2403  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2404  }
2405 
// Copy the user columns, rejecting the reserved name and expanding geometry columns
// into their physical columns right after the column they belong to.
2406  for (auto cd : cds) {
2407  if (cd.columnName == "rowid") {
2408  throw std::runtime_error(
2409  "Cannot create column with name rowid. rowid is a system defined column.");
2410  }
2411  columns.push_back(cd);
2412  toplevel_column_names.insert(cd.columnName);
2413  if (cd.columnType.is_geometry()) {
2414  expandGeoColumn(cd, columns);
2415  }
2416  }
// `cds` is reused below to collect the finalized descriptors (with ids assigned).
2417  cds.clear();
2418 
2419  ColumnDescriptor cd;
2420  // add row_id column -- Must be last column in the table
2421  cd.columnName = "rowid";
2422  cd.isSystemCol = true;
2423  cd.columnType = SQLTypeInfo(kBIGINT, true);
2424 #ifdef MATERIALIZED_ROWID
2425  cd.isVirtualCol = false;
2426 #else
2427  cd.isVirtualCol = true;
2428  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2429 #endif
2430  columns.push_back(cd);
2431  toplevel_column_names.insert(cd.columnName);
2432 
// Note that "$deleted$" is appended after "rowid" and is not added to
// toplevel_column_names.
2433  if (td.hasDeletedCol) {
2434  ColumnDescriptor cd_del;
2435  cd_del.columnName = "$deleted$";
2436  cd_del.isSystemCol = true;
2437  cd_del.isVirtualCol = false;
2438  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2439  cd_del.isDeletedCol = true;
2440 
2441  columns.push_back(cd_del);
2442  }
2443 
2444  td.nColumns = columns.size();
// NOTE(review): lines are missing from the listing around the statements below,
// including the one that opens the branch closed by "} else { // Temporary table".
2446  sqliteConnector_.query("BEGIN TRANSACTION");
2448  try {
// NOTE(review): the line that begins the call taking the INSERT statement below, and
// many of its bound values, are missing from this listing.
2450  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2451  std::vector<std::string>{td.tableName,
2452  std::to_string(td.userId),
2454  std::to_string(td.isView),
2455  "",
2460  std::to_string(td.maxRows),
2461  td.partitions,
2463  std::to_string(td.shard),
2464  std::to_string(td.nShards),
2466  td.storageType,
2469  td.keyMetainfo});
2470 
2471  // now get the auto generated tableid
// NOTE(review): the line that begins the call taking the SELECT statement below is
// missing from this listing.
2473  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2474  td.tableId = sqliteConnector_.getData<int>(0, 0);
// Column ids are assigned sequentially starting at 1, in `columns` order.
2475  int colId = 1;
2476  for (auto cd : columns) {
// NOTE(review): the line opening the block closed at listing line 2490 is missing.
2478  const bool is_foreign_col =
2479  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2480  if (!is_foreign_col) {
2481  // Ideally we would like to not persist string dictionaries for system tables,
2482  // since system table content can be highly dynamic and string dictionaries
2483  // are not currently vacuumed. However, in distributed this causes issues
2484  // when the cluster is out of sync (when the agg resets but leaves persist) so
2485  // for the sake of testing we need to leave this as normal dictionaries until
2486  // we solve the distributed issue.
2487  auto use_temp_dictionary = false; // td.is_system_table;
2488  setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
2489  }
2490  }
2491 
// Only top-level, non-geo-physical columns are recorded in columnIdBySpi_.
2492  if (toplevel_column_names.count(cd.columnName)) {
2493  if (!cd.isGeoPhyCol) {
2494  td.columnIdBySpi_.push_back(colId);
2495  }
2496  }
2497 
2498  using BindType = SqliteConnector::BindType;
// All 17 bound parameters are TEXT, except the last one (default_value), which is bound
// as NULL when the column has no default.
2499  std::vector<BindType> types(17, BindType::TEXT);
2500  if (!cd.default_value.has_value()) {
2501  types[16] = BindType::NULL_TYPE;
2502  }
// NOTE(review): the line that begins the call taking the INSERT statement below, and
// most of its bound values, are missing from this listing.
2504  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2505  "coldim, colscale, is_notnull, "
2506  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2507  "virtual_expr, is_deletedcol, default_value) "
2508  "VALUES (?, ?, ?, ?, ?, "
2509  "?, "
2510  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2511  std::vector<std::string>{std::to_string(td.tableId),
2512  std::to_string(colId),
2513  cd.columnName,
2522  "",
2525  cd.virtualExpr,
2527  cd.default_value.value_or("NULL")},
2528  types);
2529  cd.tableId = td.tableId;
2530  cd.columnId = colId++;
2531  cds.push_back(cd);
2532  }
2533  if (td.isView) {
// NOTE(review): the line that begins the call taking the INSERT statement below is
// missing from this listing.
2535  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2536  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2537  }
// NOTE(review): the line opening the block for foreign tables (closed at listing line
// 2549) is missing; the dynamic_cast to a reference throws std::bad_cast if `td` is not
// a ForeignTable.
2539  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2540  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
// NOTE(review): the line that begins the call taking the INSERT statement below is
// missing from this listing.
2542  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2543  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2544  std::vector<std::string>{std::to_string(foreign_table.tableId),
2545  std::to_string(foreign_table.foreign_server->id),
2546  foreign_table.getOptionsAsJsonString(),
2547  std::to_string(foreign_table.last_refresh_time),
2548  std::to_string(foreign_table.next_refresh_time)});
2549  }
// Any failure while writing the sqlite rows rolls the transaction back and rethrows.
2550  } catch (std::exception& e) {
2551  sqliteConnector_.query("ROLLBACK TRANSACTION");
2552  throw;
2553  }
2554  } else { // Temporary table
// Temporary tables take their id from an in-memory counter instead of sqlite.
2555  td.tableId = nextTempTableId_++;
2556  int colId = 1;
2557  for (auto cd : columns) {
// NOTE(review): the line opening the block closed at listing line 2581 is missing.
2559  const bool is_foreign_col =
2560  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2561 
2562  if (!is_foreign_col) {
2563  // Create a new temporary dictionary
2564  std::string fileName("");
2565  std::string folderPath("");
// NOTE(review): the declaration of `dict_ref` and one DictDescriptor constructor
// argument are missing from this listing.
2567  nextTempDictId_++;
2568  DictDescriptor dd(dict_ref,
2569  fileName,
2571  false,
2572  1,
2573  folderPath,
2574  true); // Is dictName (2nd argument) used?
2575  dds.push_back(dd);
// NOTE(review): the statement inside this `if` is missing from this listing.
2576  if (!cd.columnType.is_array()) {
2578  }
2579  cd.columnType.set_comp_param(dict_ref.dictId);
2580  }
2581  }
2582  if (toplevel_column_names.count(cd.columnName)) {
2583  if (!cd.isGeoPhyCol) {
2584  td.columnIdBySpi_.push_back(colId);
2585  }
2586  }
2587  cd.tableId = td.tableId;
2588  cd.columnId = colId++;
2589  cds.push_back(cd);
2590  }
2591 
// NOTE(review): the line opening the block that guards the JSON serialization below is
// missing from this listing.
2593  serializeTableJsonUnlocked(&td, cds);
2594  }
2595  }
2596 
2597  try {
// A new table must not collide with metadata already present in the disk cache.
2598  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2599  if (cache) {
2600  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2601  << "Disk cache at " + cache->getCacheDirectory()
2602  << " contains preexisting data for new table. Please "
2603  "delete or clear cache before continuing";
2604  }
2605 
2606  addTableToMap(&td, cds, dds);
2607  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2608  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2609  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2610  }
// On failure, undo both the sqlite transaction and the in-memory registration.
2611  } catch (std::exception& e) {
2612  sqliteConnector_.query("ROLLBACK TRANSACTION");
2613  removeTableFromMap(td.tableName, td.tableId, true);
2614  throw;
2615  }
// NOTE(review): as listed, END TRANSACTION is issued on both paths, although BEGIN
// TRANSACTION is only visible on the persisted path — confirm against the original
// source.
2616  sqliteConnector_.query("END TRANSACTION");
2617 
// Both locks are released before getMetadataForTable() is called for non-foreign tables.
2618  if (td.storageType != StorageType::FOREIGN_TABLE) {
2619  write_lock.unlock();
2620  sqlite_lock.unlock();
2621  getMetadataForTable(td.tableName,
2622  true); // cause instantiateFragmenter() to be called
2623  }
2624 }
2625 
2626 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2627  const std::list<ColumnDescriptor>& cds) const {
2628  // relies on the catalog write lock
2629  using namespace rapidjson;
2630 
2631  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2632 
2633  const auto db_name = currentDB_.dbName;
2634  const auto file_path = table_json_filepath(basePath_, db_name);
2635 
2636  Document d;
2637  if (boost::filesystem::exists(file_path)) {
2638  // look for an existing file for this database
2639  std::ifstream reader(file_path.string());
2640  CHECK(reader.is_open());
2641  IStreamWrapper json_read_wrapper(reader);
2642  d.ParseStream(json_read_wrapper);
2643  } else {
2644  d.SetObject();
2645  }
2646  CHECK(d.IsObject());
2647  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2648 
2649  Value table(kObjectType);
2650  table.AddMember(
2651  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2652  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2653  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2654 
2655  for (const auto& cd : cds) {
2656  Value column(kObjectType);
2657  column.AddMember(
2658  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2659  column.AddMember("coltype",
2660  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2661  d.GetAllocator());
2662  column.AddMember("colsubtype",
2663  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2664  d.GetAllocator());
2665  column.AddMember(
2666  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2667  column.AddMember(
2668  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2669  column.AddMember(
2670  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2671  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2672  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2673  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2674  table["columns"].PushBack(column, d.GetAllocator());
2675  }
2676  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2677 
2678  // Overwrite the existing file
2679  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2680  CHECK(writer.is_open());
2681  OStreamWrapper json_wrapper(writer);
2682 
2683  Writer<OStreamWrapper> json_writer(json_wrapper);
2684  d.Accept(json_writer);
2685  writer.close();
2686 }
2687 
2688 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2689  // relies on the catalog write lock
2690  using namespace rapidjson;
2691 
2692  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2693 
2694  const auto db_name = currentDB_.dbName;
2695  const auto file_path = table_json_filepath(basePath_, db_name);
2696 
2697  CHECK(boost::filesystem::exists(file_path));
2698  Document d;
2699 
2700  std::ifstream reader(file_path.string());
2701  CHECK(reader.is_open());
2702  IStreamWrapper json_read_wrapper(reader);
2703  d.ParseStream(json_read_wrapper);
2704 
2705  CHECK(d.IsObject());
2706  auto table_name_ref = StringRef(table_name.c_str());
2707  CHECK(d.HasMember(table_name_ref));
2708  CHECK(d.RemoveMember(table_name_ref));
2709 
2710  // Overwrite the existing file
2711  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2712  CHECK(writer.is_open());
2713  OStreamWrapper json_wrapper(writer);
2714 
2715  Writer<OStreamWrapper> json_writer(json_wrapper);
2716  d.Accept(json_writer);
2717  writer.close();
2718 }
2719 
2720 void Catalog::createForeignServer(
2721  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2722  bool if_not_exists) {
2723  cat_write_lock write_lock(this);
2724  cat_sqlite_lock sqlite_lock(getObjForLock());
2725  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
2726 }
2727 
2728 void Catalog::createForeignServerNoLocks(
2729  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2730  bool if_not_exists) {
2731  const auto& name = foreign_server->name;
2732 
2733  sqliteConnector_.query_with_text_params(
2734  "SELECT name from omnisci_foreign_servers where name = ?",
2735  std::vector<std::string>{name});
2736 
2737  if (sqliteConnector_.getNumRows() == 0) {
2738  foreign_server->creation_time = std::time(nullptr);
2739  sqliteConnector_.query_with_text_params(
2740  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
2741  "creation_time, "
2742  "options) "
2743  "VALUES (?, ?, ?, ?, ?)",
2744  std::vector<std::string>{name,
2745  foreign_server->data_wrapper_type,
2746  std::to_string(foreign_server->user_id),
2747  std::to_string(foreign_server->creation_time),
2748  foreign_server->getOptionsAsJsonString()});
2749  sqliteConnector_.query_with_text_params(
2750  "SELECT id from omnisci_foreign_servers where name = ?",
2751  std::vector<std::string>{name});
2752  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
2753  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
2754  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
2755  std::move(foreign_server);
2756  CHECK(foreignServerMap_.find(name) == foreignServerMap_.end())
2757  << "Attempting to insert a foreign server into foreign server map that already "
2758  "exists.";
2759  foreignServerMap_[name] = foreign_server_shared;
2760  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
2761  } else if (!if_not_exists) {
2762  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
2763  "\" already exists."};
2764  }
2765 
2766  const auto& server_it = foreignServerMap_.find(name);
2767  CHECK(server_it != foreignServerMap_.end());
2768  CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
2769 }
2770 
2771 const foreign_storage::ForeignServer* Catalog::getForeignServer(
2772  const std::string& server_name) const {
2773  foreign_storage::ForeignServer* foreign_server = nullptr;
2774  cat_read_lock read_lock(this);
2775 
2776  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
2777  foreign_server = foreignServerMap_.find(server_name)->second.get();
2778  }
2779  return foreign_server;
2780 }
2781 
2782 const std::unique_ptr<const foreign_storage::ForeignServer>
2783 Catalog::getForeignServerFromStorage(const std::string& server_name) {
2784  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
2785  cat_sqlite_lock sqlite_lock(getObjForLock());
2786  sqliteConnector_.query_with_text_params(
2787  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
2788  "FROM omnisci_foreign_servers WHERE name = ?",
2789  std::vector<std::string>{server_name});
2790  if (sqliteConnector_.getNumRows() > 0) {
2791  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
2792  sqliteConnector_.getData<int>(0, 0),
2793  sqliteConnector_.getData<std::string>(0, 1),
2794  sqliteConnector_.getData<std::string>(0, 2),
2795  sqliteConnector_.getData<std::string>(0, 3),
2796  sqliteConnector_.getData<std::int32_t>(0, 4),
2797  sqliteConnector_.getData<std::int32_t>(0, 5));
2798  }
2799  return foreign_server;
2800 }
2801 
2802 const std::unique_ptr<const foreign_storage::ForeignTable>
2803 Catalog::getForeignTableFromStorage(int table_id) {
2804  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
2805  cat_sqlite_lock sqlite_lock(getObjForLock());
2806  sqliteConnector_.query_with_text_params(
2807  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
2808  "omnisci_foreign_tables WHERE table_id = ?",
2809  std::vector<std::string>{to_string(table_id)});
2810  auto num_rows = sqliteConnector_.getNumRows();
2811  if (num_rows > 0) {
2812  CHECK_EQ(size_t(1), num_rows);
2813  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
2814  sqliteConnector_.getData<int>(0, 0),
2815  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
2816  sqliteConnector_.getData<std::string>(0, 2),
2817  sqliteConnector_.getData<int>(0, 3),
2818  sqliteConnector_.getData<int>(0, 4));
2819  }
2820  return foreign_table;
2821 }
2822 
2823 void Catalog::changeForeignServerOwner(const std::string& server_name,
2824  const int new_owner_id) {
2825  cat_write_lock write_lock(this);
2826  foreign_storage::ForeignServer* foreign_server =
2827  foreignServerMap_.find(server_name)->second.get();
2828  CHECK(foreign_server);
2829  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
2830  // update in-memory server
2831  foreign_server->user_id = new_owner_id;
2832 }
2833 
2834 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
2835  const std::string& data_wrapper) {
2836  cat_write_lock write_lock(this);
2837  auto data_wrapper_type = to_upper(data_wrapper);
2838  // update in-memory server
2839  foreign_storage::ForeignServer* foreign_server =
2840  foreignServerMap_.find(server_name)->second.get();
2841  CHECK(foreign_server);
2842  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
2843  foreign_server->data_wrapper_type = data_wrapper_type;
2844  try {
2845  foreign_server->validate();
2846  } catch (const std::exception& e) {
2847  // validation did not succeed:
2848  // revert to saved data_wrapper_type & throw exception
2849  foreign_server->data_wrapper_type = saved_data_wrapper_type;
2850  throw;
2851  }
2852  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
2853 }
2854 
2855 void Catalog::setForeignServerOptions(const std::string& server_name,
2856  const std::string& options) {
2857  cat_write_lock write_lock(this);
2858  // update in-memory server
2859  foreign_storage::ForeignServer* foreign_server =
2860  foreignServerMap_.find(server_name)->second.get();
2861  CHECK(foreign_server);
2862  auto saved_options = foreign_server->options;
2863  foreign_server->populateOptionsMap(options, true);
2864  try {
2865  foreign_server->validate();
2866  } catch (const std::exception& e) {
2867  // validation did not succeed:
2868  // revert to saved options & throw exception
2869  foreign_server->options = saved_options;
2870  throw;
2871  }
2872  setForeignServerProperty(server_name, "options", options);
2873 }
2874 
2875 void Catalog::renameForeignServer(const std::string& server_name,
2876  const std::string& name) {
2877  cat_write_lock write_lock(this);
2878  auto foreign_server_it = foreignServerMap_.find(server_name);
2879  CHECK(foreign_server_it != foreignServerMap_.end());
2880  setForeignServerProperty(server_name, "name", name);
2881  auto foreign_server_shared = foreign_server_it->second;
2882  foreign_server_shared->name = name;
2883  foreignServerMap_[name] = foreign_server_shared;
2884  foreignServerMap_.erase(foreign_server_it);
2885 }
2886 
2887 void Catalog::dropForeignServer(const std::string& server_name) {
2888  cat_write_lock write_lock(this);
2889  cat_sqlite_lock sqlite_lock(getObjForLock());
2890 
2891  sqliteConnector_.query_with_text_params(
2892  "SELECT id from omnisci_foreign_servers where name = ?",
2893  std::vector<std::string>{server_name});
2894  auto num_rows = sqliteConnector_.getNumRows();
2895  if (num_rows > 0) {
2896  CHECK_EQ(size_t(1), num_rows);
2897  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
2898  sqliteConnector_.query_with_text_param(
2899  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
2900  std::to_string(server_id));
2901  if (sqliteConnector_.getNumRows() > 0) {
2902  throw std::runtime_error{"Foreign server \"" + server_name +
2903  "\" is referenced "
2904  "by existing foreign tables and cannot be dropped."};
2905  }
2906  sqliteConnector_.query("BEGIN TRANSACTION");
2907  try {
2908  sqliteConnector_.query_with_text_params(
2909  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
2910  std::vector<std::string>{server_name});
2911  } catch (const std::exception& e) {
2912  sqliteConnector_.query("ROLLBACK TRANSACTION");
2913  throw;
2914  }
2915  sqliteConnector_.query("END TRANSACTION");
2916  foreignServerMap_.erase(server_name);
2917  foreignServerMapById_.erase(server_id);
2918  }
2919 }
2920 
2921 void Catalog::getForeignServersForUser(
2922  const rapidjson::Value* filters,
2923  const UserMetadata& user,
2924  std::vector<const foreign_storage::ForeignServer*>& results) {
2925  sys_read_lock syscat_read_lock(&SysCatalog::instance());
2926  cat_read_lock read_lock(this);
2927  cat_sqlite_lock sqlite_lock(getObjForLock());
2928  // Customer facing and internal SQlite names
2929  std::map<std::string, std::string> col_names{{"server_name", "name"},
2930  {"data_wrapper", "data_wrapper_type"},
2931  {"created_at", "creation_time"},
2932  {"options", "options"}};
2933 
2934  // TODO add "owner" when FSI privilege is implemented
2935  std::stringstream filter_string;
2936  std::vector<std::string> arguments;
2937 
2938  if (filters != nullptr) {
2939  // Create SQL WHERE clause for SQLite query
2940  int num_filters = 0;
2941  filter_string << " WHERE";
2942  for (auto& filter_def : filters->GetArray()) {
2943  if (num_filters > 0) {
2944  filter_string << " " << std::string(filter_def["chain"].GetString());
2945  ;
2946  }
2947 
2948  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
2949  col_names.end()) {
2950  throw std::runtime_error{"Attribute with name \"" +
2951  std::string(filter_def["attribute"].GetString()) +
2952  "\" does not exist."};
2953  }
2954 
2955  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
2956 
2957  bool equals_operator = false;
2958  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
2959  filter_string << " = ? ";
2960  equals_operator = true;
2961  } else {
2962  filter_string << " LIKE ? ";
2963  }
2964 
2965  bool timestamp_column =
2966  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
2967 
2968  if (timestamp_column && !equals_operator) {
2969  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
2970  }
2971 
2972  if (timestamp_column && equals_operator) {
2973  arguments.push_back(std::to_string(
2974  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
2975  } else {
2976  arguments.emplace_back(filter_def["value"].GetString());
2977  }
2978 
2979  num_filters++;
2980  }
2981  }
2982  // Create select query for the omnisci_foreign_servers table
2983  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
2984  query += filter_string.str();
2985 
2986  sqliteConnector_.query_with_text_params(query, arguments);
2987  auto num_rows = sqliteConnector_.getNumRows();
2988 
2989  if (sqliteConnector_.getNumRows() == 0) {
2990  return;
2991  }
2992 
2993  CHECK(sqliteConnector_.getNumCols() == 1);
2994  // Return pointers to objects
2995  results.reserve(num_rows);
2996  for (size_t row = 0; row < num_rows; ++row) {
2997  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
2998  if (shared::contains(INTERNAL_SERVERS, server_name)) {
2999  continue;
3000  }
3001  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
3002  CHECK(foreign_server != nullptr);
3003 
3004  DBObject dbObject(foreign_server->name, ServerDBObjectType);
3005  dbObject.loadKey(*this);
3006  std::vector<DBObject> privObjects = {dbObject};
3007  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3008  // skip server, as there are no privileges to access it
3009  continue;
3010  }
3011  results.push_back(foreign_server);
3012  }
3013 }
3014 
3015 // returns the table epoch or -1 if there is something wrong with the shared epoch
3016 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
3017  cat_read_lock read_lock(this);
3018  const auto td = getMetadataForTable(table_id, false);
3019  if (!td) {
3020  std::stringstream table_not_found_error_message;
3021  table_not_found_error_message << "Table (" << db_id << "," << table_id
3022  << ") not found";
3023  throw std::runtime_error(table_not_found_error_message.str());
3024  }
3025  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3026  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3027  // check all shards have same checkpoint
3028  const auto physicalTables = physicalTableIt->second;
3029  CHECK(!physicalTables.empty());
3030  size_t curr_epoch{0}, first_epoch{0};
3031  int32_t first_table_id{0};
3032  bool are_epochs_inconsistent{false};
3033  for (size_t i = 0; i < physicalTables.size(); i++) {
3034  int32_t physical_tb_id = physicalTables[i];
3035  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3036  CHECK(phys_td);
3037 
3038  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3039  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3040  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
3041  if (i == 0) {
3042  first_epoch = curr_epoch;
3043  first_table_id = physical_tb_id;
3044  } else if (first_epoch != curr_epoch) {
3045  are_epochs_inconsistent = true;
3046  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
3047  << ", db id: " << db_id
3048  << ". First table (table id: " << first_table_id
3049  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
3050  << ", has inconsistent epoch: " << curr_epoch
3051  << ". See previous INFO logs for all epochs and their table ids.";
3052  }
3053  }
3054  if (are_epochs_inconsistent) {
3055  // oh dear the shards do not agree on the epoch for this table
3056  return -1;
3057  }
3058  return curr_epoch;
3059  } else {
3060  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3061  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3062  << ", epoch: " << epoch;
3063  return epoch;
3064  }
3065 }
3066 
3067 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
3068  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
3069  << " back to new epoch " << new_epoch;
3070  const auto td = getMetadataForTable(table_id, false);
3071  if (!td) {
3072  std::stringstream table_not_found_error_message;
3073  table_not_found_error_message << "Table (" << db_id << "," << table_id
3074  << ") not found";
3075  throw std::runtime_error(table_not_found_error_message.str());
3076  }
3077  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3078  std::stringstream is_temp_table_error_message;
3079  is_temp_table_error_message << "Cannot set epoch on temporary table";
3080  throw std::runtime_error(is_temp_table_error_message.str());
3081  }
3082 
3083  File_Namespace::FileMgrParams file_mgr_params;
3084  file_mgr_params.epoch = new_epoch;
3085  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3086 
3087  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3088  CHECK(!physical_tables.empty());
3089  for (const auto table : physical_tables) {
3090  auto table_id = table->tableId;
3091  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
3092  << " back to new epoch " << new_epoch;
3093  // Should have table lock from caller so safe to do this after, avoids
3094  // having to repopulate data on error
3095  removeChunks(table_id);
3096  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3097  }
3098 }
3099 
3100 void Catalog::alterPhysicalTableMetadata(
3101  const TableDescriptor* td,
3102  const TableDescriptorUpdateParams& table_update_params) {
3103  // Only called from parent alterTableParamMetadata, expect already to have catalog and
3104  // sqlite write locks
3105 
3106  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
3107 
3108  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
3109  CHECK(mutable_td);
3110  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
3111  sqliteConnector_.query_with_text_params(
3112  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3113  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
3114  std::to_string(td->tableId)});
3115  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
3116  }
3117 
3118  if (td->maxRows != table_update_params.max_rows) {
3119  sqliteConnector_.query_with_text_params(
3120  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3121  std::vector<std::string>{std::to_string(table_update_params.max_rows),
3122  std::to_string(td->tableId)});
3123  mutable_td->maxRows = table_update_params.max_rows;
3124  }
3125 }
3126 
3127 void Catalog::alterTableMetadata(const TableDescriptor* td,
3128  const TableDescriptorUpdateParams& table_update_params) {
3129  cat_write_lock write_lock(this);
3130  cat_sqlite_lock sqlite_lock(getObjForLock());
3131  sqliteConnector_.query("BEGIN TRANSACTION");
3132  try {
3133  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3134  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3135  const auto physical_tables = physical_table_it->second;
3136  CHECK(!physical_tables.empty());
3137  for (size_t i = 0; i < physical_tables.size(); i++) {
3138  int32_t physical_tb_id = physical_tables[i];
3139  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3140  CHECK(phys_td);
3141  alterPhysicalTableMetadata(phys_td, table_update_params);
3142  }
3143  }
3144  alterPhysicalTableMetadata(td, table_update_params);
3145  } catch (std::exception& e) {
3146  sqliteConnector_.query("ROLLBACK TRANSACTION");
3147  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3148  }
3149  sqliteConnector_.query("END TRANSACTION");
3150 }
3151 
3152 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3153  const int32_t max_rollback_epochs) {
3154  // Must be called from AlterTableParamStmt or other method that takes executor and
3155  // TableSchema locks
3156  if (max_rollback_epochs <= -1) {
3157  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3158  }
3159  const auto td = getMetadataForTable(
3160  table_id, false); // Deep copy as there will be gap between read and write locks
3161  CHECK(td); // Existence should have already been checked in
3162  // ParserNode::AlterTableParmStmt
3163  TableDescriptorUpdateParams table_update_params(td);
3164  table_update_params.max_rollback_epochs = max_rollback_epochs;
3165  if (table_update_params == td) { // Operator is overloaded to test for equality
3166  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3167  << " to existing value, skipping operation";
3168  return;
3169  }
3170  File_Namespace::FileMgrParams file_mgr_params;
3171  file_mgr_params.epoch = -1; // Use existing epoch
3172  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3173  setTableFileMgrParams(table_id, file_mgr_params);
3174  alterTableMetadata(td, table_update_params);
3175 }
3176 
3177 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3178  if (max_rows < 0) {
3179  throw std::runtime_error("Max rows cannot be a negative number.");
3180  }
3181  const auto td = getMetadataForTable(table_id);
3182  CHECK(td);
3183  TableDescriptorUpdateParams table_update_params(td);
3184  table_update_params.max_rows = max_rows;
3185  if (table_update_params == td) {
3186  LOG(INFO) << "Max rows value of " << max_rows
3187  << " is the same as the existing value. Skipping update.";
3188  return;
3189  }
3190  alterTableMetadata(td, table_update_params);
3191  CHECK(td->fragmenter);
3192  td->fragmenter->dropFragmentsToSize(max_rows);
3193 }
3194 
3195 // For testing purposes only
3196 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3197  cat_write_lock write_lock(this);
3198  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3199  CHECK(td_entry != tableDescriptorMap_.end());
3200  auto td = td_entry->second;
3201 
3202  std::vector<int> table_key{getCurrentDB().dbId, td->tableId};
3203  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3204 
3205  TableDescriptorUpdateParams table_update_params(td);
3206  table_update_params.max_rollback_epochs = -1;
3207  write_lock.unlock();
3208 
3209  alterTableMetadata(td, table_update_params);
3210  File_Namespace::FileMgrParams file_mgr_params;
3211  file_mgr_params.max_rollback_epochs = -1;
3212  setTableFileMgrParams(td->tableId, file_mgr_params);
3213 }
3214 
3215 void Catalog::setTableFileMgrParams(
3216  const int table_id,
3217  const File_Namespace::FileMgrParams& file_mgr_params) {
3218  // Expects parent to have write lock
3219  const auto td = getMetadataForTable(table_id, false);
3220  const auto db_id = this->getDatabaseId();
3221  if (!td) {
3222  std::stringstream table_not_found_error_message;
3223  table_not_found_error_message << "Table (" << db_id << "," << table_id
3224  << ") not found";
3225  throw std::runtime_error(table_not_found_error_message.str());
3226  }
3227  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3228  std::stringstream is_temp_table_error_message;
3229  is_temp_table_error_message << "Cannot set storage params on temporary table";
3230  throw std::runtime_error(is_temp_table_error_message.str());
3231  }
3232 
3233  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3234  CHECK(!physical_tables.empty());
3235  for (const auto table : physical_tables) {
3236  auto table_id = table->tableId;
3237  removeChunks(table_id);
3238  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3239  }
3240 }
3241 
3242 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3243  const int32_t table_id) const {
3244  cat_read_lock read_lock(this);
3245  std::vector<TableEpochInfo> table_epochs;
3246  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3247  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3248  const auto physical_tables = physical_table_it->second;
3249  CHECK(!physical_tables.empty());
3250 
3251  for (const auto physical_tb_id : physical_tables) {
3252  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3253  CHECK(phys_td);
3254 
3255  auto table_id = phys_td->tableId;
3256  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3257  table_epochs.emplace_back(table_id, epoch);
3258  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3259  << ", table id: " << table_id << ", epoch: " << epoch;
3260  }
3261  } else {
3262  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3263  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3264  << ", epoch: " << epoch;
3265  table_epochs.emplace_back(table_id, epoch);
3266  }
3267  return table_epochs;
3268 }
3269 
3270 void Catalog::setTableEpochs(const int32_t db_id,
3271  const std::vector<TableEpochInfo>& table_epochs) const {
3272  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3273  CHECK(td);
3274  File_Namespace::FileMgrParams file_mgr_params;
3275  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3276 
3277  for (const auto& table_epoch_info : table_epochs) {
3278  removeChunks(table_epoch_info.table_id);
3279  file_mgr_params.epoch = table_epoch_info.table_epoch;
3280  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3281  db_id, table_epoch_info.table_id, file_mgr_params);
3282  LOG(INFO) << "Set table epoch for db id: " << db_id
3283  << ", table id: " << table_epoch_info.table_id
3284  << ", back to epoch: " << table_epoch_info.table_epoch;
3285  }
3286 }
3287 
3288 namespace {
3289 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3290  std::string table_epochs_str{"["};
3291  bool first_entry{true};
3292  for (const auto& table_epoch : table_epochs) {
3293  if (first_entry) {
3294  first_entry = false;
3295  } else {
3296  table_epochs_str += ", ";
3297  }
3298  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3299  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3300  }
3301  table_epochs_str += "]";
3302  return table_epochs_str;
3303 }
3304 } // namespace
3305 
3306 void Catalog::setTableEpochsLogExceptions(
3307  const int32_t db_id,
3308  const std::vector<TableEpochInfo>& table_epochs) const {
3309  try {
3310  setTableEpochs(db_id, table_epochs);
3311  } catch (std::exception& e) {
3312  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3313  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3314  << ", Error: " << e.what();
3315  }
3316 }
3317 
3318 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3319  cat_read_lock read_lock(this);
3320  const auto it = deletedColumnPerTable_.find(td);
3321  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3322 }
3323 
3324 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3325  int delete_column_id) const {
3326  // check if there are rows deleted by examining the deletedColumn metadata
3327  CHECK(td);
3328  auto fragmenter = td->fragmenter;
3329  if (fragmenter) {
3330  return fragmenter->hasDeletedRows(delete_column_id);
3331  } else {
3332  return false;
3333  }
3334 }
3335 
3336 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3337  const TableDescriptor* td) const {
3338  std::vector<const TableDescriptor*> tds;
3339  const ColumnDescriptor* cd;
3340  {
3341  cat_read_lock read_lock(this);
3342 
3343  const auto it = deletedColumnPerTable_.find(td);
3344  // if not a table that supports delete return nullptr, nothing more to do
3345  if (it == deletedColumnPerTable_.end()) {
3346  return nullptr;
3347  }
3348  cd = it->second;
3349  tds = getPhysicalTablesDescriptors(td, false);
3350  }
3351  // individual tables are still protected by higher level locks
3352  for (auto tdd : tds) {
3353  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3354  return cd;
3355  }
3356  }
3357  // no deletes so far recorded in metadata
3358  return nullptr;
3359 }
3360 
3361 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3362  cat_write_lock write_lock(this);
3363  setDeletedColumnUnlocked(td, cd);
3364 }
3365 
3366 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3367  const ColumnDescriptor* cd) {
3368  cat_write_lock write_lock(this);
3369  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3370  CHECK(it_ok.second);
3371 }
3372 
3373 namespace {
3374 
3376  const Catalog& cat,
3377  const Parser::SharedDictionaryDef& shared_dict_def) {
3378  const auto& table_name = shared_dict_def.get_foreign_table();
3379  const auto td = cat.getMetadataForTable(table_name, false);
3380  CHECK(td);
3381  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3382  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3383 }
3384 
3385 } // namespace
3386 
3387 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3388  Parser::SharedDictionaryDef shared_dict_def,
3389  const bool persist_reference) {
3390  cat_write_lock write_lock(this);
3391  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3392  CHECK(foreign_ref_col);
3393  referencing_column.columnType = foreign_ref_col->columnType;
3394  const int dict_id = referencing_column.columnType.get_comp_param();
3395  const DictRef dict_ref(currentDB_.dbId, dict_id);
3396  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3397  CHECK(dictIt != dictDescriptorMapByRef_.end());
3398  const auto& dd = dictIt->second;
3399  CHECK_GE(dd->refcount, 1);
3400  ++dd->refcount;
3401  if (persist_reference) {
3402  cat_sqlite_lock sqlite_lock(getObjForLock());
3403  sqliteConnector_.query_with_text_params(
3404  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3405  {std::to_string(dict_id)});
3406  }
3407 }
3408 
3409 bool Catalog::setColumnSharedDictionary(
3410  ColumnDescriptor& cd,
3411  std::list<ColumnDescriptor>& cdd,
3412  std::list<DictDescriptor>& dds,
3413  const TableDescriptor td,
3414  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3415  cat_write_lock write_lock(this);
3416  cat_sqlite_lock sqlite_lock(getObjForLock());
3417 
3418  if (shared_dict_defs.empty()) {
3419  return false;
3420  }
3421  for (const auto& shared_dict_def : shared_dict_defs) {
3422  // check if the current column is a referencing column
3423  const auto& column = shared_dict_def.get_column();
3424  if (cd.columnName == column) {
3425  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3426  // Dictionaries are being shared in table to be created
3427  const auto& ref_column = shared_dict_def.get_foreign_column();
3428  auto colIt =
3429  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3430  return !ref_column.compare(it.columnName);
3431  });
3432  CHECK(colIt != cdd.end());
3433  cd.columnType = colIt->columnType;
3434 
3435  const int dict_id = colIt->columnType.get_comp_param();
3436  CHECK_GE(dict_id, 1);
3437  auto dictIt = std::find_if(
3438  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3439  return it.dictRef.dbId == this->currentDB_.dbId &&
3440  it.dictRef.dictId == dict_id;
3441  });
3442  if (dictIt != dds.end()) {
3443  // There exists dictionary definition of a dictionary column
3444  CHECK_GE(dictIt->refcount, 1);
3445  ++dictIt->refcount;
3446  if (!table_is_temporary(&td)) {
3447  // Persist reference count
3448  sqliteConnector_.query_with_text_params(
3449  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3450  {std::to_string(dict_id)});
3451  }
3452  } else {
3453  // The dictionary is referencing a column which is referencing a column in
3454  // diffrent table
3455  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3456  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3457  }
3458  } else {
3459  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3460  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3461  if (table_is_temporary(foreign_td)) {
3462  if (!table_is_temporary(&td)) {
3463  throw std::runtime_error(
3464  "Only temporary tables can share dictionaries with other temporary "
3465  "tables.");
3466  }
3467  addReferenceToForeignDict(cd, shared_dict_def, false);
3468  } else {
3469  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3470  }
3471  }
3472  return true;
3473  }
3474  }
3475  return false;
3476 }
3477 
3478 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3479  std::list<DictDescriptor>& dds,
3480  const TableDescriptor& td,
3481  bool is_logical_table,
3482  bool use_temp_dictionary) {
3483  cat_write_lock write_lock(this);
3484 
3485  std::string dictName{"Initial_key"};
3486  int dictId{0};
3487  std::string folderPath;
3488  if (is_logical_table) {
3489  cat_sqlite_lock sqlite_lock(getObjForLock());
3490 
3491  sqliteConnector_.query_with_text_params(
3492  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3493  "?, 1)",
3494  std::vector<std::string>{
3495  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3496  sqliteConnector_.query_with_text_param(
3497  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3498  dictId = sqliteConnector_.getData<int>(0, 0);
3499  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3500  sqliteConnector_.query_with_text_param(
3501  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3502  folderPath = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
3503  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
3504  }
3505  DictDescriptor dd(currentDB_.dbId,
3506  dictId,
3507  dictName,
3509  false,
3510  1,
3511  folderPath,
3512  use_temp_dictionary);
3513  dds.push_back(dd);
3514  if (!cd.columnType.is_array()) {
3516  }
3517  cd.columnType.set_comp_param(dictId);
3518 }
3519 
3520 void Catalog::createShardedTable(
3521  TableDescriptor& td,
3522  const list<ColumnDescriptor>& cols,
3523  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3524  /* create logical table */
3525  TableDescriptor* tdl = &td;
3526  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3527  int32_t logical_tb_id = tdl->tableId;
3528  std::string logical_table_name = tdl->tableName;
3529 
3530  /* create physical tables and link them to the logical table */
3531  std::vector<int32_t> physicalTables;
3532  for (int32_t i = 1; i <= td.nShards; i++) {
3533  TableDescriptor* tdp = &td;
3534  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3535  tdp->shard = i - 1;
3536  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3537  int32_t physical_tb_id = tdp->tableId;
3538 
3539  /* add physical table to the vector of physical tables */
3540  physicalTables.push_back(physical_tb_id);
3541  }
3542 
3543  if (!physicalTables.empty()) {
3544  cat_write_lock write_lock(this);
3545  /* add logical to physical tables correspondence to the map */
3546  const auto it_ok =
3547  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3548  CHECK(it_ok.second);
3549  /* update sqlite mapd_logical_to_physical in sqlite database */
3550  if (!table_is_temporary(&td)) {
3551  updateLogicalToPhysicalTableMap(logical_tb_id);
3552  }
3553  }
3554 }
3555 
3556 void Catalog::truncateTable(const TableDescriptor* td) {
3557  // truncate all corresponding physical tables
3558  const auto physical_tables = getPhysicalTablesDescriptors(td);
3559  for (const auto table : physical_tables) {
3560  doTruncateTable(table);
3561  }
3562 }
3563 
3564 void Catalog::doTruncateTable(const TableDescriptor* td) {
3565  // must destroy fragmenter before deleteChunks is called.
3566  removeFragmenterForTable(td->tableId);
3567 
3568  const int tableId = td->tableId;
3569  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3570  // assuming deleteChunksWithPrefix is atomic
3571  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3572  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3573 
3574  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3575 
3576  cat_write_lock write_lock(this);
3577  std::unique_ptr<StringDictionaryClient> client;
3578  if (SysCatalog::instance().isAggregator()) {
3579  CHECK(!string_dict_hosts_.empty());
3580  DictRef dict_ref(currentDB_.dbId, -1);
3581  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3582  }
3583  // clean up any dictionaries
3584  // delete all column descriptors for the table
3585  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3586  auto cd = columnDescriptor.second;
3587  if (cd->tableId != td->tableId) {
3588  continue;
3589  }
3590  const int dict_id = cd->columnType.get_comp_param();
3591  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3592  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3593  const DictRef dict_ref(currentDB_.dbId, dict_id);
3594  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3595  CHECK(dictIt != dictDescriptorMapByRef_.end());
3596  const auto& dd = dictIt->second;
3597  CHECK_GE(dd->refcount, 1);
3598  // if this is the only table using this dict reset the dict
3599  if (dd->refcount == 1) {
3600  // close the dictionary
3601  dd->stringDict.reset();
3602  File_Namespace::renameForDelete(dd->dictFolderPath);
3603  if (client) {
3604  client->drop(dd->dictRef);
3605  }
3606  if (!dd->dictIsTemp) {
3607  boost::filesystem::create_directory(dd->dictFolderPath);
3608  }
3609  }
3610 
3611  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3612  dd->dictName,
3613  dd->dictNBits,
3614  dd->dictIsShared,
3615  dd->refcount,
3616  dd->dictFolderPath,
3617  dd->dictIsTemp);
3618  dictDescriptorMapByRef_.erase(dictIt);
3619  // now create new Dict -- need to figure out what to do here for temp tables
3620  if (client) {
3621  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3622  }
3623  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3624  getMetadataForDict(new_dd->dictRef.dictId);
3625  }
3626  }
3627 }
3628 
3629 void Catalog::removeFragmenterForTable(const int table_id) const {
3630  cat_write_lock write_lock(this);
3631  auto td = getMetadataForTable(table_id, false);
3632  if (td->fragmenter != nullptr) {
3633  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3634  CHECK(tableDescIt != tableDescriptorMapById_.end());
3635  tableDescIt->second->fragmenter = nullptr;
3636  CHECK(td->fragmenter == nullptr);
3637  }
3638 }
3639 
3640 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3641 void Catalog::removeChunks(const int table_id) const {
3642  removeFragmenterForTable(table_id);
3643 
3644  // remove the chunks from in memory structures
3645  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3646 
3647  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3648  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3649 }
3650 
// Drops a table: collects the table and (for a logical table) all of its
// physical tables, erases the physical data of each, then deletes their
// catalog metadata in one call to deleteTableCatalogMetadata().
//
// @param td descriptor of the table to drop
3651 void Catalog::dropTable(const TableDescriptor* td) {
3652  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
// NOTE(review): the embedded listing numbers jump from 3652 to 3654, so the
// argument list that completes the call above is not visible in this extract;
// restore it from the upstream file.
3654  std::vector<const TableDescriptor*> tables_to_drop;
3655  {
// Descriptors are gathered under the catalog read lock, which is released at
// the end of this scope, before any data is erased.
3656  cat_read_lock read_lock(this);
3657  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3658  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3659  // remove all corresponding physical tables if this is a logical table
3660  const auto physicalTables = physicalTableIt->second;
3661  CHECK(!physicalTables.empty());
3662  for (size_t i = 0; i < physicalTables.size(); i++) {
3663  int32_t physical_tb_id = physicalTables[i];
3664  const TableDescriptor* phys_td =
3665  getMutableMetadataForTableUnlocked(physical_tb_id);
3666  CHECK(phys_td);
3667  tables_to_drop.emplace_back(phys_td);
3668  }
3669  }
// The table passed in is always appended last, after any physical tables.
3670  tables_to_drop.emplace_back(td);
3671  }
3672 
3673  for (auto table : tables_to_drop) {
3674  eraseTablePhysicalData(table);
3675  }
3676  deleteTableCatalogMetadata(td, tables_to_drop);
3677 }
3678 
3679 void Catalog::deleteTableCatalogMetadata(
3680  const TableDescriptor* logical_table,
3681  const std::vector<const TableDescriptor*>& physical_tables) {
3682  cat_write_lock write_lock(this);
3683  cat_sqlite_lock sqlite_lock(getObjForLock());
3684  sqliteConnector_.query("BEGIN TRANSACTION");
3685  try {
3686  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
3687  sqliteConnector_.query_with_text_param(
3688  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
3689  std::to_string(logical_table->tableId));
3690  logicalToPhysicalTableMapById_.erase(logical_table->tableId);
3691  for (auto table : physical_tables) {
3692  eraseTableMetadata(table);
3693  }
3694  } catch (std::exception& e) {
3695  sqliteConnector_.query("ROLLBACK TRANSACTION");
3696  throw;
3697  }
3698  sqliteConnector_.query("END TRANSACTION");
3699 }
3700 
// Removes the metadata of one table: runs the SQLite delete statements,
// notifies Calcite about the table and removes the table from the in-memory
// maps.
//
// @param td descriptor of the table whose metadata is erased
3701 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
3702  executeDropTableSqliteQueries(td);
// NOTE(review): listing line 3703 is missing from this extract. The closing
// brace on 3705 indicates it opened a conditional block around the JSON drop
// below; restore the condition from the upstream file.
3704  dropTableFromJsonUnlocked(td->tableName);
3705  }
3706  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3707  {
// Scoped so the injected timer covers only the map removal.
3708  INJECT_TIMER(removeTableFromMap_);
3709  removeTableFromMap(td->tableName, td->tableId);
3710  }
3711 }
3712 
// Issues the SQLite statements that remove one table from the catalog
// database: its mapd_tables row, the reference counts of the dictionaries its
// columns use, dictionaries whose refcount reached zero, its mapd_columns
// rows, and its view / foreign table rows where applicable.
// No transaction is opened here.
//
// @param td descriptor of the table being dropped
3713 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
3714  const int tableId = td->tableId;
3715  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
3716  std::to_string(tableId));
// Collect the dictionary ids (comp_param) of all dictionary-encoded columns.
3717  sqliteConnector_.query_with_text_params(
3718  "select comp_param from mapd_columns where compression = ? and tableid = ?",
3719  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3720  int numRows = sqliteConnector_.getNumRows();
// The ids are copied out first because the UPDATE queries below run on the
// same connector.
3721  std::vector<int> dict_id_list;
3722  for (int r = 0; r < numRows; ++r) {
3723  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
3724  }
3725  for (auto dict_id : dict_id_list) {
3726  sqliteConnector_.query_with_text_params(
3727  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
3728  std::vector<std::string>{std::to_string(dict_id)});
3729  }
// This delete selects from mapd_columns, so it has to run before the table's
// column rows are deleted further down.
3730  sqliteConnector_.query_with_text_params(
3731  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
3732  "mapd_columns where compression = ? "
3733  "and tableid = ?) and refcount = 0",
3734  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3735  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
3736  std::to_string(tableId));
3737  if (td->isView) {
3738  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
3739  std::to_string(tableId));
3740  }
// NOTE(review): listing line 3741 is missing from this extract. The closing
// brace on 3744 indicates it opened a conditional block around the foreign
// table delete below; restore the condition from the upstream file.
3742  sqliteConnector_.query_with_text_param(
3743  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
3744  }
3745 }
3746 
3747 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
3748  cat_write_lock write_lock(this);
3749  cat_sqlite_lock sqlite_lock(getObjForLock());
3750 
3751  sqliteConnector_.query("BEGIN TRANSACTION");
3752  try {
3753  sqliteConnector_.query_with_text_params(
3754  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3755  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
3756  } catch (std::exception& e) {
3757  sqliteConnector_.query("ROLLBACK TRANSACTION");
3758  throw;
3759  }
3760  sqliteConnector_.query("END TRANSACTION");
3761  TableDescriptorMap::iterator tableDescIt =
3762  tableDescriptorMap_.find(to_upper(td->tableName));
3763  CHECK(tableDescIt != tableDescriptorMap_.end());
3764  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3765  // Get table descriptor to change it
3766  TableDescriptor* changeTd = tableDescIt->second;
3767  changeTd->tableName = newTableName;
3768  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3769  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3770  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3771 }
3772 
3773 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
3774  {
3775  cat_write_lock write_lock(this);
3776  cat_sqlite_lock sqlite_lock(getObjForLock());
3777  // rename all corresponding physical tables if this is a logical table
3778  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3779  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3780  const auto physicalTables = physicalTableIt->second;
3781  CHECK(!physicalTables.empty());
3782  for (size_t i = 0; i < physicalTables.size(); i++) {
3783  int32_t physical_tb_id = physicalTables[i];
3784  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3785  CHECK(phys_td);
3786  std::string newPhysTableName =
3787  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
3788  renamePhysicalTable(phys_td, newPhysTableName);
3789  }
3790  }
3791  renamePhysicalTable(td, newTableName);
3792  }
3793  {
3794  DBObject object(newTableName, TableDBObjectType);
3795  // update table name in direct and effective priv map
3796  DBObjectKey key;
3797  key.dbId = currentDB_.dbId;
3798  key.objectId = td->tableId;
3799  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3800  object.setObjectKey(key);
3801  auto objdescs = SysCatalog::instance().getMetadataForObject(
3802  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
3803  for (auto obj : objdescs) {
3804  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3805  if (grnt) {
3806  grnt->renameDbObject(object);
3807  }
3808  }
3809  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3810  }
3811 }
3812 
3813 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
3814  std::vector<int>& tableIds) {
3815  cat_write_lock write_lock(this);
3816  cat_sqlite_lock sqlite_lock(getObjForLock());
3817 
3818  // execute the SQL query
3819  try {
3820  for (size_t i = 0; i < names.size(); i++) {
3821  int tableId = tableIds[i];
3822  std::string& newTableName = names[i].second;
3823 
3824  sqliteConnector_.query_with_text_params(
3825  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3826  std::vector<std::string>{newTableName, std::to_string(tableId)});
3827  }
3828  } catch (std::exception& e) {
3829  sqliteConnector_.query("ROLLBACK TRANSACTION");
3830  throw;
3831  }
3832 
3833  // reset the table descriptors, give Calcite a kick
3834  for (size_t i = 0; i < names.size(); i++) {
3835  std::string& curTableName = names[i].first;
3836  std::string& newTableName = names[i].second;
3837 
3838  TableDescriptorMap::iterator tableDescIt =
3839  tableDescriptorMap_.find(to_upper(curTableName));
3840  CHECK(tableDescIt != tableDescriptorMap_.end());
3841  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3842 
3843  // Get table descriptor to change it
3844  TableDescriptor* changeTd = tableDescIt->second;
3845  changeTd->tableName = newTableName;
3846  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3847  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3848  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3849  }
3850 }
3851 
3852 // Collect an 'overlay' mapping of the tableNames->tableId
3853 // to account for possible chained renames
3854 // (for swap: a->b, b->c, c->d, d->a)
3855 
// Resolves a table name to its descriptor, consulting the overlay map first.
// A cached id of -1 means the name is treated as no longer existing and NULL
// is returned; any other cached id is looked up by id. Names that are not in
// the overlay are looked up by name in the catalog.
//
// NOTE(review): listing line 3856 is missing from this extract, so the start
// of the signature (return type, function name and first parameter; the body
// uses a parameter named `cat`) is not visible. Restore it from the upstream
// file.
3857  std::map<std::string, int>& cachedTableMap,
3858  std::string& curTableName) {
3859  auto iter = cachedTableMap.find(curTableName);
3860  if ((iter != cachedTableMap.end())) {
3861  // get the cached tableId
3862  // and use that to lookup the TableDescriptor
3863  int tableId = (*iter).second;
3864  if (tableId == -1) {
3865  return NULL;
3866  } else {
3867  return cat->getMetadataForTable(tableId);
3868  }
3869  }
3870 
3871  // else ... lookup in standard location
3872  return cat->getMetadataForTable(curTableName);
3873 }
3874 
/**
 * Records one rename step in the overlay name map: the current name is marked
 * as deleted (id -1) and the new name is mapped to the table id. When both
 * names are equal the entry ends up holding the table id.
 *
 * @param cachedTableMap overlay of table name -> table id to update
 * @param curTableName   name the table has before this rename step
 * @param newTableName   name the table has after this rename step
 * @param tableId        id of the table being renamed
 */
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kDeletedNameMarker = -1;
  // Order matters: the new name is written last so it wins if both are equal.
  cachedTableMap.insert_or_assign(curTableName, kDeletedNameMarker);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
3885 
// Renames several tables in one operation. The renames are given as
// (current name, new name) pairs and are resolved in order, so a later pair
// may refer to a name introduced by an earlier pair (chained renames/swaps).
//
// Phases:
//   Setup  - resolve every current name to a table id through the overlay map
//            and build the list of distinct table ids.
//   Rename - inside one SQLite transaction, rename every table together with
//            the physical tables of logical tables.
//   Then the object name is updated for every grantee that has the table
//   recorded, and in the SysCatalog descriptor map.
//
// @param names (current name, new name) pairs, in execution order
3886 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
3887  // tableId of all tables being renamed
3888  // ... in matching order to 'names'
3889  std::vector<int> tableIds;
3890 
3891  // (sorted & unique) list of tables ids for locking
3892  // (with names index of src in case of error)
3893  // <tableId, strIndex>
3894  // std::map is by definition/implementation sorted
3895  // std::map current usage below tests to avoid over-write
3896  std::map<int, size_t> uniqueOrderedTableIds;
3897 
3898  // mapping of modified tables names -> tableId
3899  std::map<std::string, int> cachedTableMap;
3900 
3901  // -------- Setup --------
3902 
3903  // gather tableIds pre-execute; build maps
3904  for (size_t i = 0; i < names.size(); i++) {
3905  std::string& curTableName = names[i].first;
3906  std::string& newTableName = names[i].second;
3907 
3908  // make sure the table being renamed exists,
3909  // or will exist when executed in 'name' order
3910  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
3911  CHECK(td);
3912 
3913  tableIds.push_back(td->tableId);
3914  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
3915  // don't overwrite as it should map to the first names index 'i'
3916  uniqueOrderedTableIds[td->tableId] = i;
3917  }
3918  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
3919  }
3920 
3921  CHECK_EQ(tableIds.size(), names.size());
3922 
3923  // The outer Stmt created a write lock before calling the catalog rename table
3924  // -> TODO: might want to sort out which really should set the lock :
3925  // the comment in the outer scope indicates it should be in here
3926  // but it's not clear if the access done there *requires* it out there
3927  //
3928  // Lock tables pre-execute (may/will be in different order than rename occurs)
3929  // const auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
3930  // *legacylockmgr::LockMgr<mapd_shared_mutex, bool>::getMutex(
3931  // legacylockmgr::ExecutorOuterLock, true));
3932 
3933  // acquire the locks for all tables being renamed
// NOTE(review): listing line 3934 is missing from this extract; it presumably
// declared the `tableLocks` container used below. Restore it from the upstream
// file.
3935  for (auto& idPair : uniqueOrderedTableIds) {
// The lock is requested by the table's current name, taken from the first
// rename pair that mentions the table.
3936  std::string& tableName = names[idPair.second].first;
3937  tableLocks.emplace_back(
// NOTE(review): listing lines 3938-3939 are missing from this extract, so the
// expression that builds the lock object is incomplete here. Restore it from
// the upstream file.
3940  *this, tableName, false)));
3941  }
3942 
3943  // -------- Rename --------
3944 
3945  {
3946  cat_write_lock write_lock(this);
3947  cat_sqlite_lock sqlite_lock(getObjForLock());
3948 
3949  sqliteConnector_.query("BEGIN TRANSACTION");
3950 
3951  // collect all (tables + physical tables) into a single list
3952  std::vector<std::pair<std::string, std::string>> allNames;
3953  std::vector<int> allTableIds;
3954 
3955  for (size_t i = 0; i < names.size(); i++) {
3956  int tableId = tableIds[i];
3957  std::string& curTableName = names[i].first;
3958  std::string& newTableName = names[i].second;
3959 
3960  // rename all corresponding physical tables if this is a logical table
3961  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
3962  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3963  const auto physicalTables = physicalTableIt->second;
3964  CHECK(!physicalTables.empty());
3965  for (size_t k = 0; k < physicalTables.size(); k++) {
3966  int32_t physical_tb_id = physicalTables[k];
3967  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3968  CHECK(phys_td);
// Physical table names are generated with a 1-based shard number.
3969  std::string newPhysTableName =
3970  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
3971  allNames.emplace_back(phys_td->tableName, newPhysTableName);
3972  allTableIds.push_back(phys_td->tableId);
3973  }
3974  }
3975  allNames.emplace_back(curTableName, newTableName);
3976  allTableIds.push_back(tableId);
3977  }
3978  // rename all tables in one shot
// The batch overload issues ROLLBACK itself if one of its updates throws.
3979  renamePhysicalTable(allNames, allTableIds);
3980 
3981  sqliteConnector_.query("END TRANSACTION");
3982  // cat write/sqlite locks are released when they go out scope
3983  }
3984  {
3985  // now update the SysCatalog
3986  for (size_t i = 0; i < names.size(); i++) {
3987  int tableId = tableIds[i];
3988  std::string& newTableName = names[i].second;
3989  {
3990  // update table name in direct and effective priv map
3991  DBObjectKey key;
3992  key.dbId = currentDB_.dbId;
3993  key.objectId = tableId;
3994  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3995 
3996  DBObject object(newTableName, TableDBObjectType);
3997  object.setObjectKey(key);
3998 
3999  auto objdescs = SysCatalog::instance().getMetadataForObject(
4000  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
4001  for (auto obj : objdescs) {
4002  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4003  if (grnt) {
4004  grnt->renameDbObject(object);
4005  }
4006  }
4007  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4008  }
4009  }
4010  }
4011 
4012  // -------- Cleanup --------
4013 
4014  // table locks are released when 'tableLocks' goes out of scope
4015 }
4016 
4017 void Catalog::renameColumn(const TableDescriptor* td,
4018  const ColumnDescriptor* cd,
4019  const string& newColumnName) {
4020  cat_write_lock write_lock(this);
4021  cat_sqlite_lock sqlite_lock(getObjForLock());
4022  sqliteConnector_.query("BEGIN TRANSACTION");
4023  try {
4024  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4025  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4026  CHECK(cdx);
4027  std::string new_column_name = cdx->columnName;
4028  new_column_name.replace(0, cd->columnName.size(), newColumnName);
4029  sqliteConnector_.query_with_text_params(
4030  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4031  std::vector<std::string>{new_column_name,
4032  std::to_string(td->tableId),
4033  std::to_string(cdx->columnId)});
4034  }
4035  } catch (std::exception& e) {
4036  sqliteConnector_.query("ROLLBACK TRANSACTION");
4037  throw;
4038  }
4039  sqliteConnector_.query("END TRANSACTION");
4040  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4041  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4042  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4043  CHECK(cdx);
4044  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4045  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
4046  CHECK(columnDescIt != columnDescriptorMap_.end());
4047  ColumnDescriptor* changeCd = columnDescIt->second;
4048  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
4049  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
4050  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
4051  changeCd;
4052  }
4053  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4054 }
4055 
// Creates a dashboard, or updates the existing dashboard that has the same
// name and user id, then reloads its generated id and update time into `vd`
// and registers it in the in-memory dashboard map.
//
// The insert/update runs in a SQLite transaction that is rolled back (and the
// exception rethrown) on std::exception.
//
// @param vd dashboard to store; dashboardId and updateTime are filled in
// @return the id of the stored dashboard
4056 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
4057  cat_write_lock write_lock(this);
4058  cat_sqlite_lock sqlite_lock(getObjForLock());
4059  sqliteConnector_.query("BEGIN TRANSACTION");
4060  try {
4061  // TODO(andrew): this should be an upsert
4062  sqliteConnector_.query_with_text_params(
4063  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4064  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4065  if (sqliteConnector_.getNumRows() > 0) {
4066  sqliteConnector_.query_with_text_params(
4067  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4068  "update_time = "
4069  "datetime('now') where name = ? "
4070  "and userid = ?",
4071  std::vector<std::string>{vd.dashboardState,
4072  vd.imageHash,
4073  vd.dashboardMetadata,
4074  vd.dashboardName,
4075  std::to_string(vd.userId)});
4076  } else {
4077  sqliteConnector_.query_with_text_params(
4078  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4079  "update_time, "
4080  "userid) "
4081  "VALUES "
4082  "(?,?,?,?, "
4083  "datetime('now'), ?)",
4084  std::vector<std::string>{vd.dashboardName,
4085  vd.dashboardState,
4086  vd.imageHash,
4087  vd.dashboardMetadata,
4088  std::to_string(vd.userId)});
4089  }
4090  } catch (std::exception& e) {
4091  sqliteConnector_.query("ROLLBACK TRANSACTION");
4092  throw;
4093  }
4094  sqliteConnector_.query("END TRANSACTION");
4095 
4096  // now get the auto generated dashboardId
// This read happens after the transaction above has ended. The try/catch only
// rethrows, so it does not change behavior.
4097  try {
4098  sqliteConnector_.query_with_text_params(
4099  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4100  "WHERE name = ? and userid = ?",
4101  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4102  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
4103  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4104  } catch (std::exception& e) {
4105  throw;
4106  }
// NOTE(review): listing line 4107 is missing from this extract; the line below
// is the argument list of a statement whose beginning is not visible. Restore
// it from the upstream file.
4108  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4109  addFrontendViewToMap(vd);
// Both locks are released explicitly before the system role is updated.
4110  sqlite_lock.unlock();
4111  write_lock.unlock();
4112  if (!isInfoSchemaDb()) {
4113  // NOTE(wamsi): Transactionally unsafe
4114  createOrUpdateDashboardSystemRole(
// NOTE(review): listing line 4115 (the arguments of the call above) is missing
// from this extract. Restore it from the upstream file.
4116  }
4117  return vd.dashboardId;
4118 }
4119 
// Replaces the stored content of an existing dashboard, identified by
// vd.dashboardId, and refreshes the in-memory dashboard map entry.
//
// Throws runtime_error when the id is not present in SQLite (the transaction
// is rolled back) or when no in-memory entry with that id exists. The latter
// check happens after the SQLite transaction has already ended.
//
// @param vd new dashboard content; updateTime is refreshed from SQLite
4120 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
4121  cat_write_lock write_lock(this);
4122  cat_sqlite_lock sqlite_lock(getObjForLock());
4123 
4124  CHECK(sqliteConnector_.getSqlitePtr());
4125  sqliteConnector_.query("BEGIN TRANSACTION");
4126  try {
4127  sqliteConnector_.query_with_text_params(
4128  "SELECT id FROM mapd_dashboards WHERE id = ?",
4129  std::vector<std::string>{std::to_string(vd.dashboardId)});
4130  if (sqliteConnector_.getNumRows() > 0) {
4131  sqliteConnector_.query_with_text_params(
4132  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4133  "?, userid = ?, update_time = datetime('now') where id = ? ",
4134  std::vector<std::string>{vd.dashboardName,
4135  vd.dashboardState,
4136  vd.imageHash,
4137  vd.dashboardMetadata,
4138  std::to_string(vd.userId),
// NOTE(review): listing line 4139 is missing from this extract; it presumably
// supplied the last bound parameter (the statement has six placeholders, five
// values are visible) and closed the call. Restore it from the upstream file.
4140  } else {
4141  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4142  << " does not exist in db";
4143  throw runtime_error("Error replacing dashboard id " +
4144  std::to_string(vd.dashboardId) + " does not exist in db");
4145  }
4146  } catch (std::exception& e) {
4147  sqliteConnector_.query("ROLLBACK TRANSACTION");
4148  throw;
4149  }
4150  sqliteConnector_.query("END TRANSACTION");
4151 
// Find the in-memory entry by dashboard id; the map key is
// "<userId>:<dashboardName>" of the stored entry, which may differ from the
// new values in `vd`. The loop stops right after the erase.
4152  bool found{false};
4153  for (auto descp : dashboardDescriptorMap_) {
4154  auto dash = descp.second.get();
4155  if (dash->dashboardId == vd.dashboardId) {
4156  found = true;
4157  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4158  dash->dashboardName);
4159  if (viewDescIt ==
4160  dashboardDescriptorMap_.end()) { // check to make sure view exists
4161  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4162  << " dashboard " << dash->dashboardName << " does not exist in map";
4163  throw runtime_error("No metadata for dashboard for user " +
4164  std::to_string(dash->userId) + " dashboard " +
4165  dash->dashboardName + " does not exist in map");
4166  }
4167  dashboardDescriptorMap_.erase(viewDescIt);
4168  break;
4169  }
4170  }
4171  if (!found) {
4172  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4173  << " does not exist in map";
4174  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4175  " does not exist in map");
4176  }
4177 
4178  // now reload the object
4179  sqliteConnector_.query_with_text_params(
4180  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4181  "mapd_dashboards "
4182  "WHERE id = ?",
4183  std::vector<std::string>{std::to_string(vd.dashboardId)});
4184  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
// NOTE(review): listing line 4185 is missing from this extract; the line below
// is the argument list of a statement whose beginning is not visible. Restore
// it from the upstream file.
4186  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4187  addFrontendViewToMapNoLock(vd);
// Both locks are released explicitly before the system role is updated.
4188  sqlite_lock.unlock();
4189  write_lock.unlock();
4190  if (!isInfoSchemaDb()) {
4191  // NOTE(wamsi): Transactionally unsafe
4192  createOrUpdateDashboardSystemRole(
// NOTE(review): listing line 4193 (the arguments of the call above) is missing
// from this extract. Restore it from the upstream file.
4194  }
4195 }
4196 
4197 std::string Catalog::calculateSHA1(const std::string& data) {
4198  boost::uuids::detail::sha1 sha1;
4199  unsigned int digest[5];
4200  sha1.process_bytes(data.c_str(), data.length());
4201  sha1.get_digest(digest);
4202  std::stringstream ss;
4203  for (size_t i = 0; i < 5; i++) {
4204  ss << std::hex << digest[i];
4205  }
4206  return ss.str();
4207 }
4208 
4209 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4210  cat_write_lock write_lock(this);
4211  cat_sqlite_lock sqlite_lock(getObjForLock());
4212  sqliteConnector_.query("BEGIN TRANSACTION");
4213  try {
4214  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4215  .substr(0, 8);
4216  sqliteConnector_.query_with_text_params(
4217  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4218  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4219  if (sqliteConnector_.getNumRows() > 0) {
4220  sqliteConnector_.query_with_text_params(
4221  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4222  "link = ?",
4223  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4224  } else {
4225  sqliteConnector_.query_with_text_params(
4226  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4227  "update_time) VALUES (?,?,?,?, datetime('now'))",
4228  std::vector<std::string>{
4229  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4230  }
4231  // now get the auto generated dashid
4232  sqliteConnector_.query_with_text_param(
4233  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4234  "WHERE link = ?",
4235  ld.link);
4236  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4237  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4238  } catch (std::exception& e) {
4239  sqliteConnector_.query("ROLLBACK TRANSACTION");
4240  throw;
4241  }
4242  sqliteConnector_.query("END TRANSACTION");
4243  addLinkToMap(ld);
4244  return ld.link;
4245 }
4246 
4247 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4248  const TableDescriptor* td) const {
4249  cat_read_lock read_lock(this);
4250 
4251  const auto column_descriptors =
4252  getAllColumnMetadataForTable(td->tableId, false, true, true);
4253 
4254  const ColumnDescriptor* shard_cd{nullptr};
4255  int i = 1;
4256  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4257  ++cd_itr, ++i) {
4258  if (i == td->shardedColumnId) {
4259  shard_cd = *cd_itr;
4260  }
4261  }
4262  return shard_cd;
4263 }
4264 
4265 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4266  const TableDescriptor* logical_table_desc,
4267  bool populate_fragmenter) const {
4268  cat_read_lock read_lock(this);
4269  const auto physicalTableIt =
4270  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4271  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4272  return {logical_table_desc};
4273  }
4274  const auto physicalTablesIds = physicalTableIt->second;
4275  CHECK(!physicalTablesIds.empty());
4276  read_lock.unlock();
4277  std::vector<const TableDescriptor*> physicalTables;
4278  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4279  physicalTables.push_back(
4280  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4281  }
4282  return physicalTables;
4283 }
4284 
4285 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4286  const {
4287  cat_read_lock read_lock(this);
4288  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4289  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4290  for (const auto [table_id, td] : tableDescriptorMapById_) {
4291  // Only include ids for physical persisted tables
4292  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4293  logicalToPhysicalTableMapById_.find(table_id) ==
4294  logicalToPhysicalTableMapById_.end()) {
4295  table_and_shard_ids.emplace_back(table_id, td->shard);
4296  }
4297  }
4298  return table_and_shard_ids;
4299 }
4300 
4301 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4302  cat_read_lock read_lock(this);
4303 
4304  std::map<int, const ColumnDescriptor*> mapping;
4305 
4306  const auto tables = getAllTableMetadata();
4307  for (const auto td : tables) {
4308  if (td->shard >= 0) {
4309  // skip shards, they're not standalone tables
4310  continue;
4311  }
4312 
4313  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4314  const auto& ti = cd->columnType;
4315  if (ti.is_string()) {
4316  if (ti.get_compression() == kENCODING_DICT) {
4317  // if foreign reference, get referenced tab.col
4318  const auto dict_id = ti.get_comp_param();
4319 
4320  // ignore temp (negative) dictionaries
4321  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4322  mapping[dict_id] = cd;
4323  }
4324  }
4325  }
4326  }
4327  }
4328 
4329  return mapping;
4330 }
4331 
// Decides whether a table should be listed for a user.
//
// Returns false for shards, for views when only physical tables are requested,
// for non-views when only views are requested, and for tables on which the
// user has no privileges at all. Returns true otherwise.
//
// @param td              table to test
// @param user_metadata   user the listing is produced for
// @param get_tables_type kind of tables requested
4332 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4333  const UserMetadata& user_metadata,
4334  const GetTablesType get_tables_type) const {
4335  if (td->shard >= 0) {
4336  // skip shards, they're not standalone tables
4337  return false;
4338  }
4339  switch (get_tables_type) {
4340  case GET_PHYSICAL_TABLES: {
4341  if (td->isView) {
4342  return false;
4343  }
4344  break;
4345  }
4346  case GET_VIEWS: {
4347  if (!td->isView) {
4348  return false;
4349  }
4350  break;
4351  }
// Any other request type applies no view/non-view filtering.
4352  default:
4353  break;
4354  }
// NOTE(review): listing line 4355 is missing from this extract; it presumably
// declared the `dbObject` used below. Restore it from the upstream file.
4356  dbObject.loadKey(*this);
4357  std::vector<DBObject> privObjects = {dbObject};
4358  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4359  // skip table, as there are no privileges to access it
4360  return false;
4361  }
4362  return true;
4363 }
4364 
4365 std::vector<std::string> Catalog::getTableNamesForUser(
4366  const UserMetadata& user_metadata,
4367  const GetTablesType get_tables_type) const {
4368  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4369  cat_read_lock read_lock(this);
4370  std::vector<std::string> table_names;
4371  const auto tables = getAllTableMetadata();
4372  for (const auto td : tables) {
4373  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4374  table_names.push_back(td->tableName);
4375  }
4376  }
4377  return table_names;
4378 }
4379 
4380 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4381  const UserMetadata& user_metadata,
4382  const GetTablesType get_tables_type,
4383  const std::string& filter_table_name) const {
4384  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4385  cat_read_lock read_lock(this);
4386 
4387  std::vector<TableMetadata> tables_metadata;
4388  const auto tables = getAllTableMetadata();
4389  for (const auto td : tables) {
4390  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4391  if (!filter_table_name.empty()) {
4392  if (td->tableName != filter_table_name) {
4393  continue;
4394  }
4395  }
4396  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4397  // descriptor outside catalog lock
4398  tables_metadata.emplace_back(table_metadata);
4399  }
4400  }
4401  return tables_metadata;
4402 }
4403 
4404 int Catalog::getLogicalTableId(const int physicalTableId) const {
4405  cat_read_lock read_lock(this);
4406  for (const auto& l : logicalToPhysicalTableMapById_) {
4407  if (l.second.end() != std::find_if(l.second.begin(),
4408  l.second.end(),
4409  [&](decltype(*l.second.begin()) tid) -> bool {
4410  return physicalTableId == tid;
4411  })) {
4412  return l.first;
4413  }
4414  }
4415  return physicalTableId;
4416 }
4417 
4418 void Catalog::checkpoint(const int logicalTableId) const {
4419  const auto td = getMetadataForTable(logicalTableId);
4420  const auto shards = getPhysicalTablesDescriptors(td);
4421  for (const auto shard : shards) {
4422  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4423  }
4424 }
4425 
4426 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4427  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4428  try {
4429  checkpoint(logical_table_id);
4430  } catch (...) {
4431  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4432  throw;
4433  }
4434 }
4435 
4436 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
4437  cat_read_lock read_lock(this);
4438  const auto td = getMetadataForTable(logicalTableId, false);
4439  const auto shards = getPhysicalTablesDescriptors(td, false);
4440  for (const auto shard : shards) {
4441  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
4442  }
4443 }
4444 
4445 void Catalog::eraseDbMetadata() {
4446  const auto tables = getAllTableMetadata();
4447  for (const auto table : tables) {
4448  eraseTableMetadata(table);
4449  }
4450  // Physically erase database metadata
4451  boost::filesystem::remove(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
4452  currentDB_.dbName);
4453  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4454 }
4455 
4456 void Catalog::eraseDbPhysicalData() {
4457  const auto tables = getAllTableMetadata();
4458  for (const auto table : tables) {
4459  eraseTablePhysicalData(table);
4460  }
4461 }
4462 
4463 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4464  const int tableId = td->tableId;
4465  // must destroy fragmenter before deleteChunks is called.
4466  removeFragmenterForTable(tableId);
4467 
4468  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4469  {
4470  INJECT_TIMER(deleteChunksWithPrefix);
4471  // assuming deleteChunksWithPrefix is atomic
4472  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4473  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4474  }
4475  if (!td->isView) {
4476  INJECT_TIMER(Remove_Table);
4477  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4478  }
4479 }
4480 
4481 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4482  const int32_t& shardNumber) {
4483  std::string physicalTableName =
4484  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4485  return (physicalTableName);
4486 }
4487 
// Reads every row of the omnisci_foreign_servers table and registers a
// ForeignServer object for it in both foreignServerMap_ (keyed by name) and
// foreignServerMapById_ (keyed by id).
4488 void Catalog::buildForeignServerMapUnlocked() {
4490  sqliteConnector_.query(
4491  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4492  "omnisci_foreign_servers");
4493  auto num_rows = sqliteConnector_.getNumRows();
4494 
4495  for (size_t row = 0; row < num_rows; row++) {
  // Constructor arguments follow the column order of the SELECT above.
  // NOTE(review): creation_time is read as a 32-bit integer - confirm this matches
  // the stored range.
4496  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4497  sqliteConnector_.getData<int>(row, 0),
4498  sqliteConnector_.getData<std::string>(row, 1),
4499  sqliteConnector_.getData<std::string>(row, 2),
4500  sqliteConnector_.getData<std::string>(row, 3),
4501  sqliteConnector_.getData<std::int32_t>(row, 4),
4502  sqliteConnector_.getData<std::int32_t>(row, 5));
  // Both maps share ownership of the same server object.
4503  foreignServerMap_[foreign_server->name] = foreign_server;
4504  foreignServerMapById_[foreign_server->id] = foreign_server;
4505  }
4506 }
4507 
// For every row of omnisci_foreign_tables, completes the matching in-memory
// ForeignTable descriptor: links it to its foreign server, populates its options
// and sets its last/next refresh times.
4508 void Catalog::updateForeignTablesInMapUnlocked() {
4510  sqliteConnector_.query(
4511  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4512  "omnisci_foreign_tables");
4513  auto num_rows = sqliteConnector_.getNumRows();
4514  for (size_t r = 0; r < num_rows; r++) {
4515  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4516  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4517  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
  // NOTE(review): refresh times are read as `int` - confirm the stored values fit.
4518  const auto last_refresh_time = sqliteConnector_.getData<int>(r, 3);
4519  const auto next_refresh_time = sqliteConnector_.getData<int>(r, 4);
4520 
  // The table descriptor must already be present in tableDescriptorMapById_ and
  // must be a ForeignTable.
4521  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4522  auto foreign_table =
4523  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4524  CHECK(foreign_table);
  // operator[] yields an empty pointer for an unknown server_id; the CHECK below
  // rejects that case.
4525  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4526  CHECK(foreign_table->foreign_server);
4527  foreign_table->populateOptionsMap(options);
4528  foreign_table->last_refresh_time = last_refresh_time;
4529  foreign_table->next_refresh_time = next_refresh_time;
4530  }
4531 }
4532 
4533 void Catalog::setForeignServerProperty(const std::string& server_name,
4534  const std::string& property,
4535  const std::string& value) {
4536  cat_sqlite_lock sqlite_lock(getObjForLock());
4537  sqliteConnector_.query_with_text_params(
4538  "SELECT id from omnisci_foreign_servers where name = ?",
4539  std::vector<std::string>{server_name});
4540  auto num_rows = sqliteConnector_.getNumRows();
4541  if (num_rows > 0) {
4542  CHECK_EQ(size_t(1), num_rows);
4543  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4544  sqliteConnector_.query_with_text_params(
4545  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4546  std::vector<std::string>{value, std::to_string(server_id)});
4547  } else {
4548  throw std::runtime_error{"Can not change property \"" + property +
4549  "\" for foreign server." + " Foreign server \"" +
4550  server_name + "\" is not found."};
4551  }
4552 }
4553 
// Builds, validates and registers the built-in local foreign servers:
// "default_local_delimited", "default_local_regex_parsed" and, when
// ENABLE_IMPORT_PARQUET is defined, "default_local_parquet".
// Each server is handed to createForeignServerNoLocks() with `true` as the
// second argument.
4554 void Catalog::createDefaultServersIfNotExists() {
4559 
  // Delimited-file server.
4560  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4561  "default_local_delimited",
4563  options,
4565  local_csv_server->validate();
4566  createForeignServerNoLocks(std::move(local_csv_server), true);
4567 
4568 #ifdef ENABLE_IMPORT_PARQUET
  // Parquet server, only compiled in when parquet import is enabled.
4569  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4570  "default_local_parquet",
4572  options,
4574  local_parquet_server->validate();
4575  createForeignServerNoLocks(std::move(local_parquet_server), true);
4576 #endif
4577 
  // Regex-parsed file server.
4578  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
4579  "default_local_regex_parsed",
4581  options,
4583  local_regex_parser_server->validate();
4584  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
4585 }
4586 
4587 // prepare a fresh file reload on next table access
4588 void Catalog::setForReload(const int32_t tableId) {
4589  const auto td = getMetadataForTable(tableId);
4590  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4591  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4592  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4593  }
4594 }
4595 
4596 // get a table's data dirs
4597 std::vector<std::string> Catalog::getTableDataDirectories(
4598  const TableDescriptor* td) const {
4599  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4600  std::vector<std::string> file_paths;
4601  for (auto shard : getPhysicalTablesDescriptors(td)) {
4602  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4603  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4604  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4605  file_paths.push_back(file_path.filename().string());
4606  }
4607  return file_paths;
4608 }
4609 
// Returns the dictionary folder of a string / string-array column whose
// comp_param is positive: the full dictFolderPath, or only its final path
// component when `file_name_only` is true. Returns an empty string for columns
// that do not pass the initial check.
4610 // get a column's dict dir basename
4611 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
4612  bool file_name_only) const {
4613  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4615  cd->columnType.get_comp_param() > 0) {
  // comp_param is used as the dictionary id within the current database.
4616  const auto dictId = cd->columnType.get_comp_param();
4617  const DictRef dictRef(currentDB_.dbId, dictId);
4618  const auto dit = dictDescriptorMapByRef_.find(dictRef);
  // A matching, non-null dictionary descriptor is required to exist.
4619  CHECK(dit != dictDescriptorMapByRef_.end());
4620  CHECK(dit->second);
4621  if (file_name_only) {
4622  boost::filesystem::path file_path(dit->second->dictFolderPath);
4623  return file_path.filename().string();
4624  } else {
4625  return dit->second->dictFolderPath;
4626  }
4627  }
4628  return std::string();
4629 }
4630 
4631 // get a table's dict dirs
4632 std::vector<std::string> Catalog::getTableDictDirectories(
4633  const TableDescriptor* td) const {
4634  std::vector<std::string> file_paths;
4635  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4636  auto file_base = getColumnDictDirectory(cd);
4637  if (!file_base.empty() &&
4638  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4639  file_paths.push_back(file_base);
4640  }
4641  }
4642  return file_paths;
4643 }
4644 
4645 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id) const {
4646  cat_read_lock read_lock(this);
4647  std::set<std::string> directory_paths;
4648  auto it = dict_columns_by_table_id_.find(table_id);
4649  if (it != dict_columns_by_table_id_.end()) {
4650  for (auto cd : it->second) {
4651  auto directory_path = getColumnDictDirectory(cd, false);
4652  if (!directory_path.empty()) {
4653  directory_paths.emplace(directory_path);
4654  }
4655  }
4656  }
4657  return directory_paths;
4658 }
4659 
// Builds a "CREATE TABLE @T (...) WITH (...);" statement for a non-system table.
// The literal "@T" is emitted where a table name would go. Column names are
// written unquoted.
4660 // returns table schema in a string
4661 // NOTE(sy): Might be able to replace dumpSchema() later with
4662 // dumpCreateTable() after a deeper review of the TableArchiver code.
4663 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
4664  CHECK(!td->is_system_table);
4665  cat_read_lock read_lock(this);
4666 
4667  std::ostringstream os;
4668  os << "CREATE TABLE @T (";
4669  // gather column defines
4670  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
  // `comma` stays empty until the first column has been written.
4671  std::string comma;
4672  std::vector<std::string> shared_dicts;
  // Maps a dictionary name to the first column seen that uses it.
4673  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4674  for (const auto cd : cds) {
4675  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4676  const auto& ti = cd->columnType;
4677  os << comma << cd->columnName;
4678  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4679  if (ti.get_type() == SQLTypes::kCHAR) {
4680  os << " "
4681  << "TEXT";
4682  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4683  os << " "
4684  << "TEXT[]";
4685  } else {
4686  os << " " << ti.get_type_name();
4687  }
4688  os << (ti.get_notnull() ? " NOT NULL" : "");
4689  if (cd->default_value.has_value()) {
4690  os << " DEFAULT " << cd->getDefaultValueLiteral();
4691  }
  // Encoding clause: strings / text arrays first, then date-in-days or
  // size-reduced types, then geometry.
4692  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4693  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4694  if (ti.get_compression() == kENCODING_DICT) {
4695  // if foreign reference, get referenced tab.col
4696  const auto dict_id = ti.get_comp_param();
4697  const DictRef dict_ref(currentDB_.dbId, dict_id);
4698  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4699  CHECK(dict_it != dictDescriptorMapByRef_.end());
4700  const auto dict_name = dict_it->second->dictName;
4701  // when migrating a table, any foreign dict ref will be dropped
4702  // and the first cd of a dict will become root of the dict
4703  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
4704  dict_root_cds[dict_name] = cd;
  // `size` is multiplied by 8 to form the number printed in the clause.
4705  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4706  } else {
4707  const auto dict_root_cd = dict_root_cds[dict_name];
4708  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
4709  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
4710  // "... shouldn't specify an encoding, it borrows from the referenced
4711  // column"
4712  }
4713  } else {
4714  os << " ENCODING NONE";
4715  }
4716  } else if (ti.is_date_in_days() ||
4717  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
  // A comp_param of 0 is printed as 32.
4718  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4719  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4720  } else if (ti.is_geometry()) {
4721  if (ti.get_compression() == kENCODING_GEOINT) {
4722  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4723  << ")";
4724  } else {
4725  os << " ENCODING NONE";
4726  }
4727  }
4728  comma = ", ";
4729  }
4730  }
4731  // gather SHARED DICTIONARYs
4732  if (shared_dicts.size()) {
4733  os << ", " << boost::algorithm::join(shared_dicts, ", ");
4734  }
4735  // gather WITH options ...
4736  std::vector<std::string> with_options;
4737  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4738  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4739  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4740  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4741  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4742  : "VACUUM='IMMEDIATE'");
4743  if (!td->partitions.empty()) {
4744  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4745  }
4746  if (td->nShards > 0) {
4747  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4748  CHECK(shard_cd);
  // SHARD KEY is appended inside the column list; SHARD_COUNT goes to WITH and
  // is scaled by the leaf count (at least 1).
4749  os << ", SHARD KEY(" << shard_cd->columnName << ")";
4750  with_options.push_back(
4751  "SHARD_COUNT=" +
4752  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4753  }
4754  if (td->sortedColumnId > 0) {
4755  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4756  CHECK(sort_cd);
4757  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4758  }
4760  td->maxRollbackEpochs != -1) {
4761  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4763  }
4764  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
4765  return os.str();
4766 }
4767 
4768 #include "Parser/ReservedKeywords.h"
4769 
// Returns true when `str` holds at least one character for which std::isspace
// is non-zero.
inline bool contains_spaces(std::string_view str) {
  // Characters are passed as unsigned char, as std::isspace requires.
  return std::any_of(str.begin(), str.end(), [](const unsigned char candidate) {
    return std::isspace(candidate) != 0;
  });
}
4776 
4779  std::string_view str,
4780  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
4781  return str.find_first_of(chars) != std::string_view::npos;
4782 }
4783 
4785 inline bool is_reserved_sql_keyword(std::string_view str) {
4786  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
4787 }
4788 
4789 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
4790 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
4791  bool multiline_formatting,
4792  bool dump_defaults) const {
4793  cat_read_lock read_lock(this);
4794  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
4795 }
4796 
4797 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
4798  bool multiline_formatting,
4799  bool dump_defaults) const {
4800  cat_read_lock read_lock(this);
4801  const auto td = getMutableMetadataForTableUnlocked(table_id);
4802  if (!td) {
4803  return {};
4804  }
4805  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
4806 }
4807 
// Builds the statement text for `td` without taking any catalog lock itself.
//  - views: "CREATE VIEW <name> AS <viewSQL>", returned immediately
//  - non-system foreign tables: "CREATE FOREIGN TABLE ... SERVER ... WITH (...)"
//  - everything else: "CREATE [TEMPORARY ]TABLE ... WITH (...)"
// multiline_formatting: put each column / clause on its own line.
// dump_defaults: also emit WITH options whose value equals the default.
// NOTE(review): column names go through quoteIfRequired() but the table name is
// written as-is - confirm that is intended.
4808 std::string Catalog::dumpCreateTableUnlocked(const TableDescriptor* td,
4809  bool multiline_formatting,
4810  bool dump_defaults) const {
  // Non-null only when td is actually a ForeignTable.
4811  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
4812  std::ostringstream os;
4813 
4814  if (foreign_table && !td->is_system_table) {
4815  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
4816  } else if (!td->isView) {
4817  os << "CREATE ";
4819  os << "TEMPORARY ";
4820  }
4821  os << "TABLE " + td->tableName + " (";
4822  } else {
4823  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
4824  return os.str();
4825  }
4826  // scan column defines
4827  std::vector<std::string> additional_info;
4828  std::set<std::string> shared_dict_column_names;
4829 
  // Fills in the SHARD KEY / SHARED DICTIONARY clauses and the names of columns
  // that borrow their dictionary from another column.
4830  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
4831 
4832  // gather column defines
4833  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4834  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4835  bool first = true;
4836  for (const auto cd : cds) {
4837  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4838  const auto& ti = cd->columnType;
  // Separator before every column except the first.
4839  if (!first) {
4840  os << ",";
4841  if (!multiline_formatting) {
4842  os << " ";
4843  }
4844  } else {
4845  first = false;
4846  }
4847  if (multiline_formatting) {
4848  os << "\n ";
4849  }
4850  // column name
4851  os << quoteIfRequired(cd->columnName);
4852  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4853  if (ti.get_type() == SQLTypes::kCHAR) {
4854  os << " "
4855  << "TEXT";
4856  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4857  os << " "
4858  << "TEXT[]";
4859  } else {
4860  os << " " << ti.get_type_name();
4861  }
4862  os << (ti.get_notnull() ? " NOT NULL" : "");
4863  if (cd->default_value.has_value()) {
4864  os << " DEFAULT " << cd->getDefaultValueLiteral();
4865  }
4866  if (shared_dict_column_names.find(cd->columnName) ==
4867  shared_dict_column_names.end()) {
4868  // avoids "Column ... shouldn't specify an encoding, it borrows it
4869  // from the referenced column"
4870  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4871  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4872  if (ti.get_compression() == kENCODING_DICT) {
4873  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4874  } else {
4875  os << " ENCODING NONE";
4876  }
4877  } else if (ti.is_date_in_days() ||
4878  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
  // A comp_param of 0 is printed as 32.
4879  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4880  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4881  } else if (ti.is_geometry()) {
4882  if (ti.get_compression() == kENCODING_GEOINT) {
4883  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4884  << ")";
4885  } else {
4886  os << " ENCODING NONE";
4887  }
4888  }
4889  }
4890  }
4891  }
4892  // gather SHARED DICTIONARYs
4893  if (additional_info.size()) {
4894  std::string comma;
4895  if (!multiline_formatting) {
4896  comma = ", ";
4897  } else {
4898  comma = ",\n ";
4899  }
4900  os << comma;
4901  os << boost::algorithm::join(additional_info, comma);
4902  }
4903  os << ")";
4904 
4905  std::vector<std::string> with_options;
4906  if (foreign_table && !td->is_system_table) {
4907  if (multiline_formatting) {
4908  os << "\n";
4909  } else {
4910  os << " ";
4911  }
4912  os << "SERVER " << foreign_table->foreign_server->name;
4913 
4914  // gather WITH options ...
  // NOTE(review): option values are wrapped in single quotes without escaping -
  // confirm values can never contain a quote.
4915  for (const auto& [option, value] : foreign_table->options) {
4916  with_options.emplace_back(option + "='" + value + "'");
4917  }
4918  }
4919 
  // Options below are emitted when they differ from the default or when
  // dump_defaults is set; several are skipped for foreign tables.
4920  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
4921  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4922  }
4923  if (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE) {
4924  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4925  }
4926  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
4927  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4928  }
4929  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
4930  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4931  }
  // A maxRollbackEpochs of -1 is never emitted.
4932  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
4933  td->maxRollbackEpochs != -1) {
4934  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4936  }
  // Without dump_defaults, VACUUM is only emitted when hasDeletedCol is false.
4937  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
4938  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4939  : "VACUUM='IMMEDIATE'");
4940  }
4941  if (!foreign_table && !td->partitions.empty()) {
4942  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4943  }
4944  if (!foreign_table && td->nShards > 0) {
4945  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4946  CHECK(shard_cd);
  // SHARD_COUNT is nShards scaled by the leaf count (at least 1).
4947  with_options.push_back(
4948  "SHARD_COUNT=" +
4949  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4950  }
4951  if (!foreign_table && td->sortedColumnId > 0) {
4952  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4953  CHECK(sort_cd);
4954  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4955  }
4956 
  // The WITH clause is omitted entirely when no option was collected.
4957  if (!with_options.empty()) {
4958  if (!multiline_formatting) {
4959  os << " ";
4960  } else {
4961  os << "\n";
4962  }
4963  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
4964  }
4965  os << ";";
4966  return os.str();
4967 }
4968 
4969 bool Catalog::validateNonExistentTableOrView(const std::string& name,
4970  const bool if_not_exists) {
4971  if (getMetadataForTable(name, false)) {
4972  if (if_not_exists) {
4973  return false;
4974  }
4975  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
4976  }
4977  return true;
4978 }
4979 
// Under the catalog read lock, collects the foreign tables that are due for a
// refresh: tables with storage type FOREIGN_TABLE whose timing-type option
// matches the expected value and whose next_refresh_time is not later than
// current_time.
4980 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
4981  cat_read_lock read_lock(this);
4982  std::vector<const TableDescriptor*> tables;
4983  for (auto entry : tableDescriptorMapById_) {
4984  auto table_descriptor = entry.second;
4985  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
4986  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4987  CHECK(foreign_table);
  // The timing-type option is required to be present on every foreign table.
4988  auto timing_type_entry = foreign_table->options.find(
4990  CHECK(timing_type_entry != foreign_table->options.end());
4992  if (timing_type_entry->second ==
4994  foreign_table->next_refresh_time <= current_time) {
4995  tables.emplace_back(foreign_table);
4996  }
4997  }
4998  }
4999  return tables;
5000 }
5001 
5002 void Catalog::updateForeignTableRefreshTimes(const int32_t table_id) {
5003  cat_write_lock write_lock(this);
5004  cat_sqlite_lock sqlite_lock(getObjForLock());
5005  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5006  auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
5007  CHECK(table_descriptor);
5008  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
5009  CHECK(foreign_table);
5010  auto last_refresh_time = foreign_storage::RefreshTimeCalculator::getCurrentTime();
5011  auto next_refresh_time = get_next_refresh_time(*foreign_table);
5012  sqliteConnector_.query_with_text_params(
5013  "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
5014  "WHERE table_id = ?",
5015  std::vector<std::string>{std::to_string(last_refresh_time),
5016  std::to_string(next_refresh_time),
5017  std::to_string(foreign_table->tableId)});
5018  foreign_table->last_refresh_time = last_refresh_time;
5019  foreign_table->next_refresh_time = next_refresh_time;
5020 }
5021 
5022 // TODO(Misiu): This function should be merged with setForeignServerOptions via
5023 // inheritance rather than replication similar functions.
5024 void Catalog::setForeignTableOptions(const std::string& table_name,
5025  foreign_storage::OptionsMap& options_map,
5026  bool clear_existing_options) {
5027  cat_write_lock write_lock(this);
5028  // update in-memory table
5029  auto foreign_table = getForeignTableUnlocked(table_name);
5030  auto saved_options = foreign_table->options;
5031  foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
5032  try {
5033  foreign_table->validateOptionValues();
5034  } catch (const std::exception& e) {
5035  // validation did not succeed:
5036  // revert to saved options & throw exception
5037  foreign_table->options = saved_options;
5038  throw;
5039  }
5040  setForeignTableProperty(
5041  foreign_table, "options", foreign_table->getOptionsAsJsonString());
5042 }
5043 
5044 void Catalog::setForeignTableProperty(const foreign_storage::ForeignTable* table,
5045  const std::string& property,
5046  const std::string& value) {
5047  cat_sqlite_lock sqlite_lock(getObjForLock());
5048  sqliteConnector_.query_with_text_params(
5049  "SELECT table_id from omnisci_foreign_tables where table_id = ?",
5050  std::vector<std::string>{std::to_string(table->tableId)});
5051  auto num_rows = sqliteConnector_.getNumRows();
5052  if (num_rows > 0) {
5053  CHECK_EQ(size_t(1), num_rows);
5054  sqliteConnector_.query_with_text_params(
5055  "UPDATE omnisci_foreign_tables SET " + property + " = ? WHERE table_id = ?",
5056  std::vector<std::string>{value, std::to_string(table->tableId)});
5057  } else {
5058  throw std::runtime_error{"Can not change property \"" + property +
5059  "\" for foreign table." + " Foreign table \"" +
5060  table->tableName + "\" is not found."};
5061  }
5062 }
5063 
5064 std::string Catalog::quoteIfRequired(const std::string& column_name) const {
5065  if (is_reserved_sql_keyword(column_name) || contains_spaces(column_name) ||
5066  contains_sql_reserved_chars(column_name)) {
5067  return get_quoted_string(column_name, '"', '"');
5068  } else {
5069  return column_name;
5070  }
5071 }
5072 
5073 // this will gather information that represents the shared dictionary columns
5074 // as they are on the table NOW not at original creation
5075 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
5076  std::set<std::string>& shared_dict_column_names,
5077  const TableDescriptor* td) const {
5078  if (td->nShards > 0) {
5079  ColumnIdKey columnIdKey(td->tableId, td->shardedColumnId);
5080  auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
5081  CHECK(scd);
5082  std::string txt = "SHARD KEY (" + quoteIfRequired(scd->columnName) + ")";
5083  additional_info.emplace_back(txt);
5084  }
5085  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
5086  for (const auto cd : cds) {
5087  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5088  const SQLTypeInfo& ti = cd->columnType;
5089  if (ti.get_compression() != kENCODING_DICT) {
5090  continue;
5091  }
5092  auto dictId = ti.get_comp_param();
5093 
5094  // now we need to check how many other users have this dictionary
5095 
5096  DictRef dict_ref(currentDB_.dbId, dictId);
5097  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
5098  if (dictIt == dictDescriptorMapByRef_.end()) {
5099  LOG(ERROR) << "missing dictionary " << dictId << " for table " << td->tableName;
5100  continue;
5101  }
5102 
5103  const auto& dd = dictIt->second;
5104  if (dd->refcount > 1) {
5105  auto lowest_table = td->tableId;
5106  auto lowest_column = cd->columnId;
5107  std::string lowest_column_name;
5108  // we have multiple tables using this dictionary
5109  // find the other occurances and keep the "lowest"
5110  for (auto const& [key, val] : columnDescriptorMap_) {
5111  if (val->columnType.get_compression() == kENCODING_DICT &&
5112  val->columnType.get_comp_param() == dictId &&
5113  !(val->tableId == td->tableId && val->columnId == cd->columnId)) {
5114  if (val->tableId < lowest_table) {
5115  lowest_table = val->tableId;
5116  lowest_column = val->columnId;
5117  lowest_column_name = val->columnName;
5118  }
5119  if (val->columnId < lowest_column) {
5120  lowest_column = val->columnId;
5121  lowest_column_name = val->columnName;
5122  }
5123  }
5124  }
5125  if (lowest_table != td->tableId || lowest_column != cd->columnId) {
5126  // we are referencing a different tables dictionary
5127  auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
5128  CHECK(lowest_td);
5129  std::string txt = "SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
5130  ") REFERENCES " + lowest_td->tableName + "(" +
5131  quoteIfRequired(lowest_column_name) + ")";
5132 
5133  additional_info.emplace_back(txt);
5134  shared_dict_column_names.insert(cd->columnName);
5135  }
5136  }
5137  }
5138  }
5139 }
5140