OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Catalog/Catalog.h"
26 
27 #include <algorithm>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/range/adaptor/map.hpp>
31 #include <boost/version.hpp>
32 #include <cassert>
33 #include <cerrno>
34 #include <cstdio>
35 #include <cstring>
36 #include <exception>
37 #include <fstream>
38 #include <list>
39 #include <memory>
40 #include <random>
41 #include <regex>
42 #include <sstream>
43 
44 #if BOOST_VERSION >= 106600
45 #include <boost/uuid/detail/sha1.hpp>
46 #else
47 #include <boost/uuid/sha1.hpp>
48 #endif
49 #include <rapidjson/document.h>
50 #include <rapidjson/istreamwrapper.h>
51 #include <rapidjson/ostreamwrapper.h>
52 #include <rapidjson/writer.h>
53 
54 #include "Catalog/SysCatalog.h"
55 
56 #include "QueryEngine/Execute.h"
58 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
67 #include "Parser/ParserNode.h"
68 #include "QueryEngine/Execute.h"
70 #include "RefreshTimeCalculator.h"
71 #include "Shared/DateTimeParser.h"
72 #include "Shared/File.h"
73 #include "Shared/StringTransform.h"
74 #include "Shared/measure.h"
75 #include "Shared/misc.h"
77 
78 #include "MapDRelease.h"
79 #include "RWLocks.h"
81 
82 using Chunk_NS::Chunk;
85 using std::list;
86 using std::map;
87 using std::pair;
88 using std::runtime_error;
89 using std::string;
90 using std::vector;
91 
// Feature flags owned by this translation unit; both start disabled.
// g_enable_fsi is checked by the migration and map-building code further down
// in this file; g_enable_s3_fsi is not referenced in this excerpt.
bool g_enable_fsi = false;
bool g_enable_s3_fsi = false;

// Flags declared here but defined in another translation unit.
extern bool g_cache_string_hash;
extern bool g_enable_system_tables;
97 
98 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
99 // under unit testing.
101 
102 namespace Catalog_Namespace {
103 
104 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
106  1073741824; // 2^30, give room for over a billion non-temp tables
108  1073741824; // 2^30, give room for over a billion non-temp dictionaries
109 
110 const std::string Catalog::physicalTableNameTag_("_shard_#");
111 
112 thread_local bool Catalog::thread_holds_read_lock = false;
113 
118 
119 // Migration is done as a two-step process: this release
120 // creates and uses the new table, and the next release
121 // removes the old table. Keeping the old table provides a fallback path
122 // in case the migration fails.
125  sqliteConnector_.query("BEGIN TRANSACTION");
126  try {
128  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
129  if (sqliteConnector_.getNumRows() != 0) {
130  // already done
131  sqliteConnector_.query("END TRANSACTION");
132  return;
133  }
135  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
136  "userid integer references mapd_users, state text, image_hash text, update_time "
137  "timestamp, "
138  "metadata text, UNIQUE(userid, name) )");
139  // now copy content from old table to new table
141  "insert into mapd_dashboards (id, name , "
142  "userid, state, image_hash, update_time , "
143  "metadata) "
144  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
145  "view_metadata "
146  "from mapd_frontend_views");
147  } catch (const std::exception& e) {
148  sqliteConnector_.query("ROLLBACK TRANSACTION");
149  throw;
150  }
151  sqliteConnector_.query("END TRANSACTION");
152 }
153 
154 namespace {
155 
156 inline auto table_json_filepath(const std::string& base_path,
157  const std::string& db_name) {
158  return boost::filesystem::path(base_path + "/mapd_catalogs/" + db_name +
159  "_temp_tables.json");
160 }
161 
162 } // namespace
163 
// Constructs the catalog for the database described by `curDB`.
// - The SQLite connector is opened on curDB.dbName in basePath + "/mapd_catalogs/".
// - Temp table / temp dictionary id counters start at MAPD_TEMP_TABLE_START_ID and
//   MAPD_TEMP_DICT_START_ID respectively.
// - For an existing database (is_new_db == false), CheckAndExecuteMigrations() runs
//   before buildMaps() and CheckAndExecuteMigrationsPostBuildMaps() runs after it.
// - The temp-tables JSON file for this database is removed (see NOTE below).
// - initialized_ is set last, after conditionallyInitializeSystemTables().
164 Catalog::Catalog(const string& basePath,
165  const DBMetadata& curDB,
166  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
167  const std::vector<LeafHostInfo>& string_dict_hosts,
168  std::shared_ptr<Calcite> calcite,
169  bool is_new_db)
170  : basePath_(basePath)
171  , sqliteConnector_(curDB.dbName, basePath + "/mapd_catalogs/")
172  , currentDB_(curDB)
173  , dataMgr_(dataMgr)
174  , string_dict_hosts_(string_dict_hosts)
175  , calciteMgr_(calcite)
176  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
177  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
178  , sqliteMutex_()
179  , sharedMutex_()
180  , thread_holding_sqlite_lock()
181  , thread_holding_write_lock() {
182  if (!is_new_db) {
183  CheckAndExecuteMigrations();
184  }
185  buildMaps();
186  if (!is_new_db) {
187  CheckAndExecuteMigrationsPostBuildMaps();
188  }
// NOTE(review): one source line is missing from this listing here; it opens the
// block that the `}` after remove() closes. Presumably it is the condition under
// which temp tables are serialized to JSON — confirm against the original file.
190  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
191  }
192  conditionallyInitializeSystemTables();
193  // Set last: code in this file that checks initialized_ hands out the SysCatalog dummy catalog instead of `this` until it is true.
194  initialized_ = true;
195 }
196 
198 
201  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
202  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
203  tableDescIt != tableDescriptorMap_.end();
204  ++tableDescIt) {
205  tableDescIt->second->fragmenter = nullptr;
206  delete tableDescIt->second;
207  }
208 
209  // TableDescriptorMapById points to the same descriptors. No need to delete
210 
211  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
212  columnDescIt != columnDescriptorMap_.end();
213  ++columnDescIt) {
214  delete columnDescIt->second;
215  }
216 
217  // ColumnDescriptorMapById points to the same descriptors. No need to delete
218 
220  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
221  }
222 }
223 
225  if (initialized_) {
226  return this;
227  } else {
228  return SysCatalog::instance().getDummyCatalog().get();
229  }
230 }
231 
234  sqliteConnector_.query("BEGIN TRANSACTION");
235  try {
236  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
237  std::vector<std::string> cols;
238  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
239  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
240  }
241  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
242  cols.end()) {
243  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
245  sqliteConnector_.query(queryString);
246  }
247  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
248  cols.end()) {
249  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
250  std::to_string(0));
251  sqliteConnector_.query(queryString);
252  }
253  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
254  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
255  std::to_string(-1));
256  sqliteConnector_.query(queryString);
257  }
258  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
259  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
260  std::to_string(0));
261  sqliteConnector_.query(queryString);
262  }
263  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
264  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
265  sqliteConnector_.query(queryString);
266  }
267  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
268  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
270  sqliteConnector_.query(queryString);
271  }
272  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
273  cols.end()) {
275  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
276  }
277  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
278  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
279  sqliteConnector_.query(queryString);
280  }
281  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
282  cols.end()) {
283  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
284  std::to_string(-1));
285  sqliteConnector_.query(queryString);
286  }
287  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
288  cols.end()) {
289  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
290  sqliteConnector_.query(queryString);
291  }
292  } catch (std::exception& e) {
293  sqliteConnector_.query("ROLLBACK TRANSACTION");
294  throw;
295  }
296  sqliteConnector_.query("END TRANSACTION");
297 }
298 
301  sqliteConnector_.query("BEGIN TRANSACTION");
302  try {
304  "select name from sqlite_master WHERE type='table' AND "
305  "name='mapd_version_history'");
306  if (sqliteConnector_.getNumRows() == 0) {
308  "CREATE TABLE mapd_version_history(version integer, migration_history text "
309  "unique)");
310  } else {
312  "select * from mapd_version_history where migration_history = "
313  "'notnull_fixlen_arrays'");
314  if (sqliteConnector_.getNumRows() != 0) {
315  // legacy fixlen arrays had migrated
316  // no need for further execution
317  sqliteConnector_.query("END TRANSACTION");
318  return;
319  }
320  }
321  // Insert check for migration
323  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
324  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
325  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
326  // Upating all fixlen array columns
327  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
328  std::to_string(kARRAY) + " AND size>0;");
329  sqliteConnector_.query(queryString);
330  } catch (std::exception& e) {
331  sqliteConnector_.query("ROLLBACK TRANSACTION");
332  throw;
333  }
334  sqliteConnector_.query("END TRANSACTION");
335 }
336 
339  sqliteConnector_.query("BEGIN TRANSACTION");
340  try {
342  "select name from sqlite_master WHERE type='table' AND "
343  "name='mapd_version_history'");
344  if (sqliteConnector_.getNumRows() == 0) {
346  "CREATE TABLE mapd_version_history(version integer, migration_history text "
347  "unique)");
348  } else {
350  "select * from mapd_version_history where migration_history = "
351  "'notnull_geo_columns'");
352  if (sqliteConnector_.getNumRows() != 0) {
353  // legacy geo columns had migrated
354  // no need for further execution
355  sqliteConnector_.query("END TRANSACTION");
356  return;
357  }
358  }
359  // Insert check for migration
361  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
362  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
363  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
364  // Upating all geo columns
365  string queryString(
366  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
367  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
368  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
369  sqliteConnector_.query(queryString);
370  } catch (std::exception& e) {
371  sqliteConnector_.query("ROLLBACK TRANSACTION");
372  throw;
373  }
374  sqliteConnector_.query("END TRANSACTION");
375 }
376 
379  sqliteConnector_.query("BEGIN TRANSACTION");
380  try {
381  // check table still exists
383  "SELECT name FROM sqlite_master WHERE type='table' AND "
384  "name='mapd_frontend_views'");
385  if (sqliteConnector_.getNumRows() == 0) {
386  // table does not exists
387  // no need to migrate
388  sqliteConnector_.query("END TRANSACTION");
389  return;
390  }
391  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
392  std::vector<std::string> cols;
393  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
394  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
395  }
396  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
397  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
398  }
399  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
400  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
401  }
402  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
403  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
404  }
405  } catch (std::exception& e) {
406  sqliteConnector_.query("ROLLBACK TRANSACTION");
407  throw;
408  }
409  sqliteConnector_.query("END TRANSACTION");
410 }
411 
414  sqliteConnector_.query("BEGIN TRANSACTION");
415  try {
417  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
418  "integer references mapd_users, "
419  "link text unique, view_state text, update_time timestamp, view_metadata text)");
420  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
421  std::vector<std::string> cols;
422  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
423  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
424  }
425  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
426  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
427  }
428  } catch (const std::exception& e) {
429  sqliteConnector_.query("ROLLBACK TRANSACTION");
430  throw;
431  }
432  sqliteConnector_.query("END TRANSACTION");
433 }
434 
437  sqliteConnector_.query("BEGIN TRANSACTION");
438  try {
439  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
440  // check table still exists
442  "SELECT name FROM sqlite_master WHERE type='table' AND "
443  "name='mapd_frontend_views'");
444  if (sqliteConnector_.getNumRows() == 0) {
445  // table does not exists
446  // no need to migrate
447  sqliteConnector_.query("END TRANSACTION");
448  return;
449  }
451  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
452  } catch (const std::exception& e) {
453  sqliteConnector_.query("ROLLBACK TRANSACTION");
454  throw;
455  }
456  sqliteConnector_.query("END TRANSACTION");
457 }
458 
459 // Introduce a DB version number into the mapd_tables table.
460 // If the DB does not have a version yet, reset all page sizes to 2097152 to be
461 // compatible with the old page-size value.
462 
465  if (currentDB_.dbName.length() == 0) {
466  // updateDictionaryNames dbName length is zero nothing to do here
467  return;
468  }
469  sqliteConnector_.query("BEGIN TRANSACTION");
470  try {
471  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
472  std::vector<std::string> cols;
473  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
474  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
475  }
476  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
477  LOG(INFO) << "Updating mapd_tables updatePageSize";
478  // No version number
479  // need to update the defaul tpagesize to old correct value
480  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
481  // need to add new version info
482  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
484  sqliteConnector_.query(queryString);
485  }
486  } catch (std::exception& e) {
487  sqliteConnector_.query("ROLLBACK TRANSACTION");
488  throw;
489  }
490  sqliteConnector_.query("END TRANSACTION");
491 }
492 
495  sqliteConnector_.query("BEGIN TRANSACTION");
496  try {
497  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
498  std::vector<std::string> cols;
499  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
500  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
501  }
502  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
503  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
504  // need to add new version info
505  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
507  sqliteConnector_.query(queryString);
508  // need to add new column to table defintion to indicate deleted column, column used
509  // as bitmap for deleted rows.
511  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
512  }
513  } catch (std::exception& e) {
514  sqliteConnector_.query("ROLLBACK TRANSACTION");
515  throw;
516  }
517  sqliteConnector_.query("END TRANSACTION");
518 }
519 
522  sqliteConnector_.query("BEGIN TRANSACTION");
523  try {
524  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
525  std::vector<std::string> cols;
526  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
527  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
528  }
529  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
530  LOG(INFO) << "Adding support for default values to mapd_columns";
531  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
532  }
533  } catch (std::exception& e) {
534  LOG(ERROR) << "Failed to make metadata update for default values` support";
535  sqliteConnector_.query("ROLLBACK TRANSACTION");
536  throw;
537  }
538  sqliteConnector_.query("END TRANSACTION");
539 }
540 
541 // Introduce a DB version number into the mapd_dictionaries table.
542 // If the DB does not have a version yet, rename all dictionary directories.
543 
546  if (currentDB_.dbName.length() == 0) {
547  // updateDictionaryNames dbName length is zero nothing to do here
548  return;
549  }
550  sqliteConnector_.query("BEGIN TRANSACTION");
551  try {
552  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
553  std::vector<std::string> cols;
554  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
555  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
556  }
557  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
558  // No version number
559  // need to rename dictionaries
560  string dictQuery("SELECT dictid, name from mapd_dictionaries");
561  sqliteConnector_.query(dictQuery);
562  size_t numRows = sqliteConnector_.getNumRows();
563  for (size_t r = 0; r < numRows; ++r) {
564  int dictId = sqliteConnector_.getData<int>(r, 0);
565  std::string dictName = sqliteConnector_.getData<string>(r, 1);
566 
567  std::string oldName =
568  g_base_path + "/mapd_data/" + currentDB_.dbName + "_" + dictName;
569  std::string newName = g_base_path + "/mapd_data/DB_" +
570  std::to_string(currentDB_.dbId) + "_DICT_" +
571  std::to_string(dictId);
572 
573  int result = rename(oldName.c_str(), newName.c_str());
574 
575  if (result == 0) {
576  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
577  << newName;
578  } else {
579  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
580  << newName + " dbname '" << currentDB_.dbName << "' error code "
581  << std::to_string(result);
582  }
583  }
584  // need to add new version info
585  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
587  sqliteConnector_.query(queryString);
588  }
589  } catch (std::exception& e) {
590  sqliteConnector_.query("ROLLBACK TRANSACTION");
591  throw;
592  }
593  sqliteConnector_.query("END TRANSACTION");
594 }
595 
598  sqliteConnector_.query("BEGIN TRANSACTION");
599  try {
601  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
602  "logical_table_id integer, physical_table_id integer)");
603  } catch (const std::exception& e) {
604  sqliteConnector_.query("ROLLBACK TRANSACTION");
605  throw;
606  }
607  sqliteConnector_.query("END TRANSACTION");
608 }
609 
// Writes one (logical_tb_id, physical_tb_id) row into mapd_logical_to_physical for
// every physical table id recorded in logicalToPhysicalTableMapById_ under
// logical_tb_id. All writes happen inside one SQLite transaction; on any exception
// the transaction is rolled back and the exception is rethrown. If logical_tb_id has
// no entry in the map, the transaction is opened and ended without writes.
610 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
611  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
612  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
613  */
// NOTE(review): a source line is missing from this listing between the comment above
// and BEGIN TRANSACTION — confirm against the original file.
614
616  sqliteConnector_.query("BEGIN TRANSACTION");
617  try {
618  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
619  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// NOTE(review): `const auto` copies the whole container of physical ids;
// `const auto&` would avoid the copy.
620  const auto physicalTables = physicalTableIt->second;
621  CHECK(!physicalTables.empty());
622  for (size_t i = 0; i < physicalTables.size(); i++) {
623  int32_t physical_tb_id = physicalTables[i];
// NOTE(review): the call that receives the SQL text and parameter vector below is
// missing from this listing.
// NOTE(review): the mapd_logical_to_physical schema created in this file declares no
// PRIMARY KEY/UNIQUE constraint, so "OR REPLACE" never triggers and calling this
// twice for the same logical table would insert duplicate rows — verify intent.
625  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
626  "physical_table_id) VALUES (?1, ?2)",
627  std::vector<std::string>{std::to_string(logical_tb_id),
628  std::to_string(physical_tb_id)});
629  }
630  }
631  } catch (std::exception& e) {
632  sqliteConnector_.query("ROLLBACK TRANSACTION");
633  throw;
634  }
635  sqliteConnector_.query("END TRANSACTION");
636 }
637 
640  sqliteConnector_.query("BEGIN TRANSACTION");
641  try {
642  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
643  std::vector<std::string> cols;
644  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
645  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
646  }
647  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
648  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
649  }
650  } catch (std::exception& e) {
651  sqliteConnector_.query("ROLLBACK TRANSACTION");
652  throw;
653  }
654  sqliteConnector_.query("END TRANSACTION");
655 }
656 
659  sqliteConnector_.query("BEGIN TRANSACTION");
660  try {
663  } catch (std::exception& e) {
664  sqliteConnector_.query("ROLLBACK TRANSACTION");
665  throw;
666  }
667  sqliteConnector_.query("END TRANSACTION");
668 }
669 
672  sqliteConnector_.query("BEGIN TRANSACTION");
673  try {
675  } catch (const std::exception& e) {
676  sqliteConnector_.query("ROLLBACK TRANSACTION");
677  throw;
678  }
679  sqliteConnector_.query("END TRANSACTION");
680 }
681 
682 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
683  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
684  "omnisci_foreign_servers(id integer primary key, name text unique, " +
685  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
686  "options text)";
687 }
688 
689 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
690  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
691  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
692  "options text, last_refresh_time integer, next_refresh_time integer, " +
693  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
694  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
695 }
696 
697 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
698  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
699  "omnisci_custom_expressions(id integer primary key, name text, " +
700  "expression_json text, data_source_type text, " +
701  "data_source_id integer, is_deleted boolean)";
702 }
703 
706  sqliteConnector_.query("BEGIN TRANSACTION");
707  std::vector<DBObject> objects;
708  try {
710  "SELECT name FROM sqlite_master WHERE type='table' AND "
711  "name='mapd_record_ownership_marker'");
712  // check if mapd catalog - marker exists
713  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
714  // already done
715  sqliteConnector_.query("END TRANSACTION");
716  return;
717  }
718  // check if different catalog - marker exists
719  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
720  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
721  // Check if migration is being performed on existing non mapd catalogs
722  // Older non mapd dbs will have table but no record in them
723  if (sqliteConnector_.getNumRows() != 0) {
724  // already done
725  sqliteConnector_.query("END TRANSACTION");
726  return;
727  }
728  }
729  // marker not exists - create one
730  else {
731  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
732  }
733 
734  DBMetadata db;
735  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
736  // place dbId as a refernce for migration being performed
738  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
739  std::vector<std::string>{std::to_string(db.dbOwner)});
740 
741  static const std::map<const DBObjectType, const AccessPrivileges>
742  object_level_all_privs_lookup{
748 
749  // grant owner all permissions on DB
750  DBObjectKey key;
751  key.dbId = currentDB_.dbId;
752  auto _key_place = [&key](auto type) {
753  key.permissionType = type;
754  return key;
755  };
756  for (auto& it : object_level_all_privs_lookup) {
757  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
758  objects.back().setName(currentDB_.dbName);
759  }
760 
761  {
762  // other users tables and views
763  string tableQuery(
764  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
765  sqliteConnector_.query(tableQuery);
766  size_t numRows = sqliteConnector_.getNumRows();
767  for (size_t r = 0; r < numRows; ++r) {
768  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
769  std::string tableName = sqliteConnector_.getData<string>(r, 1);
770  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
771  bool isview = sqliteConnector_.getData<bool>(r, 3);
772 
775  DBObjectKey key;
776  key.dbId = currentDB_.dbId;
777  key.objectId = tableid;
778  key.permissionType = type;
779 
780  DBObject obj(tableName, type);
781  obj.setObjectKey(key);
782  obj.setOwner(ownerid);
785 
786  objects.push_back(obj);
787  }
788  }
789 
790  {
791  // other users dashboards
792  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
793  sqliteConnector_.query(tableQuery);
794  size_t numRows = sqliteConnector_.getNumRows();
795  for (size_t r = 0; r < numRows; ++r) {
796  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
797  std::string dashName = sqliteConnector_.getData<string>(r, 1);
798  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
799 
801  DBObjectKey key;
802  key.dbId = currentDB_.dbId;
803  key.objectId = dashId;
804  key.permissionType = type;
805 
806  DBObject obj(dashName, type);
807  obj.setObjectKey(key);
808  obj.setOwner(ownerid);
810 
811  objects.push_back(obj);
812  }
813  }
814  } catch (const std::exception& e) {
815  sqliteConnector_.query("ROLLBACK TRANSACTION");
816  throw;
817  }
818  sqliteConnector_.query("END TRANSACTION");
819 
820  // now apply the objects to the syscat to track the permisisons
821  // moved outside transaction to avoid lock in sqlite
822  try {
824  } catch (const std::exception& e) {
825  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
826  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
827  e.what());
828  // will need to remove the mapd_record_ownership_marker table and retry
829  }
830 }
831 
836 }
837 
839  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
840  std::vector<std::string> dashboard_ids;
841  static const std::string migration_name{"dashboard_roles_migration"};
842  {
844  sqliteConnector_.query("BEGIN TRANSACTION");
845  try {
846  // migration_history should be present in all catalogs by now
847  // if not then would be created before this migration
849  "select * from mapd_version_history where migration_history = '" +
850  migration_name + "'");
851  if (sqliteConnector_.getNumRows() != 0) {
852  // no need for further execution
853  sqliteConnector_.query("END TRANSACTION");
854  return;
855  }
856  LOG(INFO) << "Performing dashboard internal roles Migration.";
857  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
858  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
861  sqliteConnector_.getData<string>(i, 0)))) {
862  // Successfully created roles during previous migration/crash
863  // No need to include them
864  continue;
865  }
866  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
867  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
868  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
869  }
870  } catch (const std::exception& e) {
871  sqliteConnector_.query("ROLLBACK TRANSACTION");
872  throw;
873  }
874  sqliteConnector_.query("END TRANSACTION");
875  }
876  // All current grantees with shared dashboards.
877  const auto active_grantees =
879 
880  try {
881  // NOTE(wamsi): Transactionally unsafe
882  for (auto dash : dashboards) {
883  createOrUpdateDashboardSystemRole(dash.second.second,
884  dash.second.first,
886  std::to_string(currentDB_.dbId), dash.first));
887  auto result = active_grantees.find(dash.first);
888  if (result != active_grantees.end()) {
891  dash.first)},
892  result->second);
893  }
894  }
896  // check if this has already been completed
898  "select * from mapd_version_history where migration_history = '" +
899  migration_name + "'");
900  if (sqliteConnector_.getNumRows() != 0) {
901  return;
902  }
904  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
905  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
906  } catch (const std::exception& e) {
907  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
908  << e.what();
909  throw;
910  }
911  LOG(INFO) << "Successfully created dashboard system roles during migration.";
912 }
913 
924  updatePageSize();
928  if (g_enable_fsi) {
930  }
933 }
934 
938 }
939 
940 namespace {
941 std::string getUserFromId(const int32_t id) {
942  UserMetadata user;
943  if (SysCatalog::instance().getMetadataForUserById(id, user)) {
944  return user.userName;
945  }
946  // a user could be deleted and a dashboard still exist?
947  return "Unknown";
948 }
949 } // namespace
950 
954 
955  string dictQuery(
956  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
957  sqliteConnector_.query(dictQuery);
958  size_t numRows = sqliteConnector_.getNumRows();
959  for (size_t r = 0; r < numRows; ++r) {
960  int dictId = sqliteConnector_.getData<int>(r, 0);
961  std::string dictName = sqliteConnector_.getData<string>(r, 1);
962  int dictNBits = sqliteConnector_.getData<int>(r, 2);
963  bool is_shared = sqliteConnector_.getData<bool>(r, 3);
964  int refcount = sqliteConnector_.getData<int>(r, 4);
965  std::string fname = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
966  "_DICT_" + std::to_string(dictId);
967  DictRef dict_ref(currentDB_.dbId, dictId);
968  DictDescriptor* dd = new DictDescriptor(
969  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
970  dictDescriptorMapByRef_[dict_ref].reset(dd);
971  }
972 
973  string tableQuery(
974  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
975  "max_chunk_size, frag_page_size, "
976  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
977  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
978  "from mapd_tables");
979  sqliteConnector_.query(tableQuery);
980  numRows = sqliteConnector_.getNumRows();
981  for (size_t r = 0; r < numRows; ++r) {
982  TableDescriptor* td;
983  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
984  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
985  const auto table_id = sqliteConnector_.getData<int>(r, 0);
986  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
987  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
988  "supported table option (table "
989  << table_name << " [" << table_id << "] in database "
990  << currentDB_.dbName << ").";
991  }
992 
993  if (storage_type == StorageType::FOREIGN_TABLE) {
995  } else {
996  td = new TableDescriptor();
997  }
998 
999  td->storageType = storage_type;
1000  td->tableId = sqliteConnector_.getData<int>(r, 0);
1001  td->tableName = sqliteConnector_.getData<string>(r, 1);
1002  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1003  td->isView = sqliteConnector_.getData<bool>(r, 3);
1004  td->fragments = sqliteConnector_.getData<string>(r, 4);
1005  td->fragType =
1007  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1008  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1009  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1010  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1011  td->partitions = sqliteConnector_.getData<string>(r, 10);
1012  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1013  td->shard = sqliteConnector_.getData<int>(r, 12);
1014  td->nShards = sqliteConnector_.getData<int>(r, 13);
1015  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1016  td->userId = sqliteConnector_.getData<int>(r, 15);
1017  td->sortedColumnId =
1018  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1019  if (!td->isView) {
1020  td->fragmenter = nullptr;
1021  }
1022  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1023  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
1024  td->hasDeletedCol = false;
1025 
1027  tableDescriptorMapById_[td->tableId] = td;
1028  }
1029 
1030  if (g_enable_fsi) {
1034  }
1035 
1036  string columnQuery(
1037  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1038  "is_notnull, compression, comp_param, "
1039  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1040  "default_value from "
1041  "mapd_columns ORDER BY tableid, "
1042  "columnid");
1043  sqliteConnector_.query(columnQuery);
1044  numRows = sqliteConnector_.getNumRows();
1045  int32_t skip_physical_cols = 0;
1046  for (size_t r = 0; r < numRows; ++r) {
1047  ColumnDescriptor* cd = new ColumnDescriptor();
1048  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1049  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1050  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1054  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1058  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1059  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1060  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1061  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1062  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1063  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1064  if (sqliteConnector_.isNull(r, 16)) {
1065  cd->default_value = std::nullopt;
1066  } else {
1067  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1068  }
1069  cd->isGeoPhyCol = skip_physical_cols > 0;
1070  ColumnKey columnKey(cd->tableId, to_upper(cd->columnName));
1071  columnDescriptorMap_[columnKey] = cd;
1072  ColumnIdKey columnIdKey(cd->tableId, cd->columnId);
1073  columnDescriptorMapById_[columnIdKey] = cd;
1074 
1075  if (skip_physical_cols <= 0) {
1076  skip_physical_cols = cd->columnType.get_physical_cols();
1077  }
1078 
1079  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1080  CHECK(td_itr != tableDescriptorMapById_.end());
1081 
1082  if (cd->isDeletedCol) {
1083  td_itr->second->hasDeletedCol = true;
1084  setDeletedColumnUnlocked(td_itr->second, cd);
1085  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1086  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1087  }
1088  }
1089  // sort columnIdBySpi_ based on columnId
1090  for (auto& tit : tableDescriptorMapById_) {
1091  std::sort(tit.second->columnIdBySpi_.begin(),
1092  tit.second->columnIdBySpi_.end(),
1093  [](const size_t a, const size_t b) -> bool { return a < b; });
1094  }
1095 
1096  string viewQuery("SELECT tableid, sql FROM mapd_views");
1097  sqliteConnector_.query(viewQuery);
1098  numRows = sqliteConnector_.getNumRows();
1099  for (size_t r = 0; r < numRows; ++r) {
1100  int32_t tableId = sqliteConnector_.getData<int>(r, 0);
1101  TableDescriptor* td = tableDescriptorMapById_[tableId];
1102  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1103  td->fragmenter = nullptr;
1104  }
1105 
1106  string frontendViewQuery(
1107  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1108  "userid, "
1109  "metadata "
1110  "FROM mapd_dashboards");
1111  sqliteConnector_.query(frontendViewQuery);
1112  numRows = sqliteConnector_.getNumRows();
1113  for (size_t r = 0; r < numRows; ++r) {
1114  std::shared_ptr<DashboardDescriptor> vd = std::make_shared<DashboardDescriptor>();
1115  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1116  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1117  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1118  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1119  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1120  vd->userId = sqliteConnector_.getData<int>(r, 5);
1121  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1122  vd->user = getUserFromId(vd->userId);
1123  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1125  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1126  }
1127 
1128  string linkQuery(
1129  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1130  "update_time), view_metadata "
1131  "FROM mapd_links");
1132  sqliteConnector_.query(linkQuery);
1133  numRows = sqliteConnector_.getNumRows();
1134  for (size_t r = 0; r < numRows; ++r) {
1135  LinkDescriptor* ld = new LinkDescriptor();
1136  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1137  ld->userId = sqliteConnector_.getData<int>(r, 1);
1138  ld->link = sqliteConnector_.getData<string>(r, 2);
1139  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1140  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1141  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1143  linkDescriptorMapById_[ld->linkId] = ld;
1144  }
1145 
1146  /* rebuild map linking logical tables to corresponding physical ones */
1147  string logicalToPhysicalTableMapQuery(
1148  "SELECT logical_table_id, physical_table_id "
1149  "FROM mapd_logical_to_physical");
1150  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1151  numRows = sqliteConnector_.getNumRows();
1152  for (size_t r = 0; r < numRows; ++r) {
1153  int32_t logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1154  int32_t physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1155  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1156  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1157  /* add new entity to the map logicalToPhysicalTableMapById_ */
1158  std::vector<int32_t> physicalTables;
1159  physicalTables.push_back(physical_tb_id);
1160  const auto it_ok =
1161  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1162  CHECK(it_ok.second);
1163  } else {
1164  /* update map logicalToPhysicalTableMapById_ */
1165  physicalTableIt->second.push_back(physical_tb_id);
1166  }
1167  }
1168 
1170 }
1171 
1174  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1175  "is_deleted "
1176  "FROM omnisci_custom_expressions");
1177  auto num_rows = sqliteConnector_.getNumRows();
1178  for (size_t row = 0; row < num_rows; row++) {
1179  auto custom_expr = getCustomExpressionFromConnector(row);
1180  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1181  }
1182 }
1183 
1184 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1185  auto id = sqliteConnector_.getData<int>(row, 0);
1186  auto name = sqliteConnector_.getData<string>(row, 1);
1187  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1188  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1189  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1190  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1191  return std::make_unique<CustomExpression>(
1192  id,
1193  name,
1194  expression_json,
1195  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1196  data_source_id,
1197  is_deleted);
1198 }
1199 
1201  const list<ColumnDescriptor>& columns,
1202  const list<DictDescriptor>& dicts) {
1203  cat_write_lock write_lock(this);
1204  TableDescriptor* new_td;
1205 
1206  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1207  if (foreign_table) {
1208  auto new_foreign_table = new foreign_storage::ForeignTable();
1209  *new_foreign_table = *foreign_table;
1210  new_td = new_foreign_table;
1211  } else {
1212  new_td = new TableDescriptor();
1213  *new_td = *td;
1214  }
1215 
1216  new_td->mutex_ = std::make_shared<std::mutex>();
1217  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1218  tableDescriptorMapById_[td->tableId] = new_td;
1219  for (auto cd : columns) {
1220  ColumnDescriptor* new_cd = new ColumnDescriptor();
1221  *new_cd = cd;
1222  ColumnKey columnKey(new_cd->tableId, to_upper(new_cd->columnName));
1223  columnDescriptorMap_[columnKey] = new_cd;
1224  ColumnIdKey columnIdKey(new_cd->tableId, new_cd->columnId);
1225  columnDescriptorMapById_[columnIdKey] = new_cd;
1226 
1227  // Add deleted column to the map
1228  if (cd.isDeletedCol) {
1229  CHECK(new_td->hasDeletedCol);
1230  setDeletedColumnUnlocked(new_td, new_cd);
1231  }
1232  }
1233 
1234  std::sort(new_td->columnIdBySpi_.begin(),
1235  new_td->columnIdBySpi_.end(),
1236  [](const size_t a, const size_t b) -> bool { return a < b; });
1237 
1238  std::unique_ptr<StringDictionaryClient> client;
1239  DictRef dict_ref(currentDB_.dbId, -1);
1240  if (!string_dict_hosts_.empty()) {
1241  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1242  }
1243  for (auto dd : dicts) {
1244  if (!dd.dictRef.dictId) {
1245  // Dummy entry created for a shard of a logical table, nothing to do.
1246  continue;
1247  }
1248  dict_ref.dictId = dd.dictRef.dictId;
1249  if (client) {
1250  client->create(dict_ref, dd.dictIsTemp);
1251  }
1252  DictDescriptor* new_dd = new DictDescriptor(dd);
1253  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1254  if (!dd.dictIsTemp) {
1255  boost::filesystem::create_directory(new_dd->dictFolderPath);
1256  }
1257  }
1258 }
1259 
// Removes table `tableId` from the in-memory catalog maps:
//  - its TableDescriptor, erased by id and by the upper-cased `tableName`;
//  - its deletedColumnPerTable_ entry, if the table has a deleted column;
//  - every ColumnDescriptor whose key carries this table id.
// For dictionary-encoded columns with a nonzero dict id, the in-memory
// DictDescriptor refcount is decremented. When it reaches zero, the dictionary
// is dropped: its folder is renamed for deletion unless `isTemp`, it is dropped
// on the dictionary server when a client exists, and it is erased from
// dictDescriptorMapByRef_.
// None of the lines shown here issue sqlite statements; persistent catalog rows
// are presumably handled by the caller (verify).
// @param tableName   key (upper-cased) erased from tableDescriptorMap_; also used
//                    in the error message.
// @param tableId     id of the table to remove.
// @param is_on_error when true, a missing dictionary entry is tolerated instead
//                    of failing the CHECK.
// @throws runtime_error if `tableId` is not present in tableDescriptorMapById_.
1260 void Catalog::removeTableFromMap(const string& tableName,
1261  const int tableId,
1262  const bool is_on_error) {
1263  cat_write_lock write_lock(this);
1264  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1265  if (tableDescIt == tableDescriptorMapById_.end()) {
1266  throw runtime_error("Table " + tableName + " does not exist.");
1267  }
1268
1269  TableDescriptor* td = tableDescIt->second;
1270
1271  if (td->hasDeletedCol) {
1272  const auto ret = deletedColumnPerTable_.erase(td);
1273  CHECK_EQ(ret, size_t(1));
1274  }
1275
1276  tableDescriptorMapById_.erase(tableDescIt);
1277  tableDescriptorMap_.erase(to_upper(tableName));
1278  td->fragmenter = nullptr;
1279
// NOTE(review): `isTemp` (used further below) is declared on a source line that
// is not shown in this listing; presumably it is derived from `td` before the
// delete.
1281  delete td;
1282
// A dictionary-server client is only created on an aggregator. It is used to
// drop dictionaries whose refcount reaches zero.
1283  std::unique_ptr<StringDictionaryClient> client;
1284  if (SysCatalog::instance().isAggregator()) {
1285  CHECK(!string_dict_hosts_.empty());
1286  DictRef dict_ref(currentDB_.dbId, -1);
1287  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1288  }
1289
1290  // delete all column descriptors for the table
1291  // no more link columnIds to sequential indexes!
1292  for (auto cit = columnDescriptorMapById_.begin();
1293  cit != columnDescriptorMapById_.end();) {
1294  if (tableId != std::get<0>(cit->first)) {
1295  ++cit;
1296  } else {
// The post-increment moves `cit` past the current entry before that entry is
// erased below. An erase invalidates iterators to the erased element, so this
// keeps the loop iterator valid.
1297  int i = std::get<1>(cit++->first);
1298  ColumnIdKey cidKey(tableId, i);
1299  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1300  ColumnDescriptor* cd = colDescIt->second;
1301  columnDescriptorMapById_.erase(colDescIt);
1302  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1303  columnDescriptorMap_.erase(cnameKey);
1304  const int dictId = cd->columnType.get_comp_param();
1305  // Dummy dictionaries created for a shard of a logical table have the id set to
1306  // zero.
1307  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1308  INJECT_TIMER(removingDicts);
1309  DictRef dict_ref(currentDB_.dbId, dictId);
1310  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1311  // If we're removing this table due to an error, it is possible that the string
1312  // dictionary reference was never populated. Don't crash, just continue cleaning
1313  // up the TableDescriptor and ColumnDescriptors
1314  if (!is_on_error) {
1315  CHECK(dictIt != dictDescriptorMapByRef_.end());
1316  } else {
1317  if (dictIt == dictDescriptorMapByRef_.end()) {
// NOTE(review): this `continue` skips the `delete cd;` at the end of the loop
// body. `cd` has already been erased from both column maps, so the
// ColumnDescriptor appears to leak on this error path.
1318  continue;
1319  }
1320  }
1321  const auto& dd = dictIt->second;
1322  CHECK_GE(dd->refcount, 1);
1323  --dd->refcount;
// Last reference gone: release the dictionary and forget its descriptor.
1324  if (!dd->refcount) {
1325  dd->stringDict.reset();
1326  if (!isTemp) {
1327  File_Namespace::renameForDelete(dd->dictFolderPath);
1328  }
1329  if (client) {
1330  client->drop(dict_ref);
1331  }
1332  dictDescriptorMapByRef_.erase(dictIt);
1333  }
1334  }
1335
1336  delete cd;
1337  }
1338  }
1339 }
1340 
1342  cat_write_lock write_lock(this);
1344 }
1345 
1347  cat_write_lock write_lock(this);
1349  std::make_shared<DashboardDescriptor>(vd);
1350 }
1351 
// Builds the DBObjects for the tables/views in the current database that the
// dashboard metadata `view_meta` references. The names come from
// parse_underlying_dashboard_objects().
// Each returned object has these properties:
//  - it is keyed on the current database id and the table id;
//  - it is typed as a view or a table, and named after the table;
//  - it carries `priv`, which is AccessPrivileges::SELECT_FROM_VIEW for views
//    (the non-view alternative is on a line not shown in this listing);
//  - it is attributed to `user_id`.
// Names that do not resolve to a table/view in this database are logged at INFO
// level and skipped.
1352 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1353  const int& user_id) {
1354  std::vector<DBObject> objects;
1355  DBObjectKey key;
1356  key.dbId = currentDB_.dbId;
// Helper: overwrites the shared `key`'s permission type and object id, then
// returns a copy of it. `dbId` stays fixed at the current database.
1357  auto _key_place = [&key](auto type, auto id) {
1358  key.permissionType = type;
1359  key.objectId = id;
1360  return key;
1361  };
// NOTE(review): iterates by value; `const auto&` would avoid copying each
// parsed name.
1362  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1363  auto td = getMetadataForTable(object_name, false);
1364  if (!td) {
1365  // Parsed object source is not present in current database
1366  // LOG the info and ignore
1367  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1368  << " no longer exists in current DB.";
1369  continue;
1370  }
1371  // Dashboard source can be Table or View
1372  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1373  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1375  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
// Repeats the view/table choice already stored in `object_type`.
1376  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1377  objects.back().setName(td->tableName);
1378  }
1379  return objects;
1380 }
1381 
// Synchronizes the dashboard system role `dash_role_name` with the objects that
// the dashboard metadata `view_meta` currently references (via
// parseDashboardObjects).
//  - If the role does not exist yet, it is created (the call is partly on a line
//    not shown in this listing) and granted privileges on those objects.
//  - Otherwise, privileges on tables/views the role holds but the dashboard no
//    longer references are revoked. Privileges on the current objects are then
//    (re)granted in one batch.
// The SysCatalog calls are not wrapped in a transaction (see the NOTE(wamsi)
// comments).
1382 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1383  const int32_t& user_id,
1384  const std::string& dash_role_name) {
1385  auto objects = parseDashboardObjects(view_meta, user_id);
1386  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1387  if (!rl) {
1388  // Dashboard role does not exist
1389  // create role and grant privileges
1390  // NOTE(wamsi): Transactionally unsafe
1392  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1393  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1394  } else {
1395  // Dashboard system role already exists
1396  // Add/remove privileges on objects
1397  std::set<DBObjectKey> revoke_keys;
1398  auto ex_objects = rl->getDbObjects(true);
// Only table and view keys are candidates for revocation; other object kinds
// held by the role are left untouched.
// NOTE(review): the inner scan copies a DBObject per comparison and is
// O(existing keys x objects). Collecting the objects' keys into a
// std::set<DBObjectKey> once would avoid both costs.
1399  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1400  if (key.permissionType != TableDBObjectType &&
1401  key.permissionType != ViewDBObjectType) {
1402  continue;
1403  }
1404  bool found = false;
1405  for (auto obj : objects) {
1406  found = key == obj.getObjectKey() ? true : false;
1407  if (found) {
1408  break;
1409  }
1410  }
1411  if (!found) {
1412  revoke_keys.insert(key);
1413  }
1414  }
// Assumes findDbObject(key, true) succeeds for every key that came from
// getDbObjects(true) on the same role; the result is dereferenced unchecked.
1415  for (auto& key : revoke_keys) {
1416  // revoke privs on object since the object is no
1417  // longer used by the dashboard as source
1418  // NOTE(wamsi): Transactionally unsafe
1420  dash_role_name, *rl->findDbObject(key, true), *this);
1421  }
1422  // Update privileges on remaining objects
1423  // NOTE(wamsi): Transactionally unsafe
1424  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1425  }
1426 }
1427 
1429  cat_write_lock write_lock(this);
1430  LinkDescriptor* new_ld = new LinkDescriptor();
1431  *new_ld = ld;
1433  linkDescriptorMapById_[ld.linkId] = new_ld;
1434 }
1435 
1437  auto time_ms = measure<>::execution([&]() {
1438  // instanciate table fragmenter upon first use
1439  // assume only insert order fragmenter is supported
1441  vector<Chunk> chunkVec;
1442  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1443  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1444  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1445  if (td->sortedColumnId > 0) {
1446  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1447  chunkVec,
1448  dataMgr_.get(),
1449  const_cast<Catalog*>(this),
1450  td->tableId,
1451  td->shard,
1452  td->maxFragRows,
1453  td->maxChunkSize,
1454  td->fragPageSize,
1455  td->maxRows,
1456  td->persistenceLevel);
1457  } else {
1458  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1459  chunkVec,
1460  dataMgr_.get(),
1461  const_cast<Catalog*>(this),
1462  td->tableId,
1463  td->shard,
1464  td->maxFragRows,
1465  td->maxChunkSize,
1466  td->fragPageSize,
1467  td->maxRows,
1468  td->persistenceLevel,
1469  !td->storageType.empty());
1470  }
1471  });
1472  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1473  << time_ms << "ms";
1474 }
1475 
1477  const std::string& tableName) const {
1478  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1479  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1480  return nullptr;
1481  }
1482  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1483 }
1484 
1486  const std::string& tableName) const {
1487  cat_read_lock read_lock(this);
1488  return getForeignTableUnlocked(tableName);
1489 }
1490 
// Looks up a table descriptor by name. The lookup key is `to_upper(tableName)`.
// Returns nullptr if no entry matches.
// When `populateFragmenter` is true and the table is not a view and has no
// fragmenter yet, the fragmenter is set up while holding the table's own mutex.
// That setup is on a line not shown in this listing.
// The catalog read lock is released before this step, presumably so that the
// potentially heavy setup does not block other catalog readers/writers.
// NOTE(review): `td` is used, and returned, after the catalog lock is
// released. Descriptors are deleted when a table is removed from the maps, so
// callers presumably must prevent concurrent drops (confirm).
1491 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1492  const bool populateFragmenter) const {
1493  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1494  // pure metadata calls
1495  cat_read_lock read_lock(this);
1496  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1497  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1498  return nullptr;
1499  }
1500  TableDescriptor* td = tableDescIt->second;
1501  CHECK(td);
1502  read_lock.unlock();
1503  if (populateFragmenter) {
1504  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1505  if (td->fragmenter == nullptr && !td->isView) {
1507  }
1508  }
1509  return td; // returns pointer to table descriptor
1510 }
1511 
1513  bool populateFragmenter) const {
1514  cat_read_lock read_lock(this);
1515  auto td = getMutableMetadataForTableUnlocked(table_id);
1516  if (!td) {
1517  return nullptr;
1518  }
1519  read_lock.unlock();
1520  if (populateFragmenter) {
1521  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1522  if (td->fragmenter == nullptr && !td->isView) {
1524  }
1525  }
1526  return td;
1527 }
1528 
1530  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1531  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1532  return nullptr;
1533  }
1534  return tableDescIt->second;
1535 }
1536 
1538  const bool load_dict) const {
1539  cat_read_lock read_lock(this);
1540  const DictRef dictRef(currentDB_.dbId, dict_id);
1541  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1542  if (dictDescIt ==
1543  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1544  return nullptr;
1545  }
1546  auto& dd = dictDescIt->second;
1547 
1548  if (load_dict) {
1549  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1550  if (!dd->stringDict) {
1551  auto time_ms = measure<>::execution([&]() {
1552  if (string_dict_hosts_.empty()) {
1553  if (dd->dictIsTemp) {
1554  dd->stringDict = std::make_shared<StringDictionary>(
1555  dd->dictFolderPath, true, true, g_cache_string_hash);
1556  } else {
1557  dd->stringDict = std::make_shared<StringDictionary>(
1558  dd->dictFolderPath, false, true, g_cache_string_hash);
1559  }
1560  } else {
1561  dd->stringDict =
1562  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1563  }
1564  });
1565  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1566  << dd->dictRef.dictId << " was " << time_ms << "ms";
1567  }
1568  }
1569 
1570  return dd.get();
1571 }
1572 
1573 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1574  return string_dict_hosts_;
1575 }
1576 
1578  const string& columnName) const {
1579  cat_read_lock read_lock(this);
1580 
1581  ColumnKey columnKey(tableId, to_upper(columnName));
1582  auto colDescIt = columnDescriptorMap_.find(columnKey);
1583  if (colDescIt ==
1584  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1585  return nullptr;
1586  }
1587  return colDescIt->second;
1588 }
1589 
1590 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1591  cat_read_lock read_lock(this);
1592  ColumnIdKey columnIdKey(table_id, column_id);
1593  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1594  if (colDescIt == columnDescriptorMapById_
1595  .end()) { // need to check to make sure column exists for table
1596  return nullptr;
1597  }
1598  return colDescIt->second;
1599 }
1600 
1601 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1602  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1603  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1604  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1605 
1606  auto spx = spi;
1607  int phi = 0;
1608  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1609  {
1610  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1611  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1612  }
1613 
1614  CHECK(0 < spx && spx <= columnIdBySpi.size());
1615  return columnIdBySpi[spx - 1] + phi;
1616 }
1617 
1618 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
1619  cat_read_lock read_lock(this);
1620  return getColumnIdBySpiUnlocked(table_id, spi);
1621 }
1622 
1624  const size_t spi) const {
1625  cat_read_lock read_lock(this);
1626 
1627  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
1628  ColumnIdKey columnIdKey(tableId, columnId);
1629  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1630  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
1631 }
1632 
// Deletes the dashboards identified by `dashboard_ids` on behalf of `user`.
// Phase 1: every id must resolve to a dashboard, and `user` must hold
// DELETE_DASHBOARD on it. Otherwise a std::runtime_error listing all missing
// and all forbidden ids is thrown, and nothing is deleted.
// Phase 2: runs under the catalog write lock and a sqlite transaction. Each
// dashboard is erased from dashboardDescriptorMap_ and its mapd_dashboards row
// is deleted (the delete call is partly on a line not shown). Any exception
// rolls back the sqlite transaction and is rethrown.
// `dash_objs` is consumed on a line not shown in this listing (presumably the
// privilege revocation the BE-5245 comment refers to).
// @throws std::runtime_error as described above.
1633 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
1634  const UserMetadata& user) {
1635  std::stringstream invalid_ids, restricted_ids;
1636
1637  for (int32_t dashboard_id : dashboard_ids) {
1638  if (!getMetadataForDashboard(dashboard_id)) {
1639  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
1640  continue;
1641  }
1642  DBObject object(dashboard_id, DashboardDBObjectType);
1643  object.loadKey(*this);
1644  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
1645  std::vector<DBObject> privs = {object};
1646  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
1647  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
1648  }
1649  }
1650
1651  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
1652  std::stringstream error_message;
1653  error_message << "Delete dashboard(s) failed with error(s):";
1654  if (invalid_ids.str().size() > 0) {
1655  error_message << "\nDashboard id: " << invalid_ids.str()
1656  << " - Dashboard id does not exist";
1657  }
1658  if (restricted_ids.str().size() > 0) {
1659  error_message
1660  << "\nDashboard id: " << restricted_ids.str()
1661  << " - User should be either owner of dashboard or super user to delete it";
1662  }
1663  throw std::runtime_error(error_message.str());
1664  }
1665  std::vector<DBObject> dash_objs;
1666
1667  for (int32_t dashboard_id : dashboard_ids) {
1668  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
1669  }
1670  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
1672  {
1673  cat_write_lock write_lock(this);
1675
1676  sqliteConnector_.query("BEGIN TRANSACTION");
1677  try {
1678  for (int32_t dashboard_id : dashboard_ids) {
1679  auto dash = getMetadataForDashboard(dashboard_id);
1680  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
1681  // rollback if already deleted
1682  if (!dash) {
1683  throw std::runtime_error(
1684  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
1685  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
1686  }
// Copy the key parts before erasing. `dash` points at a descriptor owned (via
// shared_ptr) by the map entry erased below, so it may not outlive the erase.
1687  std::string user_id = std::to_string(dash->userId);
1688  std::string dash_name = dash->dashboardName;
1689  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
// NOTE(review): this in-memory erase is not undone by the ROLLBACK below.
// A failure part-way through leaves dashboardDescriptorMap_ and sqlite out of
// sync. One example is a duplicate id in `dashboard_ids`: it passes phase 1,
// but presumably is no longer found on its second visit here.
1690  dashboardDescriptorMap_.erase(viewDescIt);
1692  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
1693  std::vector<std::string>{dash_name, user_id});
1694  }
1695  } catch (std::exception& e) {
1696  sqliteConnector_.query("ROLLBACK TRANSACTION");
1697  throw;
1698  }
1699  sqliteConnector_.query("END TRANSACTION");
1700  }
1701 }
1702 
1704  const string& userId,
1705  const string& dashName) const {
1706  cat_read_lock read_lock(this);
1707 
1708  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
1709  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
1710  return nullptr;
1711  }
1712  return viewDescIt->second.get(); // returns pointer to view descriptor
1713 }
1714 
1716  cat_read_lock read_lock(this);
1717  std::string userId;
1718  std::string name;
1719  bool found{false};
1720  {
1721  for (auto descp : dashboardDescriptorMap_) {
1722  auto dash = descp.second.get();
1723  if (dash->dashboardId == id) {
1724  userId = std::to_string(dash->userId);
1725  name = dash->dashboardName;
1726  found = true;
1727  break;
1728  }
1729  }
1730  }
1731  if (found) {
1732  return getMetadataForDashboard(userId, name);
1733  }
1734  return nullptr;
1735 }
1736 
1737 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
1738  cat_read_lock read_lock(this);
1739  auto linkDescIt = linkDescriptorMap_.find(link);
1740  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
1741  return nullptr;
1742  }
1743  return linkDescIt->second; // returns pointer to view descriptor
1744 }
1745 
1747  cat_read_lock read_lock(this);
1748  auto linkDescIt = linkDescriptorMapById_.find(linkId);
1749  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
1750  return nullptr;
1751  }
1752  return linkDescIt->second;
1753 }
1754 
1756  cat_read_lock read_lock(this);
1757  const auto table = getMutableMetadataForTableUnlocked(table_id);
1758  CHECK(table);
1759  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
1760  CHECK(foreign_table);
1761  return foreign_table;
1762 }
1763 
1765  const TableDescriptor* td,
1766  list<const ColumnDescriptor*>& columnDescriptors,
1767  const bool fetchSystemColumns,
1768  const bool fetchVirtualColumns,
1769  const bool fetchPhysicalColumns) const {
1770  int32_t skip_physical_cols = 0;
1771  for (const auto& columnDescriptor : columnDescriptorMapById_) {
1772  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
1773  --skip_physical_cols;
1774  continue;
1775  }
1776  auto cd = columnDescriptor.second;
1777  if (cd->tableId != td->tableId) {
1778  continue;
1779  }
1780  if (!fetchSystemColumns && cd->isSystemCol) {
1781  continue;
1782  }
1783  if (!fetchVirtualColumns && cd->isVirtualCol) {
1784  continue;
1785  }
1786  if (!fetchPhysicalColumns) {
1787  const auto& col_ti = cd->columnType;
1788  skip_physical_cols = col_ti.get_physical_cols();
1789  }
1790  columnDescriptors.push_back(cd);
1791  }
1792 }
1793 
// Returns the column descriptors of table `tableId`, filtered by the three
// fetch* flags, while holding the catalog read lock.
// The lines that resolve the table and start filling `columnDescriptors` are
// not shown in this listing. Presumably they delegate to an unlocked
// implementation that takes the same flags (verify).
1794 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
1795  const int tableId,
1796  const bool fetchSystemColumns,
1797  const bool fetchVirtualColumns,
1798  const bool fetchPhysicalColumns) const {
1799  cat_read_lock read_lock(this);
1800  std::list<const ColumnDescriptor*> columnDescriptors;
1803  columnDescriptors,
1804  fetchSystemColumns,
1805  fetchVirtualColumns,
1806  fetchPhysicalColumns);
1807  return columnDescriptors;
1808 }
1809 
1810 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
1811  cat_read_lock read_lock(this);
1812  list<const TableDescriptor*> table_list;
1813  for (auto p : tableDescriptorMapById_) {
1814  table_list.push_back(p.second);
1815  }
1816  return table_list;
1817 }
1818 
1819 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
1820  cat_read_lock read_lock(this);
1821  std::vector<TableDescriptor> tables;
1822  tables.reserve(tableDescriptorMapById_.size());
1823  for (auto table_entry : tableDescriptorMapById_) {
1824  tables.emplace_back(*table_entry.second);
1825  }
1826  return tables;
1827 }
1828 
1829 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
1830  cat_read_lock read_lock(this);
1831  list<const DashboardDescriptor*> dashboards;
1832  for (auto dashboard_entry : dashboardDescriptorMap_) {
1833  dashboards.push_back(dashboard_entry.second.get());
1834  }
1835  return dashboards;
1836 }
1837 
1838 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
1839  cat_read_lock read_lock(this);
1840  std::vector<DashboardDescriptor> dashboards;
1841  dashboards.reserve(dashboardDescriptorMap_.size());
1842  for (auto dashboard_entry : dashboardDescriptorMap_) {
1843  dashboards.emplace_back(*dashboard_entry.second);
1844  }
1845  return dashboards;
1846 }
1847 
1849  cat_write_lock write_lock(this);
1850  const auto& td = *tableDescriptorMapById_[cd.tableId];
1851  list<DictDescriptor> dds;
1852  setColumnDictionary(cd, dds, td, true);
1853  auto& dd = dds.back();
1854  CHECK(dd.dictRef.dictId);
1855 
1856  std::unique_ptr<StringDictionaryClient> client;
1857  if (!string_dict_hosts_.empty()) {
1858  client.reset(new StringDictionaryClient(
1859  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
1860  }
1861  if (client) {
1862  client->create(dd.dictRef, dd.dictIsTemp);
1863  }
1864 
1865  DictDescriptor* new_dd = new DictDescriptor(dd);
1866  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
1867  if (!dd.dictIsTemp) {
1868  boost::filesystem::create_directory(new_dd->dictFolderPath);
1869  }
1870  return dd.dictRef;
1871 }
1872 
1874  cat_write_lock write_lock(this);
1876  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
1877  return;
1878  }
1879  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
1880  return;
1881  }
1882  const auto dictId = cd.columnType.get_comp_param();
1883  CHECK_GT(dictId, 0);
1884  // decrement and zero check dict ref count
1885  const auto td = getMetadataForTable(cd.tableId, false);
1886  CHECK(td);
1888  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
1889  std::to_string(dictId));
1891  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
1892  const auto refcount = sqliteConnector_.getData<int>(0, 0);
1893  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
1894  << refcount;
1895  if (refcount > 0) {
1896  return;
1897  }
1898  const DictRef dictRef(currentDB_.dbId, dictId);
1899  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
1900  std::to_string(dictId));
1901  File_Namespace::renameForDelete(g_base_path + "/mapd_data/DB_" +
1902  std::to_string(currentDB_.dbId) + "_DICT_" +
1903  std::to_string(dictId));
1904 
1905  std::unique_ptr<StringDictionaryClient> client;
1906  if (!string_dict_hosts_.empty()) {
1907  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
1908  }
1909  if (client) {
1910  client->drop(dictRef);
1911  }
1912 
1913  dictDescriptorMapByRef_.erase(dictRef);
1914 }
1915 
1917  std::map<int, StringDictionary*>& stringDicts) {
1918  // learn 'committed' ColumnDescriptor of this column
1919  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
1920  CHECK(cit != columnDescriptorMap_.end());
1921  auto& ccd = *cit->second;
1922 
1923  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
1924  return;
1925  }
1926  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
1927  return;
1928  }
1929  if (!(ccd.columnType.get_comp_param() > 0)) {
1930  return;
1931  }
1932 
1933  auto dictId = ccd.columnType.get_comp_param();
1934  getMetadataForDict(dictId);
1935 
1936  const DictRef dictRef(currentDB_.dbId, dictId);
1937  auto dit = dictDescriptorMapByRef_.find(dictRef);
1938  CHECK(dit != dictDescriptorMapByRef_.end());
1939  CHECK(dit->second);
1940  CHECK(dit->second.get()->stringDict);
1941  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
1942 }
1943 
1945  // caller must handle sqlite/chunk transaction TOGETHER
1946  cd.tableId = td.tableId;
1947  if (td.nShards > 0 && td.shard < 0) {
1948  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
1949  auto shard_cd = cd;
1950  addColumn(*shard, shard_cd);
1951  }
1952  }
1954  addDictionary(cd);
1955  }
1956 
1957  using BindType = SqliteConnector::BindType;
1958  std::vector<BindType> types(17, BindType::TEXT);
1959  if (!cd.default_value.has_value()) {
1960  types[16] = BindType::NULL_TYPE;
1961  }
1963  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
1964  "colscale, is_notnull, "
1965  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
1966  "is_deletedcol, default_value) "
1967  "VALUES (?, "
1968  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
1969  "?, ?, ?, "
1970  "?, "
1971  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1972  std::vector<std::string>{std::to_string(td.tableId),
1973  std::to_string(td.tableId),
1974  cd.columnName,
1983  "",
1986  cd.virtualExpr,
1988  cd.default_value.value_or("NULL")},
1989  types);
1990 
1992  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
1993  std::vector<std::string>{std::to_string(td.tableId)});
1994 
1996  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
1997  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
1998  cd.columnId = sqliteConnector_.getData<int>(0, 0);
1999 
2000  ++tableDescriptorMapById_[td.tableId]->nColumns;
2001  auto ncd = new ColumnDescriptor(cd);
2004  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2005 }
2006 
2008  {
2009  cat_write_lock write_lock(this);
2011  // caller must handle sqlite/chunk transaction TOGETHER
2013  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2014  std::vector<std::string>{std::to_string(td.tableId),
2015  std::to_string(cd.columnId)});
2016 
2018  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2019  std::vector<std::string>{std::to_string(td.tableId)});
2020 
2021  ColumnDescriptorMap::iterator columnDescIt =
2023  CHECK(columnDescIt != columnDescriptorMap_.end());
2024 
2025  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2026 
2027  columnDescriptorMap_.erase(columnDescIt);
2029  --tableDescriptorMapById_[td.tableId]->nColumns;
2030  }
2031 
2032  // for each shard
2033  if (td.nShards > 0 && td.shard < 0) {
2034  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2035  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2036  CHECK(shard_cd);
2037  dropColumn(*shard, *shard_cd);
2038  }
2039  }
2040 }
2041 
2042 void Catalog::roll(const bool forward) {
2043  cat_write_lock write_lock(this);
2044  std::set<const TableDescriptor*> tds;
2045 
2046  for (const auto& cdr : columnDescriptorsForRoll) {
2047  auto ocd = cdr.first;
2048  auto ncd = cdr.second;
2049  CHECK(ocd || ncd);
2050  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2051  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2052  auto td = tabDescIt->second;
2053  auto& vc = td->columnIdBySpi_;
2054  if (forward) {
2055  if (ocd) {
2056  if (nullptr == ncd ||
2057  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2058  delDictionary(*ocd);
2059  }
2060 
2061  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2062 
2063  delete ocd;
2064  }
2065  if (ncd) {
2066  // append columnId if its new and not phy geo
2067  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2068  if (!ncd->isGeoPhyCol) {
2069  vc.push_back(ncd->columnId);
2070  }
2071  }
2072  }
2073  tds.insert(td);
2074  } else {
2075  if (ocd) {
2076  columnDescriptorMap_[ColumnKey(ocd->tableId, to_upper(ocd->columnName))] = ocd;
2077  columnDescriptorMapById_[ColumnIdKey(ocd->tableId, ocd->columnId)] = ocd;
2078  }
2079  // roll back the dict of new column
2080  if (ncd) {
2081  columnDescriptorMap_.erase(ColumnKey(ncd->tableId, to_upper(ncd->columnName)));
2082  columnDescriptorMapById_.erase(ColumnIdKey(ncd->tableId, ncd->columnId));
2083  if (nullptr == ocd ||
2084  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2085  delDictionary(*ncd);
2086  }
2087  delete ncd;
2088  }
2089  }
2090  }
2091  columnDescriptorsForRoll.clear();
2092 
2093  if (forward) {
2094  for (const auto td : tds) {
2095  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2096  }
2097  }
2098 }
2099 
2101  list<ColumnDescriptor>& columns) {
2102  const auto& col_ti = cd.columnType;
2103  if (IS_GEO(col_ti.get_type())) {
2104  switch (col_ti.get_type()) {
2105  case kPOINT: {
2106  ColumnDescriptor physical_cd_coords(true);
2107  physical_cd_coords.columnName = cd.columnName + "_coords";
2108  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2109  // Raw data: compressed/uncompressed coords
2110  coords_ti.set_subtype(kTINYINT);
2111  size_t unit_size;
2112  if (col_ti.get_compression() == kENCODING_GEOINT &&
2113  col_ti.get_comp_param() == 32) {
2114  unit_size = 4 * sizeof(int8_t);
2115  } else {
2116  CHECK(col_ti.get_compression() == kENCODING_NONE);
2117  unit_size = 8 * sizeof(int8_t);
2118  }
2119  coords_ti.set_size(2 * unit_size);
2120  physical_cd_coords.columnType = coords_ti;
2121  columns.push_back(physical_cd_coords);
2122 
2123  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2124 
2125  break;
2126  }
2127  case kLINESTRING: {
2128  ColumnDescriptor physical_cd_coords(true);
2129  physical_cd_coords.columnName = cd.columnName + "_coords";
2130  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2131  // Raw data: compressed/uncompressed coords
2132  coords_ti.set_subtype(kTINYINT);
2133  physical_cd_coords.columnType = coords_ti;
2134  columns.push_back(physical_cd_coords);
2135 
2136  ColumnDescriptor physical_cd_bounds(true);
2137  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2138  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2139  bounds_ti.set_subtype(kDOUBLE);
2140  bounds_ti.set_size(4 * sizeof(double));
2141  physical_cd_bounds.columnType = bounds_ti;
2142  columns.push_back(physical_cd_bounds);
2143 
2144  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2145 
2146  break;
2147  }
2148  case kPOLYGON: {
2149  ColumnDescriptor physical_cd_coords(true);
2150  physical_cd_coords.columnName = cd.columnName + "_coords";
2151  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2152  // Raw data: compressed/uncompressed coords
2153  coords_ti.set_subtype(kTINYINT);
2154  physical_cd_coords.columnType = coords_ti;
2155  columns.push_back(physical_cd_coords);
2156 
2157  ColumnDescriptor physical_cd_ring_sizes(true);
2158  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2159  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2160  ring_sizes_ti.set_subtype(kINT);
2161  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2162  columns.push_back(physical_cd_ring_sizes);
2163 
2164  ColumnDescriptor physical_cd_bounds(true);
2165  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2166  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2167  bounds_ti.set_subtype(kDOUBLE);
2168  bounds_ti.set_size(4 * sizeof(double));
2169  physical_cd_bounds.columnType = bounds_ti;
2170  columns.push_back(physical_cd_bounds);
2171 
2172  ColumnDescriptor physical_cd_render_group(true);
2173  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2174  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2175  physical_cd_render_group.columnType = render_group_ti;
2176  columns.push_back(physical_cd_render_group);
2177 
2178  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2179 
2180  break;
2181  }
2182  case kMULTIPOLYGON: {
2183  ColumnDescriptor physical_cd_coords(true);
2184  physical_cd_coords.columnName = cd.columnName + "_coords";
2185  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2186  // Raw data: compressed/uncompressed coords
2187  coords_ti.set_subtype(kTINYINT);
2188  physical_cd_coords.columnType = coords_ti;
2189  columns.push_back(physical_cd_coords);
2190 
2191  ColumnDescriptor physical_cd_ring_sizes(true);
2192  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2193  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2194  ring_sizes_ti.set_subtype(kINT);
2195  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2196  columns.push_back(physical_cd_ring_sizes);
2197 
2198  ColumnDescriptor physical_cd_poly_rings(true);
2199  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2200  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2201  poly_rings_ti.set_subtype(kINT);
2202  physical_cd_poly_rings.columnType = poly_rings_ti;
2203  columns.push_back(physical_cd_poly_rings);
2204 
2205  ColumnDescriptor physical_cd_bounds(true);
2206  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2207  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2208  bounds_ti.set_subtype(kDOUBLE);
2209  bounds_ti.set_size(4 * sizeof(double));
2210  physical_cd_bounds.columnType = bounds_ti;
2211  columns.push_back(physical_cd_bounds);
2212 
2213  ColumnDescriptor physical_cd_render_group(true);
2214  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2215  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2216  physical_cd_render_group.columnType = render_group_ti;
2217  columns.push_back(physical_cd_render_group);
2218 
2219  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2220 
2221  break;
2222  }
2223  default:
2224  throw runtime_error("Unrecognized geometry type.");
2225  break;
2226  }
2227  }
2228 }
2229 
2230 namespace {
2232  auto timing_type_entry =
2234  CHECK(timing_type_entry != foreign_table.options.end());
2235  if (timing_type_entry->second ==
2237  return foreign_storage::get_next_refresh_time(foreign_table.options);
2238  }
2240 }
2241 } // namespace
2242 
2244  TableDescriptor& td,
2245  const list<ColumnDescriptor>& cols,
2246  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2247  bool isLogicalTable) {
2248  cat_write_lock write_lock(this);
2249  list<ColumnDescriptor> cds = cols;
2250  list<DictDescriptor> dds;
2251  std::set<std::string> toplevel_column_names;
2252  list<ColumnDescriptor> columns;
2253 
2254  if (!td.storageType.empty() &&
2257  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2258  }
2259  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2260  }
2261 
2262  for (auto cd : cds) {
2263  if (cd.columnName == "rowid") {
2264  throw std::runtime_error(
2265  "Cannot create column with name rowid. rowid is a system defined column.");
2266  }
2267  columns.push_back(cd);
2268  toplevel_column_names.insert(cd.columnName);
2269  if (cd.columnType.is_geometry()) {
2270  expandGeoColumn(cd, columns);
2271  }
2272  }
2273  cds.clear();
2274 
2275  ColumnDescriptor cd;
2276  // add row_id column -- Must be last column in the table
2277  cd.columnName = "rowid";
2278  cd.isSystemCol = true;
2279  cd.columnType = SQLTypeInfo(kBIGINT, true);
2280 #ifdef MATERIALIZED_ROWID
2281  cd.isVirtualCol = false;
2282 #else
2283  cd.isVirtualCol = true;
2284  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2285 #endif
2286  columns.push_back(cd);
2287  toplevel_column_names.insert(cd.columnName);
2288 
2289  if (td.hasDeletedCol) {
2290  ColumnDescriptor cd_del;
2291  cd_del.columnName = "$deleted$";
2292  cd_del.isSystemCol = true;
2293  cd_del.isVirtualCol = false;
2294  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2295  cd_del.isDeletedCol = true;
2296 
2297  columns.push_back(cd_del);
2298  }
2299 
2300  td.nColumns = columns.size();
2302  sqliteConnector_.query("BEGIN TRANSACTION");
2304  try {
2306  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2307  std::vector<std::string>{td.tableName,
2308  std::to_string(td.userId),
2310  std::to_string(td.isView),
2311  "",
2316  std::to_string(td.maxRows),
2317  td.partitions,
2319  std::to_string(td.shard),
2320  std::to_string(td.nShards),
2322  td.storageType,
2325  td.keyMetainfo});
2326 
2327  // now get the auto generated tableid
2329  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2330  td.tableId = sqliteConnector_.getData<int>(0, 0);
2331  int colId = 1;
2332  for (auto cd : columns) {
2334  const bool is_foreign_col =
2335  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2336  if (!is_foreign_col) {
2337  setColumnDictionary(cd, dds, td, isLogicalTable);
2338  }
2339  }
2340 
2341  if (toplevel_column_names.count(cd.columnName)) {
2342  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2343  if (colId > 1) {
2344  colId += g_test_against_columnId_gap;
2345  }
2346  if (!cd.isGeoPhyCol) {
2347  td.columnIdBySpi_.push_back(colId);
2348  }
2349  }
2350 
2351  using BindType = SqliteConnector::BindType;
2352  std::vector<BindType> types(17, BindType::TEXT);
2353  if (!cd.default_value.has_value()) {
2354  types[16] = BindType::NULL_TYPE;
2355  }
2357  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2358  "coldim, colscale, is_notnull, "
2359  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2360  "virtual_expr, is_deletedcol, default_value) "
2361  "VALUES (?, ?, ?, ?, ?, "
2362  "?, "
2363  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2364  std::vector<std::string>{std::to_string(td.tableId),
2365  std::to_string(colId),
2366  cd.columnName,
2375  "",
2378  cd.virtualExpr,
2380  cd.default_value.value_or("NULL")},
2381  types);
2382  cd.tableId = td.tableId;
2383  cd.columnId = colId++;
2384  cds.push_back(cd);
2385  }
2386  if (td.isView) {
2388  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2389  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2390  }
2392  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2393  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2395  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2396  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2397  std::vector<std::string>{std::to_string(foreign_table.tableId),
2398  std::to_string(foreign_table.foreign_server->id),
2399  foreign_table.getOptionsAsJsonString(),
2400  std::to_string(foreign_table.last_refresh_time),
2401  std::to_string(foreign_table.next_refresh_time)});
2402  }
2403  } catch (std::exception& e) {
2404  sqliteConnector_.query("ROLLBACK TRANSACTION");
2405  throw;
2406  }
2407  } else { // Temporary table
2408  td.tableId = nextTempTableId_++;
2409  int colId = 1;
2410  for (auto cd : columns) {
2412  const bool is_foreign_col =
2413  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2414 
2415  if (!is_foreign_col) {
2416  // Create a new temporary dictionary
2417  std::string fileName("");
2418  std::string folderPath("");
2420  nextTempDictId_++;
2421  DictDescriptor dd(dict_ref,
2422  fileName,
2424  false,
2425  1,
2426  folderPath,
2427  true); // Is dictName (2nd argument) used?
2428  dds.push_back(dd);
2429  if (!cd.columnType.is_array()) {
2431  }
2432  cd.columnType.set_comp_param(dict_ref.dictId);
2433  }
2434  }
2435  if (toplevel_column_names.count(cd.columnName)) {
2436  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2437  if (colId > 1) {
2438  colId += g_test_against_columnId_gap;
2439  }
2440  if (!cd.isGeoPhyCol) {
2441  td.columnIdBySpi_.push_back(colId);
2442  }
2443  }
2444  cd.tableId = td.tableId;
2445  cd.columnId = colId++;
2446  cds.push_back(cd);
2447  }
2448 
2450  serializeTableJsonUnlocked(&td, cds);
2451  }
2452  }
2453 
2454  try {
2455  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2456  if (cache) {
2457  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2458  << "Disk cache at " + cache->getCacheDirectory()
2459  << " contains preexisting data for new table. Please "
2460  "delete or clear cache before continuing";
2461  }
2462 
2463  addTableToMap(&td, cds, dds);
2464  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2465  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2466  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2467  }
2468  } catch (std::exception& e) {
2469  sqliteConnector_.query("ROLLBACK TRANSACTION");
2470  removeTableFromMap(td.tableName, td.tableId, true);
2471  throw;
2472  }
2473  sqliteConnector_.query("END TRANSACTION");
2474 
2476  write_lock.unlock();
2477  sqlite_lock.unlock();
2478  getMetadataForTable(td.tableName,
2479  true); // cause instantiateFragmenter() to be called
2480  }
2481 }
2482 
2483 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2484  const std::list<ColumnDescriptor>& cds) const {
2485  // relies on the catalog write lock
2486  using namespace rapidjson;
2487 
2488  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2489 
2490  const auto db_name = currentDB_.dbName;
2491  const auto file_path = table_json_filepath(basePath_, db_name);
2492 
2493  Document d;
2494  if (boost::filesystem::exists(file_path)) {
2495  // look for an existing file for this database
2496  std::ifstream reader(file_path.string());
2497  CHECK(reader.is_open());
2498  IStreamWrapper json_read_wrapper(reader);
2499  d.ParseStream(json_read_wrapper);
2500  } else {
2501  d.SetObject();
2502  }
2503  CHECK(d.IsObject());
2504  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2505 
2506  Value table(kObjectType);
2507  table.AddMember(
2508  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2509  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2510  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2511 
2512  for (const auto& cd : cds) {
2513  Value column(kObjectType);
2514  column.AddMember(
2515  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2516  column.AddMember("coltype",
2517  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2518  d.GetAllocator());
2519  column.AddMember("colsubtype",
2520  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2521  d.GetAllocator());
2522  column.AddMember(
2523  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2524  column.AddMember(
2525  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2526  column.AddMember(
2527  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2528  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2529  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2530  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2531  table["columns"].PushBack(column, d.GetAllocator());
2532  }
2533  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2534 
2535  // Overwrite the existing file
2536  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2537  CHECK(writer.is_open());
2538  OStreamWrapper json_wrapper(writer);
2539 
2540  Writer<OStreamWrapper> json_writer(json_wrapper);
2541  d.Accept(json_writer);
2542  writer.close();
2543 }
2544 
2545 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2546  // relies on the catalog write lock
2547  using namespace rapidjson;
2548 
2549  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2550 
2551  const auto db_name = currentDB_.dbName;
2552  const auto file_path = table_json_filepath(basePath_, db_name);
2553 
2554  CHECK(boost::filesystem::exists(file_path));
2555  Document d;
2556 
2557  std::ifstream reader(file_path.string());
2558  CHECK(reader.is_open());
2559  IStreamWrapper json_read_wrapper(reader);
2560  d.ParseStream(json_read_wrapper);
2561 
2562  CHECK(d.IsObject());
2563  auto table_name_ref = StringRef(table_name.c_str());
2564  CHECK(d.HasMember(table_name_ref));
2565  CHECK(d.RemoveMember(table_name_ref));
2566 
2567  // Overwrite the existing file
2568  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2569  CHECK(writer.is_open());
2570  OStreamWrapper json_wrapper(writer);
2571 
2572  Writer<OStreamWrapper> json_writer(json_wrapper);
2573  d.Accept(json_writer);
2574  writer.close();
2575 }
2576 
2577 void Catalog::createForeignServer(
2578  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2579  bool if_not_exists) {
2580  cat_write_lock write_lock(this);
2581  cat_sqlite_lock sqlite_lock(getObjForLock());
2582  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
2583 }
2584 
2585 void Catalog::createForeignServerNoLocks(
2586  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2587  bool if_not_exists) {
2588  sqliteConnector_.query_with_text_params(
2589  "SELECT name from omnisci_foreign_servers where name = ?",
2590  std::vector<std::string>{foreign_server->name});
2591 
2592  if (sqliteConnector_.getNumRows() == 0) {
2593  foreign_server->creation_time = std::time(nullptr);
2594  sqliteConnector_.query_with_text_params(
2595  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
2596  "creation_time, "
2597  "options) "
2598  "VALUES (?, ?, ?, ?, ?)",
2599  std::vector<std::string>{foreign_server->name,
2600  foreign_server->data_wrapper_type,
2601  std::to_string(foreign_server->user_id),
2602  std::to_string(foreign_server->creation_time),
2603  foreign_server->getOptionsAsJsonString()});
2604  sqliteConnector_.query_with_text_params(
2605  "SELECT id from omnisci_foreign_servers where name = ?",
2606  std::vector<std::string>{foreign_server->name});
2607  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
2608  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
2609  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
2610  std::move(foreign_server);
2611  CHECK(foreignServerMap_.find(foreign_server_shared->name) == foreignServerMap_.end())
2612  << "Attempting to insert a foreign server into foreign server map that already "
2613  "exists.";
2614  foreignServerMap_[foreign_server_shared->name] = foreign_server_shared;
2615  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
2616  } else if (!if_not_exists) {
2617  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
2618  "\" already exists."};
2619  }
2620 }
2621 
2622 const foreign_storage::ForeignServer* Catalog::getForeignServer(
2623  const std::string& server_name) const {
2624  foreign_storage::ForeignServer* foreign_server = nullptr;
2625  cat_read_lock read_lock(this);
2626 
2627  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
2628  foreign_server = foreignServerMap_.find(server_name)->second.get();
2629  }
2630  return foreign_server;
2631 }
2632 
2633 const std::unique_ptr<const foreign_storage::ForeignServer>
2634 Catalog::getForeignServerFromStorage(const std::string& server_name) {
2635  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
2636  cat_sqlite_lock sqlite_lock(getObjForLock());
2637  sqliteConnector_.query_with_text_params(
2638  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
2639  "FROM omnisci_foreign_servers WHERE name = ?",
2640  std::vector<std::string>{server_name});
2641  if (sqliteConnector_.getNumRows() > 0) {
2642  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
2643  sqliteConnector_.getData<int>(0, 0),
2644  sqliteConnector_.getData<std::string>(0, 1),
2645  sqliteConnector_.getData<std::string>(0, 2),
2646  sqliteConnector_.getData<std::string>(0, 3),
2647  sqliteConnector_.getData<std::int32_t>(0, 4),
2648  sqliteConnector_.getData<std::int32_t>(0, 5));
2649  }
2650  return foreign_server;
2651 }
2652 
2653 const std::unique_ptr<const foreign_storage::ForeignTable>
2654 Catalog::getForeignTableFromStorage(int table_id) {
2655  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
2656  cat_sqlite_lock sqlite_lock(getObjForLock());
2657  sqliteConnector_.query_with_text_params(
2658  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
2659  "omnisci_foreign_tables WHERE table_id = ?",
2660  std::vector<std::string>{to_string(table_id)});
2661  auto num_rows = sqliteConnector_.getNumRows();
2662  if (num_rows > 0) {
2663  CHECK_EQ(size_t(1), num_rows);
2664  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
2665  sqliteConnector_.getData<int>(0, 0),
2666  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
2667  sqliteConnector_.getData<std::string>(0, 2),
2668  sqliteConnector_.getData<int>(0, 3),
2669  sqliteConnector_.getData<int>(0, 4));
2670  }
2671  return foreign_table;
2672 }
2673 
2674 void Catalog::changeForeignServerOwner(const std::string& server_name,
2675  const int new_owner_id) {
2676  cat_write_lock write_lock(this);
2677  foreign_storage::ForeignServer* foreign_server =
2678  foreignServerMap_.find(server_name)->second.get();
2679  CHECK(foreign_server);
2680  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
2681  // update in-memory server
2682  foreign_server->user_id = new_owner_id;
2683 }
2684 
2685 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
2686  const std::string& data_wrapper) {
2687  cat_write_lock write_lock(this);
2688  auto data_wrapper_type = to_upper(data_wrapper);
2689  // update in-memory server
2690  foreign_storage::ForeignServer* foreign_server =
2691  foreignServerMap_.find(server_name)->second.get();
2692  CHECK(foreign_server);
2693  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
2694  foreign_server->data_wrapper_type = data_wrapper_type;
2695  try {
2696  foreign_server->validate();
2697  } catch (const std::exception& e) {
2698  // validation did not succeed:
2699  // revert to saved data_wrapper_type & throw exception
2700  foreign_server->data_wrapper_type = saved_data_wrapper_type;
2701  throw;
2702  }
2703  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
2704 }
2705 
2706 void Catalog::setForeignServerOptions(const std::string& server_name,
2707  const std::string& options) {
2708  cat_write_lock write_lock(this);
2709  // update in-memory server
2710  foreign_storage::ForeignServer* foreign_server =
2711  foreignServerMap_.find(server_name)->second.get();
2712  CHECK(foreign_server);
2713  auto saved_options = foreign_server->options;
2714  foreign_server->populateOptionsMap(options, true);
2715  try {
2716  foreign_server->validate();
2717  } catch (const std::exception& e) {
2718  // validation did not succeed:
2719  // revert to saved options & throw exception
2720  foreign_server->options = saved_options;
2721  throw;
2722  }
2723  setForeignServerProperty(server_name, "options", options);
2724 }
2725 
2726 void Catalog::renameForeignServer(const std::string& server_name,
2727  const std::string& name) {
2728  cat_write_lock write_lock(this);
2729  auto foreign_server_it = foreignServerMap_.find(server_name);
2730  CHECK(foreign_server_it != foreignServerMap_.end());
2731  setForeignServerProperty(server_name, "name", name);
2732  auto foreign_server_shared = foreign_server_it->second;
2733  foreign_server_shared->name = name;
2734  foreignServerMap_[name] = foreign_server_shared;
2735  foreignServerMap_.erase(foreign_server_it);
2736 }
2737 
2738 void Catalog::dropForeignServer(const std::string& server_name) {
2739  cat_write_lock write_lock(this);
2740  cat_sqlite_lock sqlite_lock(getObjForLock());
2741 
2742  sqliteConnector_.query_with_text_params(
2743  "SELECT id from omnisci_foreign_servers where name = ?",
2744  std::vector<std::string>{server_name});
2745  auto num_rows = sqliteConnector_.getNumRows();
2746  if (num_rows > 0) {
2747  CHECK_EQ(size_t(1), num_rows);
2748  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
2749  sqliteConnector_.query_with_text_param(
2750  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
2751  std::to_string(server_id));
2752  if (sqliteConnector_.getNumRows() > 0) {
2753  throw std::runtime_error{"Foreign server \"" + server_name +
2754  "\" is referenced "
2755  "by existing foreign tables and cannot be dropped."};
2756  }
2757  sqliteConnector_.query("BEGIN TRANSACTION");
2758  try {
2759  sqliteConnector_.query_with_text_params(
2760  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
2761  std::vector<std::string>{server_name});
2762  } catch (const std::exception& e) {
2763  sqliteConnector_.query("ROLLBACK TRANSACTION");
2764  throw;
2765  }
2766  sqliteConnector_.query("END TRANSACTION");
2767  foreignServerMap_.erase(server_name);
2768  foreignServerMapById_.erase(server_id);
2769  }
2770 }
2771 
2772 void Catalog::getForeignServersForUser(
2773  const rapidjson::Value* filters,
2774  const UserMetadata& user,
2775  std::vector<const foreign_storage::ForeignServer*>& results) {
2776  sys_read_lock syscat_read_lock(&SysCatalog::instance());
2777  cat_read_lock read_lock(this);
2778  cat_sqlite_lock sqlite_lock(getObjForLock());
2779  // Customer facing and internal SQlite names
2780  std::map<std::string, std::string> col_names{{"server_name", "name"},
2781  {"data_wrapper", "data_wrapper_type"},
2782  {"created_at", "creation_time"},
2783  {"options", "options"}};
2784 
2785  // TODO add "owner" when FSI privilege is implemented
2786  std::stringstream filter_string;
2787  std::vector<std::string> arguments;
2788 
2789  if (filters != nullptr) {
2790  // Create SQL WHERE clause for SQLite query
2791  int num_filters = 0;
2792  filter_string << " WHERE";
2793  for (auto& filter_def : filters->GetArray()) {
2794  if (num_filters > 0) {
2795  filter_string << " " << std::string(filter_def["chain"].GetString());
2796  ;
2797  }
2798 
2799  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
2800  col_names.end()) {
2801  throw std::runtime_error{"Attribute with name \"" +
2802  std::string(filter_def["attribute"].GetString()) +
2803  "\" does not exist."};
2804  }
2805 
2806  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
2807 
2808  bool equals_operator = false;
2809  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
2810  filter_string << " = ? ";
2811  equals_operator = true;
2812  } else {
2813  filter_string << " LIKE ? ";
2814  }
2815 
2816  bool timestamp_column =
2817  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
2818 
2819  if (timestamp_column && !equals_operator) {
2820  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
2821  }
2822 
2823  if (timestamp_column && equals_operator) {
2824  arguments.push_back(std::to_string(
2825  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
2826  } else {
2827  arguments.emplace_back(filter_def["value"].GetString());
2828  }
2829 
2830  num_filters++;
2831  }
2832  }
2833  // Create select query for the omnisci_foreign_servers table
2834  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
2835  query += filter_string.str();
2836 
2837  sqliteConnector_.query_with_text_params(query, arguments);
2838  auto num_rows = sqliteConnector_.getNumRows();
2839 
2840  if (sqliteConnector_.getNumRows() == 0) {
2841  return;
2842  }
2843 
2844  CHECK(sqliteConnector_.getNumCols() == 1);
2845  // Return pointers to objects
2846  results.reserve(num_rows);
2847  for (size_t row = 0; row < num_rows; ++row) {
2848  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
2849  if (shared::contains(INTERNAL_SERVERS, server_name)) {
2850  continue;
2851  }
2852  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
2853  CHECK(foreign_server != nullptr);
2854 
2855  DBObject dbObject(foreign_server->name, ServerDBObjectType);
2856  dbObject.loadKey(*this);
2857  std::vector<DBObject> privObjects = {dbObject};
2858  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
2859  // skip server, as there are no privileges to access it
2860  continue;
2861  }
2862  results.push_back(foreign_server);
2863  }
2864 }
2865 
2866 // returns the table epoch or -1 if there is something wrong with the shared epoch
2867 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
2868  cat_read_lock read_lock(this);
2869  const auto td = getMetadataForTable(table_id, false);
2870  if (!td) {
2871  std::stringstream table_not_found_error_message;
2872  table_not_found_error_message << "Table (" << db_id << "," << table_id
2873  << ") not found";
2874  throw std::runtime_error(table_not_found_error_message.str());
2875  }
2876  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2877  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2878  // check all shards have same checkpoint
2879  const auto physicalTables = physicalTableIt->second;
2880  CHECK(!physicalTables.empty());
2881  size_t curr_epoch{0}, first_epoch{0};
2882  int32_t first_table_id{0};
2883  bool are_epochs_inconsistent{false};
2884  for (size_t i = 0; i < physicalTables.size(); i++) {
2885  int32_t physical_tb_id = physicalTables[i];
2886  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2887  CHECK(phys_td);
2888 
2889  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
2890  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
2891  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
2892  if (i == 0) {
2893  first_epoch = curr_epoch;
2894  first_table_id = physical_tb_id;
2895  } else if (first_epoch != curr_epoch) {
2896  are_epochs_inconsistent = true;
2897  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
2898  << ", db id: " << db_id
2899  << ". First table (table id: " << first_table_id
2900  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
2901  << ", has inconsistent epoch: " << curr_epoch
2902  << ". See previous INFO logs for all epochs and their table ids.";
2903  }
2904  }
2905  if (are_epochs_inconsistent) {
2906  // oh dear the shards do not agree on the epoch for this table
2907  return -1;
2908  }
2909  return curr_epoch;
2910  } else {
2911  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
2912  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
2913  << ", epoch: " << epoch;
2914  return epoch;
2915  }
2916 }
2917 
// Resets the epoch of table `table_id` in database `db_id` (and of every physical
// table returned by getPhysicalTablesDescriptors for it) to `new_epoch`.
// The table's current maxRollbackEpochs is passed through unchanged, so only the
// epoch moves. No catalog lock is taken here; per the loop comment below, the caller
// is expected to hold the table lock.
// Throws std::runtime_error if the table is unknown, and also for temporary tables.
// NOTE(review): the `if` line guarding the temporary-table error is not visible in
// this listing (presumably a table_is_temporary(td) check) — confirm against source.
2918 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
2919  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
2920  << " back to new epoch " << new_epoch;
2921  const auto td = getMetadataForTable(table_id, false);
2922  if (!td) {
2923  std::stringstream table_not_found_error_message;
2924  table_not_found_error_message << "Table (" << db_id << "," << table_id
2925  << ") not found";
2926  throw std::runtime_error(table_not_found_error_message.str());
2927  }
2929  std::stringstream is_temp_table_error_message;
2930  is_temp_table_error_message << "Cannot set epoch on temporary table";
2931  throw std::runtime_error(is_temp_table_error_message.str());
2932  }
2933 
// Same params are applied to every physical table: requested epoch, existing
// rollback limit.
2934  File_Namespace::FileMgrParams file_mgr_params;
2935  file_mgr_params.epoch = new_epoch;
2936  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
2937 
2938  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
2939  CHECK(!physical_tables.empty());
2940  for (const auto table : physical_tables) {
// Shadows the `table_id` parameter with the physical table's id.
2941  auto table_id = table->tableId;
2942  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
2943  << " back to new epoch " << new_epoch;
2944  // Should have table lock from caller so safe to do this after, avoids
2945  // having to repopulate data on error
// Drop the fragmenter and in-memory chunks before the file manager is reset.
2946  removeChunks(table_id);
2947  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
2948  }
2949 }
2950 
2951 void Catalog::alterPhysicalTableMetadata(
2952  const TableDescriptor* td,
2953  const TableDescriptorUpdateParams& table_update_params) {
2954  // Only called from parent alterTableParamMetadata, expect already to have catalog and
2955  // sqlite write locks
2956 
2957  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
2958 
2959  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
2960  CHECK(mutable_td);
2961  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
2962  sqliteConnector_.query_with_text_params(
2963  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
2964  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
2965  std::to_string(td->tableId)});
2966  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
2967  }
2968 
2969  if (td->maxRows != table_update_params.max_rows) {
2970  sqliteConnector_.query_with_text_params(
2971  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
2972  std::vector<std::string>{std::to_string(table_update_params.max_rows),
2973  std::to_string(td->tableId)});
2974  mutable_td->maxRows = table_update_params.max_rows;
2975  }
2976 }
2977 
2978 void Catalog::alterTableMetadata(const TableDescriptor* td,
2979  const TableDescriptorUpdateParams& table_update_params) {
2980  cat_write_lock write_lock(this);
2981  cat_sqlite_lock sqlite_lock(getObjForLock());
2982  sqliteConnector_.query("BEGIN TRANSACTION");
2983  try {
2984  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
2985  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
2986  const auto physical_tables = physical_table_it->second;
2987  CHECK(!physical_tables.empty());
2988  for (size_t i = 0; i < physical_tables.size(); i++) {
2989  int32_t physical_tb_id = physical_tables[i];
2990  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2991  CHECK(phys_td);
2992  alterPhysicalTableMetadata(phys_td, table_update_params);
2993  }
2994  }
2995  alterPhysicalTableMetadata(td, table_update_params);
2996  } catch (std::exception& e) {
2997  sqliteConnector_.query("ROLLBACK TRANSACTION");
2998  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
2999  }
3000  sqliteConnector_.query("END TRANSACTION");
3001 }
3002 
3003 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3004  const int32_t max_rollback_epochs) {
3005  // Must be called from AlterTableParamStmt or other method that takes executor and
3006  // TableSchema locks
3007  if (max_rollback_epochs <= -1) {
3008  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3009  }
3010  const auto td = getMetadataForTable(
3011  table_id, false); // Deep copy as there will be gap between read and write locks
3012  CHECK(td); // Existence should have already been checked in
3013  // ParserNode::AlterTableParmStmt
3014  TableDescriptorUpdateParams table_update_params(td);
3015  table_update_params.max_rollback_epochs = max_rollback_epochs;
3016  if (table_update_params == td) { // Operator is overloaded to test for equality
3017  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3018  << " to existing value, skipping operation";
3019  return;
3020  }
3021  File_Namespace::FileMgrParams file_mgr_params;
3022  file_mgr_params.epoch = -1; // Use existing epoch
3023  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3024  setTableFileMgrParams(table_id, file_mgr_params);
3025  alterTableMetadata(td, table_update_params);
3026 }
3027 
3028 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3029  if (max_rows < 0) {
3030  throw std::runtime_error("Max rows cannot be a negative number.");
3031  }
3032  const auto td = getMetadataForTable(table_id);
3033  CHECK(td);
3034  TableDescriptorUpdateParams table_update_params(td);
3035  table_update_params.max_rows = max_rows;
3036  if (table_update_params == td) {
3037  LOG(INFO) << "Max rows value of " << max_rows
3038  << " is the same as the existing value. Skipping update.";
3039  return;
3040  }
3041  alterTableMetadata(td, table_update_params);
3042  CHECK(td->fragmenter);
3043  td->fragmenter->dropFragmentsToSize(max_rows);
3044 }
3045 
3046 // For testing purposes only
3047 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3048  cat_write_lock write_lock(this);
3049  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3050  CHECK(td_entry != tableDescriptorMap_.end());
3051  auto td = td_entry->second;
3052  TableDescriptorUpdateParams table_update_params(td);
3053  table_update_params.max_rollback_epochs = -1;
3054  write_lock.unlock();
3055 
3056  alterTableMetadata(td, table_update_params);
3057  File_Namespace::FileMgrParams file_mgr_params;
3058  file_mgr_params.max_rollback_epochs = -1;
3059  setTableFileMgrParams(td->tableId, file_mgr_params);
3060 }
3061 
// Applies `file_mgr_params` to the file manager of every physical table of
// `table_id` in the current database, after evicting each table's fragmenter and
// in-memory chunks via removeChunks.
// Throws std::runtime_error if the table is unknown, and also for temporary tables.
// NOTE(review): the `if` line guarding the temporary-table error is not visible in
// this listing (presumably a table_is_temporary(td) check) — confirm against source.
3062 void Catalog::setTableFileMgrParams(
3063  const int table_id,
3064  const File_Namespace::FileMgrParams& file_mgr_params) {
3065  // Expects parent to have write lock
3066  const auto td = getMetadataForTable(table_id, false);
3067  const auto db_id = this->getDatabaseId();
3068  if (!td) {
3069  std::stringstream table_not_found_error_message;
3070  table_not_found_error_message << "Table (" << db_id << "," << table_id
3071  << ") not found";
3072  throw std::runtime_error(table_not_found_error_message.str());
3073  }
3075  std::stringstream is_temp_table_error_message;
3076  is_temp_table_error_message << "Cannot set storage params on temporary table";
3077  throw std::runtime_error(is_temp_table_error_message.str());
3078  }
3079 
3080  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3081  CHECK(!physical_tables.empty());
3082  for (const auto table : physical_tables) {
// Shadows the `table_id` parameter with the physical table's id.
3083  auto table_id = table->tableId;
3084  removeChunks(table_id);
3085  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3086  }
3087 }
3088 
3089 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3090  const int32_t table_id) const {
3091  cat_read_lock read_lock(this);
3092  std::vector<TableEpochInfo> table_epochs;
3093  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3094  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3095  const auto physical_tables = physical_table_it->second;
3096  CHECK(!physical_tables.empty());
3097 
3098  for (const auto physical_tb_id : physical_tables) {
3099  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3100  CHECK(phys_td);
3101 
3102  auto table_id = phys_td->tableId;
3103  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3104  table_epochs.emplace_back(table_id, epoch);
3105  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3106  << ", table id: " << table_id << ", epoch: " << epoch;
3107  }
3108  } else {
3109  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3110  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3111  << ", epoch: " << epoch;
3112  table_epochs.emplace_back(table_id, epoch);
3113  }
3114  return table_epochs;
3115 }
3116 
3117 void Catalog::setTableEpochs(const int32_t db_id,
3118  const std::vector<TableEpochInfo>& table_epochs) const {
3119  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3120  CHECK(td);
3121  File_Namespace::FileMgrParams file_mgr_params;
3122  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3123 
3124  for (const auto& table_epoch_info : table_epochs) {
3125  removeChunks(table_epoch_info.table_id);
3126  file_mgr_params.epoch = table_epoch_info.table_epoch;
3127  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3128  db_id, table_epoch_info.table_id, file_mgr_params);
3129  LOG(INFO) << "Set table epoch for db id: " << db_id
3130  << ", table id: " << table_epoch_info.table_id
3131  << ", back to epoch: " << table_epoch_info.table_epoch;
3132  }
3133 }
3134 
3135 namespace {
3136 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3137  std::string table_epochs_str{"["};
3138  bool first_entry{true};
3139  for (const auto& table_epoch : table_epochs) {
3140  if (first_entry) {
3141  first_entry = false;
3142  } else {
3143  table_epochs_str += ", ";
3144  }
3145  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3146  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3147  }
3148  table_epochs_str += "]";
3149  return table_epochs_str;
3150 }
3151 } // namespace
3152 
3153 void Catalog::setTableEpochsLogExceptions(
3154  const int32_t db_id,
3155  const std::vector<TableEpochInfo>& table_epochs) const {
3156  try {
3157  setTableEpochs(db_id, table_epochs);
3158  } catch (std::exception& e) {
3159  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3160  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3161  << ", Error: " << e.what();
3162  }
3163 }
3164 
3165 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3166  cat_read_lock read_lock(this);
3167  const auto it = deletedColumnPerTable_.find(td);
3168  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3169 }
3170 
3171 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3172  int delete_column_id) const {
3173  // check if there are rows deleted by examining the deletedColumn metadata
3174  CHECK(td);
3175  auto fragmenter = td->fragmenter;
3176  if (fragmenter) {
3177  return fragmenter->hasDeletedRows(delete_column_id);
3178  } else {
3179  return false;
3180  }
3181 }
3182 
3183 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3184  const TableDescriptor* td) const {
3185  std::vector<const TableDescriptor*> tds;
3186  const ColumnDescriptor* cd;
3187  {
3188  cat_read_lock read_lock(this);
3189 
3190  const auto it = deletedColumnPerTable_.find(td);
3191  // if not a table that supports delete return nullptr, nothing more to do
3192  if (it == deletedColumnPerTable_.end()) {
3193  return nullptr;
3194  }
3195  cd = it->second;
3196  tds = getPhysicalTablesDescriptors(td, false);
3197  }
3198  // individual tables are still protected by higher level locks
3199  for (auto tdd : tds) {
3200  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3201  return cd;
3202  }
3203  }
3204  // no deletes so far recorded in metadata
3205  return nullptr;
3206 }
3207 
3208 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3209  cat_write_lock write_lock(this);
3210  setDeletedColumnUnlocked(td, cd);
3211 }
3212 
3213 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3214  const ColumnDescriptor* cd) {
3215  cat_write_lock write_lock(this);
3216  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3217  CHECK(it_ok.second);
3218 }
3219 
3220 namespace {
3221 
// Resolves the column referenced by a shared-dictionary definition: looks up the
// definition's foreign table (CHECK-fails if it does not exist) and returns
// getMetadataForColumn for its foreign column. Callers CHECK the result.
// NOTE(review): the signature line (presumably
// `const ColumnDescriptor* get_foreign_col(`, as called by addReferenceToForeignDict)
// is not visible in this listing.
3223  const Catalog& cat,
3224  const Parser::SharedDictionaryDef& shared_dict_def) {
3225  const auto& table_name = shared_dict_def.get_foreign_table();
3226  const auto td = cat.getMetadataForTable(table_name, false);
3227  CHECK(td);
3228  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3229  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3230 }
3231 
3232 } // namespace
3233 
3234 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3235  Parser::SharedDictionaryDef shared_dict_def,
3236  const bool persist_reference) {
3237  cat_write_lock write_lock(this);
3238  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3239  CHECK(foreign_ref_col);
3240  referencing_column.columnType = foreign_ref_col->columnType;
3241  const int dict_id = referencing_column.columnType.get_comp_param();
3242  const DictRef dict_ref(currentDB_.dbId, dict_id);
3243  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3244  CHECK(dictIt != dictDescriptorMapByRef_.end());
3245  const auto& dd = dictIt->second;
3246  CHECK_GE(dd->refcount, 1);
3247  ++dd->refcount;
3248  if (persist_reference) {
3249  cat_sqlite_lock sqlite_lock(getObjForLock());
3250  sqliteConnector_.query_with_text_params(
3251  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3252  {std::to_string(dict_id)});
3253  }
3254 }
3255 
3256 bool Catalog::setColumnSharedDictionary(
3257  ColumnDescriptor& cd,
3258  std::list<ColumnDescriptor>& cdd,
3259  std::list<DictDescriptor>& dds,
3260  const TableDescriptor td,
3261  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3262  cat_write_lock write_lock(this);
3263  cat_sqlite_lock sqlite_lock(getObjForLock());
3264 
3265  if (shared_dict_defs.empty()) {
3266  return false;
3267  }
3268  for (const auto& shared_dict_def : shared_dict_defs) {
3269  // check if the current column is a referencing column
3270  const auto& column = shared_dict_def.get_column();
3271  if (cd.columnName == column) {
3272  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3273  // Dictionaries are being shared in table to be created
3274  const auto& ref_column = shared_dict_def.get_foreign_column();
3275  auto colIt =
3276  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3277  return !ref_column.compare(it.columnName);
3278  });
3279  CHECK(colIt != cdd.end());
3280  cd.columnType = colIt->columnType;
3281 
3282  const int dict_id = colIt->columnType.get_comp_param();
3283  CHECK_GE(dict_id, 1);
3284  auto dictIt = std::find_if(
3285  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3286  return it.dictRef.dbId == this->currentDB_.dbId &&
3287  it.dictRef.dictId == dict_id;
3288  });
3289  if (dictIt != dds.end()) {
3290  // There exists dictionary definition of a dictionary column
3291  CHECK_GE(dictIt->refcount, 1);
3292  ++dictIt->refcount;
3293  if (!table_is_temporary(&td)) {
3294  // Persist reference count
3295  sqliteConnector_.query_with_text_params(
3296  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3297  {std::to_string(dict_id)});
3298  }
3299  } else {
3300  // The dictionary is referencing a column which is referencing a column in
3301  // diffrent table
3302  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3303  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3304  }
3305  } else {
3306  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3307  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3308  if (table_is_temporary(foreign_td)) {
3309  if (!table_is_temporary(&td)) {
3310  throw std::runtime_error(
3311  "Only temporary tables can share dictionaries with other temporary "
3312  "tables.");
3313  }
3314  addReferenceToForeignDict(cd, shared_dict_def, false);
3315  } else {
3316  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3317  }
3318  }
3319  return true;
3320  }
3321  }
3322  return false;
3323 }
3324 
// Creates the dictionary descriptor for dictionary-encoded column `cd` of table `td`
// and appends it to `dds`; on exit cd's comp_param holds the dictionary id.
// For a logical table, a mapd_dictionaries row is inserted under the placeholder
// name 'Initial_key' with nbits taken from cd's current comp_param. Its generated
// dictid is read back and the row is renamed to
// "<table>_<column>_dict<id>". The folder path is derived from the db and dict ids.
// For a non-logical (physical shard) table, no row is written and dictId stays 0.
// NOTE(review): two lines of this function (one DictDescriptor constructor argument
// and the body of the `!is_array()` branch) are not visible in this listing.
3325 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3326  std::list<DictDescriptor>& dds,
3327  const TableDescriptor& td,
3328  const bool isLogicalTable) {
3329  cat_write_lock write_lock(this);
3330 
3331  std::string dictName{"Initial_key"};
3332  int dictId{0};
3333  std::string folderPath;
3334  if (isLogicalTable) {
3335  cat_sqlite_lock sqlite_lock(getObjForLock());
3336 
// Insert under the placeholder name so the generated dictid can be looked up.
3337  sqliteConnector_.query_with_text_params(
3338  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3339  "?, 1)",
3340  std::vector<std::string>{
3341  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3342  sqliteConnector_.query_with_text_param(
3343  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3344  dictId = sqliteConnector_.getData<int>(0, 0);
3345  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3346  sqliteConnector_.query_with_text_param(
3347  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3348  folderPath = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
3349  "_DICT_" + std::to_string(dictId);
3350  }
3351  DictDescriptor dd(currentDB_.dbId,
3352  dictId,
3353  dictName,
3355  false,
3356  1,
3357  folderPath,
3358  false);
3359  dds.push_back(dd);
3360  if (!cd.columnType.is_array()) {
3362  }
// From here on comp_param carries the dictionary id instead of the bit width.
3363  cd.columnType.set_comp_param(dictId);
3364 }
3365 
3366 void Catalog::createShardedTable(
3367  TableDescriptor& td,
3368  const list<ColumnDescriptor>& cols,
3369  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3370  /* create logical table */
3371  TableDescriptor* tdl = &td;
3372  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3373  int32_t logical_tb_id = tdl->tableId;
3374  std::string logical_table_name = tdl->tableName;
3375 
3376  /* create physical tables and link them to the logical table */
3377  std::vector<int32_t> physicalTables;
3378  for (int32_t i = 1; i <= td.nShards; i++) {
3379  TableDescriptor* tdp = &td;
3380  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3381  tdp->shard = i - 1;
3382  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3383  int32_t physical_tb_id = tdp->tableId;
3384 
3385  /* add physical table to the vector of physical tables */
3386  physicalTables.push_back(physical_tb_id);
3387  }
3388 
3389  if (!physicalTables.empty()) {
3390  cat_write_lock write_lock(this);
3391  /* add logical to physical tables correspondence to the map */
3392  const auto it_ok =
3393  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3394  CHECK(it_ok.second);
3395  /* update sqlite mapd_logical_to_physical in sqlite database */
3396  if (!table_is_temporary(&td)) {
3397  updateLogicalToPhysicalTableMap(logical_tb_id);
3398  }
3399  }
3400 }
3401 
3402 void Catalog::truncateTable(const TableDescriptor* td) {
3403  // truncate all corresponding physical tables
3404  const auto physical_tables = getPhysicalTablesDescriptors(td);
3405  for (const auto table : physical_tables) {
3406  doTruncateTable(table);
3407  }
3408 }
3409 
3410 void Catalog::doTruncateTable(const TableDescriptor* td) {
3411  // must destroy fragmenter before deleteChunks is called.
3412  removeFragmenterForTable(td->tableId);
3413 
3414  const int tableId = td->tableId;
3415  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3416  // assuming deleteChunksWithPrefix is atomic
3417  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3418  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3419 
3420  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3421 
3422  cat_write_lock write_lock(this);
3423  std::unique_ptr<StringDictionaryClient> client;
3424  if (SysCatalog::instance().isAggregator()) {
3425  CHECK(!string_dict_hosts_.empty());
3426  DictRef dict_ref(currentDB_.dbId, -1);
3427  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3428  }
3429  // clean up any dictionaries
3430  // delete all column descriptors for the table
3431  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3432  auto cd = columnDescriptor.second;
3433  if (cd->tableId != td->tableId) {
3434  continue;
3435  }
3436  const int dict_id = cd->columnType.get_comp_param();
3437  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3438  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3439  const DictRef dict_ref(currentDB_.dbId, dict_id);
3440  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3441  CHECK(dictIt != dictDescriptorMapByRef_.end());
3442  const auto& dd = dictIt->second;
3443  CHECK_GE(dd->refcount, 1);
3444  // if this is the only table using this dict reset the dict
3445  if (dd->refcount == 1) {
3446  // close the dictionary
3447  dd->stringDict.reset();
3448  File_Namespace::renameForDelete(dd->dictFolderPath);
3449  if (client) {
3450  client->drop(dd->dictRef);
3451  }
3452  if (!dd->dictIsTemp) {
3453  boost::filesystem::create_directory(dd->dictFolderPath);
3454  }
3455  }
3456 
3457  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3458  dd->dictName,
3459  dd->dictNBits,
3460  dd->dictIsShared,
3461  dd->refcount,
3462  dd->dictFolderPath,
3463  dd->dictIsTemp);
3464  dictDescriptorMapByRef_.erase(dictIt);
3465  // now create new Dict -- need to figure out what to do here for temp tables
3466  if (client) {
3467  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3468  }
3469  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3470  getMetadataForDict(new_dd->dictRef.dictId);
3471  }
3472  }
3473 }
3474 
3475 void Catalog::removeFragmenterForTable(const int table_id) const {
3476  cat_write_lock write_lock(this);
3477  auto td = getMetadataForTable(table_id, false);
3478  if (td->fragmenter != nullptr) {
3479  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3480  CHECK(tableDescIt != tableDescriptorMapById_.end());
3481  tableDescIt->second->fragmenter = nullptr;
3482  CHECK(td->fragmenter == nullptr);
3483  }
3484 }
3485 
3486 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3487 void Catalog::removeChunks(const int table_id) const {
3488  removeFragmenterForTable(table_id);
3489 
3490  // remove the chunks from in memory structures
3491  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3492 
3493  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3494  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3495 }
3496 
// Drops `td` together with all of its physical shard tables (if it is a logical
// table). Steps:
//   1. Revoke object privileges on it from everyone.
//   2. Under the catalog read lock, collect the physical tables, with `td` last.
//   3. With no catalog lock held, erase each table's physical data.
//   4. Under the catalog write lock and the sqlite lock, in one transaction, delete
//      the logical-to-physical mapping and every table's metadata. Any
//      std::exception rolls back and is rethrown.
// NOTE(review): physical data is erased before the metadata transaction starts, so
// a metadata failure leaves catalog entries whose data is already gone.
// NOTE(review): the argument line of the revokeDBObjectPrivilegesFromAll call is not
// visible in this listing.
3497 void Catalog::dropTable(const TableDescriptor* td) {
3498  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
3500  std::vector<const TableDescriptor*> tables_to_drop;
3501  {
3502  cat_read_lock read_lock(this);
3503  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3504  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3505  // remove all corresponding physical tables if this is a logical table
3506  const auto physicalTables = physicalTableIt->second;
3507  CHECK(!physicalTables.empty());
3508  for (size_t i = 0; i < physicalTables.size(); i++) {
3509  int32_t physical_tb_id = physicalTables[i];
3510  const TableDescriptor* phys_td =
3511  getMutableMetadataForTableUnlocked(physical_tb_id);
3512  CHECK(phys_td);
3513  tables_to_drop.emplace_back(phys_td);
3514  }
3515  }
3516  tables_to_drop.emplace_back(td);
3517  }
3518 
// Physical data removal runs outside the catalog lock scope above.
3519  for (auto table : tables_to_drop) {
3520  eraseTablePhysicalData(table);
3521  }
3522 
3523  {
3524  cat_write_lock write_lock(this);
3525  cat_sqlite_lock sqlite_lock(getObjForLock());
3526  sqliteConnector_.query("BEGIN TRANSACTION");
3527  try {
3528  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
3529  sqliteConnector_.query_with_text_param(
3530  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
3531  std::to_string(td->tableId));
3532  logicalToPhysicalTableMapById_.erase(td->tableId);
3533  for (auto table : tables_to_drop) {
3534  eraseTableMetadata(table);
3535  }
3536  } catch (std::exception& e) {
3537  sqliteConnector_.query("ROLLBACK TRANSACTION");
3538  throw;
3539  }
3540  sqliteConnector_.query("END TRANSACTION");
3541  }
3542 }
3543 
// Removes all catalog metadata for one table. In order: runs the sqlite DELETEs,
// conditionally drops the table from the JSON serialization, refreshes calcite's
// metadata, and removes the descriptor from the in-memory maps.
// dropTable calls this inside its sqlite transaction while holding the catalog write
// and sqlite locks.
// NOTE(review): the `if` line guarding dropTableFromJsonUnlocked is not visible in
// this listing — confirm its condition against the source.
3544 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
3545  executeDropTableSqliteQueries(td);
3547  dropTableFromJsonUnlocked(td->tableName);
3548  }
3549  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3550  {
3551  INJECT_TIMER(removeTableFromMap_);
3552  removeTableFromMap(td->tableName, td->tableId);
3553  }
3554 }
3555 
// Deletes a table's rows from the sqlite catalog: mapd_tables, dictionary refcounts
// (decremented once per dictionary-encoded column, and dictionaries left at refcount
// 0 deleted), mapd_columns, mapd_views for views, and omnisci_foreign_tables under a
// condition whose line is not visible in this listing (NOTE(review): confirm).
// No transaction is opened here; the caller is expected to provide one.
3556 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
3557  const int tableId = td->tableId;
3558  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
3559  std::to_string(tableId));
3560  sqliteConnector_.query_with_text_params(
3561  "select comp_param from mapd_columns where compression = ? and tableid = ?",
3562  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
// Copy the ids out before issuing further queries; presumably each query replaces
// the connector's current result set.
3563  int numRows = sqliteConnector_.getNumRows();
3564  std::vector<int> dict_id_list;
3565  for (int r = 0; r < numRows; ++r) {
3566  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
3567  }
3568  for (auto dict_id : dict_id_list) {
3569  sqliteConnector_.query_with_text_params(
3570  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
3571  std::vector<std::string>{std::to_string(dict_id)});
3572  }
// Must run before mapd_columns is cleared: the subquery reads this table's columns.
3573  sqliteConnector_.query_with_text_params(
3574  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
3575  "mapd_columns where compression = ? "
3576  "and tableid = ?) and refcount = 0",
3577  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3578  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
3579  std::to_string(tableId));
3580  if (td->isView) {
3581  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
3582  std::to_string(tableId));
3583  }
3585  sqliteConnector_.query_with_text_param(
3586  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
3587  }
3588 }
3589 
3590 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
3591  cat_write_lock write_lock(this);
3592  cat_sqlite_lock sqlite_lock(getObjForLock());
3593 
3594  sqliteConnector_.query("BEGIN TRANSACTION");
3595  try {
3596  sqliteConnector_.query_with_text_params(
3597  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3598  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
3599  } catch (std::exception& e) {
3600  sqliteConnector_.query("ROLLBACK TRANSACTION");
3601  throw;
3602  }
3603  sqliteConnector_.query("END TRANSACTION");
3604  TableDescriptorMap::iterator tableDescIt =
3605  tableDescriptorMap_.find(to_upper(td->tableName));
3606  CHECK(tableDescIt != tableDescriptorMap_.end());
3607  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3608  // Get table descriptor to change it
3609  TableDescriptor* changeTd = tableDescIt->second;
3610  changeTd->tableName = newTableName;
3611  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3612  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3613  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3614 }
3615 
3616 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
3617  {
3618  cat_write_lock write_lock(this);
3619  cat_sqlite_lock sqlite_lock(getObjForLock());
3620  // rename all corresponding physical tables if this is a logical table
3621  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3622  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3623  const auto physicalTables = physicalTableIt->second;
3624  CHECK(!physicalTables.empty());
3625  for (size_t i = 0; i < physicalTables.size(); i++) {
3626  int32_t physical_tb_id = physicalTables[i];
3627  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3628  CHECK(phys_td);
3629  std::string newPhysTableName =
3630  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
3631  renamePhysicalTable(phys_td, newPhysTableName);
3632  }
3633  }
3634  renamePhysicalTable(td, newTableName);
3635  }
3636  {
3637  DBObject object(newTableName, TableDBObjectType);
3638  // update table name in direct and effective priv map
3639  DBObjectKey key;
3640  key.dbId = currentDB_.dbId;
3641  key.objectId = td->tableId;
3642  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3643  object.setObjectKey(key);
3644  auto objdescs = SysCatalog::instance().getMetadataForObject(
3645  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
3646  for (auto obj : objdescs) {
3647  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3648  if (grnt) {
3649  grnt->renameDbObject(object);
3650  }
3651  }
3652  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3653  }
3654 }
3655 
3656 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
3657  std::vector<int>& tableIds) {
3658  cat_write_lock write_lock(this);
3659  cat_sqlite_lock sqlite_lock(getObjForLock());
3660 
3661  // execute the SQL query
3662  try {
3663  for (size_t i = 0; i < names.size(); i++) {
3664  int tableId = tableIds[i];
3665  std::string& newTableName = names[i].second;
3666 
3667  sqliteConnector_.query_with_text_params(
3668  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3669  std::vector<std::string>{newTableName, std::to_string(tableId)});
3670  }
3671  } catch (std::exception& e) {
3672  sqliteConnector_.query("ROLLBACK TRANSACTION");
3673  throw;
3674  }
3675 
3676  // reset the table descriptors, give Calcite a kick
3677  for (size_t i = 0; i < names.size(); i++) {
3678  std::string& curTableName = names[i].first;
3679  std::string& newTableName = names[i].second;
3680 
3681  TableDescriptorMap::iterator tableDescIt =
3682  tableDescriptorMap_.find(to_upper(curTableName));
3683  CHECK(tableDescIt != tableDescriptorMap_.end());
3684  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3685 
3686  // Get table descriptor to change it
3687  TableDescriptor* changeTd = tableDescIt->second;
3688  changeTd->tableName = newTableName;
3689  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3690  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3691  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3692  }
3693 }
3694 
// lookupTableDescriptor: resolves curTableName as it exists at this point of a
// batched rename. cachedTableMap is the name -> tableId overlay maintained by
// replaceTableName:
//   - name cached with id -1: it was renamed away earlier in the batch, so the
//     result is NULL;
//   - name cached with any other id: the table is looked up by that id;
//   - name not in the overlay: falls back to cat->getMetadataForTable(name).
// NOTE(review): getMetadataForTable presumably returns NULL for unknown
// names/ids as well — confirm.
3695 // Collect an 'overlay' mapping of the tableNames->tableId
3696 // to account for possible chained renames
3697 // (e.g. swap via a->tmp, b->a, tmp->b: each step sees the names left by the earlier steps)
3698 
3700  std::map<std::string, int>& cachedTableMap,
3701  std::string& curTableName) {
3702  auto iter = cachedTableMap.find(curTableName);
3703  if ((iter != cachedTableMap.end())) {
3704  // get the cached tableId
3705  // and use that to lookup the TableDescriptor
3706  int tableId = (*iter).second;
// -1 marks a name that an earlier step of the batch renamed away.
3707  if (tableId == -1) {
3708  return NULL;
3709  } else {
3710  return cat->getMetadataForTable(tableId);
3711  }
3712  }
3713 
3714  // else ... lookup in standard location
3715  return cat->getMetadataForTable(curTableName);
3716 }
3717 
// Records one step of a batched rename in the name -> tableId overlay.
// The old name is mapped to -1, meaning "no longer exists", and the new name is
// mapped to tableId. The new-name assignment is done second, so renaming a name
// to itself leaves it mapped to tableId.
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kRenamedAway = -1;
  int& oldSlot = cachedTableMap[curTableName];
  oldSlot = kRenamedAway;
  int& newSlot = cachedTableMap[newTableName];
  newSlot = tableId;
}
3728 
// Renames a batch of tables given as (current name, new name) pairs applied in
// order, so chained renames such as a->tmp, b->a, tmp->b (a swap) are supported.
//   1. Setup: resolve each current name through the rename overlay
//      (lookupTableDescriptor / replaceTableName); CHECK-fails if one is unknown.
//   2. Take one table lock per distinct table, in ascending tableId order.
//   3. Under the catalog write and SQLite locks, inside one SQLite transaction,
//      rename every logical table and all of its physical shard tables with a
//      single renamePhysicalTable call.
//   4. Rename the tables in the SysCatalog privilege and descriptor maps.
// NOTE(review): new names are not validated here (e.g. against existing tables);
// presumably the caller does that — confirm.
3729 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
3730  // tableId of all tables being renamed
3731  // ... in matching order to 'names'
3732  std::vector<int> tableIds;
3733 
3734  // (sorted & unique) list of tables ids for locking
3735  // (with names index of src in case of error)
3736  // <tableId, strIndex>
3737  // std::map is by definition/implementation sorted
3738  // std::map current usage below tests to avoid over-write
3739  std::map<int, size_t> uniqueOrderedTableIds;
3740 
3741  // mapping of modified tables names -> tableId
3742  std::map<std::string, int> cachedTableMap;
3743 
3744  // -------- Setup --------
3745 
3746  // gather tableIds pre-execute; build maps
3747  for (size_t i = 0; i < names.size(); i++) {
3748  std::string& curTableName = names[i].first;
3749  std::string& newTableName = names[i].second;
3750 
3751  // make sure the table being renamed exists,
3752  // or will exist when executed in 'name' order
3753  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
3754  CHECK(td);
3755 
3756  tableIds.push_back(td->tableId);
3757  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
3758  // don't overwrite as it should map to the first names index 'i'
3759  uniqueOrderedTableIds[td->tableId] = i;
3760  }
3761  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
3762  }
3763 
3764  CHECK_EQ(tableIds.size(), names.size());
3765 
3766  // The outer Stmt created a write lock before calling the catalog rename table
3767  // -> TODO: might want to sort out which really should set the lock :
3768  // the comment in the outer scope indicates it should be in here
3769  // but it's not clear if the access done there *requires* it out there
3770  //
3771  // Lock tables pre-execute (may/will be in different order than rename occurs)
3772  // const auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
3773  // *legacylockmgr::LockMgr<mapd_shared_mutex, bool>::getMutex(
3774  // legacylockmgr::ExecutorOuterLock, true));
3775 
3776  // acquire the locks for all tables being renamed
// Locks are taken in ascending tableId order (std::map iteration order), each
// through the first source name recorded for that table.
3778  for (auto& idPair : uniqueOrderedTableIds) {
3779  std::string& tableName = names[idPair.second].first;
3780  tableLocks.emplace_back(
3783  *this, tableName, false)));
3784  }
3785 
3786  // -------- Rename --------
3787 
3788  {
3789  cat_write_lock write_lock(this);
3790  cat_sqlite_lock sqlite_lock(getObjForLock());
3791 
// NOTE(review): renamePhysicalTable rolls this transaction back only when its own
// SQL step fails. An exception raised elsewhere between BEGIN and END (e.g. from
// its Calcite notifications) would leave the transaction open; consider a guard.
3792  sqliteConnector_.query("BEGIN TRANSACTION");
3793 
3794  // collect all (tables + physical tables) into a single list
3795  std::vector<std::pair<std::string, std::string>> allNames;
3796  std::vector<int> allTableIds;
3797 
3798  for (size_t i = 0; i < names.size(); i++) {
3799  int tableId = tableIds[i];
3800  std::string& curTableName = names[i].first;
3801  std::string& newTableName = names[i].second;
3802 
3803  // rename all corresponding physical tables if this is a logical table
3804  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
3805  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3806  const auto physicalTables = physicalTableIt->second;
3807  CHECK(!physicalTables.empty());
3808  for (size_t k = 0; k < physicalTables.size(); k++) {
3809  int32_t physical_tb_id = physicalTables[k];
3810  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3811  CHECK(phys_td);
// NOTE(review): phys_td->tableName is the shard's name *before* any rename in
// this batch. In a chained rename the same shard can be listed twice under that
// stale name, so renamePhysicalTable must resolve entries by id (allTableIds),
// not by name.
3812  std::string newPhysTableName =
3813  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
3814  allNames.emplace_back(phys_td->tableName, newPhysTableName);
3815  allTableIds.push_back(phys_td->tableId);
3816  }
3817  }
3818  allNames.emplace_back(curTableName, newTableName);
3819  allTableIds.push_back(tableId);
3820  }
3821  // rename all tables in one shot
3822  renamePhysicalTable(allNames, allTableIds);
3823 
3824  sqliteConnector_.query("END TRANSACTION");
3825  // cat write/sqlite locks are released when they go out of scope
3826  }
3827  {
3828  // now update the SysCatalog
3829  for (size_t i = 0; i < names.size(); i++) {
3830  int tableId = tableIds[i];
3831  std::string& newTableName = names[i].second;
3832  {
3833  // update table name in direct and effective priv map
3834  DBObjectKey key;
3835  key.dbId = currentDB_.dbId;
3836  key.objectId = tableId;
3837  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3838 
3839  DBObject object(newTableName, TableDBObjectType);
3840  object.setObjectKey(key);
3841 
3842  auto objdescs = SysCatalog::instance().getMetadataForObject(
3843  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
3844  for (auto obj : objdescs) {
3845  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3846  if (grnt) {
3847  grnt->renameDbObject(object);
3848  }
3849  }
3850  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3851  }
3852  }
3853  }
3854 
3855  // -------- Cleanup --------
3856 
3857  // table locks are released when 'tableLocks' goes out of scope
3858 }
3859 
3860 void Catalog::renameColumn(const TableDescriptor* td,
3861  const ColumnDescriptor* cd,
3862  const string& newColumnName) {
3863  cat_write_lock write_lock(this);
3864  cat_sqlite_lock sqlite_lock(getObjForLock());
3865  sqliteConnector_.query("BEGIN TRANSACTION");
3866  try {
3867  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3868  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3869  CHECK(cdx);
3870  std::string new_column_name = cdx->columnName;
3871  new_column_name.replace(0, cd->columnName.size(), newColumnName);
3872  sqliteConnector_.query_with_text_params(
3873  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
3874  std::vector<std::string>{new_column_name,
3875  std::to_string(td->tableId),
3876  std::to_string(cdx->columnId)});
3877  }
3878  } catch (std::exception& e) {
3879  sqliteConnector_.query("ROLLBACK TRANSACTION");
3880  throw;
3881  }
3882  sqliteConnector_.query("END TRANSACTION");
3883  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3884  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3885  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3886  CHECK(cdx);
3887  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
3888  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
3889  CHECK(columnDescIt != columnDescriptorMap_.end());
3890  ColumnDescriptor* changeCd = columnDescIt->second;
3891  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
3892  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
3893  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
3894  changeCd;
3895  }
3896  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3897 }
3898 
// Creates, or overwrites by (name, userid), a dashboard row in mapd_dashboards.
// It then reads back the row's id and formatted update_time into `vd`, registers
// `vd` in the in-memory map, and returns vd.dashboardId.
// The catalog locks are released before createOrUpdateDashboardSystemRole runs
// (see the NOTE(wamsi) below).
3899 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
3900  cat_write_lock write_lock(this);
3901  cat_sqlite_lock sqlite_lock(getObjForLock());
3902  sqliteConnector_.query("BEGIN TRANSACTION");
3903  try {
3904  // TODO(andrew): this should be an upsert
3905  sqliteConnector_.query_with_text_params(
3906  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
3907  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3908  if (sqliteConnector_.getNumRows() > 0) {
3909  sqliteConnector_.query_with_text_params(
3910  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
3911  "update_time = "
3912  "datetime('now') where name = ? "
3913  "and userid = ?",
3914  std::vector<std::string>{vd.dashboardState,
3915  vd.imageHash,
3916  vd.dashboardMetadata,
3917  vd.dashboardName,
3918  std::to_string(vd.userId)});
3919  } else {
3920  sqliteConnector_.query_with_text_params(
3921  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
3922  "update_time, "
3923  "userid) "
3924  "VALUES "
3925  "(?,?,?,?, "
3926  "datetime('now'), ?)",
3927  std::vector<std::string>{vd.dashboardName,
3928  vd.dashboardState,
3929  vd.imageHash,
3930  vd.dashboardMetadata,
3931  std::to_string(vd.userId)});
3932  }
3933  } catch (std::exception& e) {
3934  sqliteConnector_.query("ROLLBACK TRANSACTION");
3935  throw;
3936  }
3937  sqliteConnector_.query("END TRANSACTION");
3938 
// NOTE(review): this read-back runs after END TRANSACTION (still under
// sqlite_lock). The try/catch around it only rethrows, so it has no effect.
3939  // now get the auto generated dashboardId
3940  try {
3941  sqliteConnector_.query_with_text_params(
3942  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
3943  "WHERE name = ? and userid = ?",
3944  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3945  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
3946  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
3947  } catch (std::exception& e) {
3948  throw;
3949  }
3951  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
3952  addFrontendViewToMap(vd);
// Drop the catalog locks before touching the dashboard's system role.
3953  sqlite_lock.unlock();
3954  write_lock.unlock();
3955  // NOTE(wamsi): Transactionally unsafe
3956  createOrUpdateDashboardSystemRole(
3958  return vd.dashboardId;
3959 }
3960 
// Overwrites the existing dashboard with id vd.dashboardId: name, state, image
// hash, metadata and owner are all replaced.
//   - Throws runtime_error if that id does not exist in mapd_dashboards (the
//     transaction is rolled back) or in the in-memory dashboard map.
//   - After the SQLite update commits, the old map entry is removed, update_time
//     is read back into `vd`, and `vd` is re-added to the map.
//   - The catalog locks are released before createOrUpdateDashboardSystemRole runs.
// NOTE(review): the "does not exist in map" errors are thrown after END
// TRANSACTION, so SQLite keeps the update while the in-memory map is unchanged.
3961 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
3962  cat_write_lock write_lock(this);
3963  cat_sqlite_lock sqlite_lock(getObjForLock());
3964 
3965  CHECK(sqliteConnector_.getSqlitePtr());
3966  sqliteConnector_.query("BEGIN TRANSACTION");
3967  try {
3968  sqliteConnector_.query_with_text_params(
3969  "SELECT id FROM mapd_dashboards WHERE id = ?",
3970  std::vector<std::string>{std::to_string(vd.dashboardId)});
3971  if (sqliteConnector_.getNumRows() > 0) {
3972  sqliteConnector_.query_with_text_params(
3973  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
3974  "?, userid = ?, update_time = datetime('now') where id = ? ",
3975  std::vector<std::string>{vd.dashboardName,
3976  vd.dashboardState,
3977  vd.imageHash,
3978  vd.dashboardMetadata,
3979  std::to_string(vd.userId),
3981  } else {
3982  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
3983  << " does not exist in db";
3984  throw runtime_error("Error replacing dashboard id " +
3985  std::to_string(vd.dashboardId) + " does not exist in db");
3986  }
3987  } catch (std::exception& e) {
3988  sqliteConnector_.query("ROLLBACK TRANSACTION");
3989  throw;
3990  }
3991  sqliteConnector_.query("END TRANSACTION");
3992 
// Find the in-memory entry with this id (the map is keyed "userId:dashboardName")
// and erase it. NOTE(review): `auto descp` copies each map entry while scanning.
3993  bool found{false};
3994  for (auto descp : dashboardDescriptorMap_) {
3995  auto dash = descp.second.get();
3996  if (dash->dashboardId == vd.dashboardId) {
3997  found = true;
3998  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
3999  dash->dashboardName);
4000  if (viewDescIt ==
4001  dashboardDescriptorMap_.end()) { // check to make sure view exists
4002  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4003  << " dashboard " << dash->dashboardName << " does not exist in map";
4004  throw runtime_error("No metadata for dashboard for user " +
4005  std::to_string(dash->userId) + " dashboard " +
4006  dash->dashboardName + " does not exist in map");
4007  }
4008  dashboardDescriptorMap_.erase(viewDescIt);
4009  break;
4010  }
4011  }
4012  if (!found) {
4013  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4014  << " does not exist in map";
4015  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4016  " does not exist in map");
4017  }
4018 
4019  // now reload the object
// Only update_time (column 1) is read back from this query.
4020  sqliteConnector_.query_with_text_params(
4021  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4022  "mapd_dashboards "
4023  "WHERE id = ?",
4024  std::vector<std::string>{std::to_string(vd.dashboardId)});
4025  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4027  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4028  addFrontendViewToMapNoLock(vd);
4029  sqlite_lock.unlock();
4030  write_lock.unlock();
4031  // NOTE(wamsi): Transactionally unsafe
4032  createOrUpdateDashboardSystemRole(
4034 }
4035 
4036 std::string Catalog::calculateSHA1(const std::string& data) {
4037  boost::uuids::detail::sha1 sha1;
4038  unsigned int digest[5];
4039  sha1.process_bytes(data.c_str(), data.length());
4040  sha1.get_digest(digest);
4041  std::stringstream ss;
4042  for (size_t i = 0; i < 5; i++) {
4043  ss << std::hex << digest[i];
4044  }
4045  return ss.str();
4046 }
4047 
4048 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4049  cat_write_lock write_lock(this);
4050  cat_sqlite_lock sqlite_lock(getObjForLock());
4051  sqliteConnector_.query("BEGIN TRANSACTION");
4052  try {
4053  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4054  .substr(0, 8);
4055  sqliteConnector_.query_with_text_params(
4056  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4057  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4058  if (sqliteConnector_.getNumRows() > 0) {
4059  sqliteConnector_.query_with_text_params(
4060  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4061  "link = ?",
4062  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4063  } else {
4064  sqliteConnector_.query_with_text_params(
4065  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4066  "update_time) VALUES (?,?,?,?, datetime('now'))",
4067  std::vector<std::string>{
4068  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4069  }
4070  // now get the auto generated dashid
4071  sqliteConnector_.query_with_text_param(
4072  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4073  "WHERE link = ?",
4074  ld.link);
4075  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4076  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4077  } catch (std::exception& e) {
4078  sqliteConnector_.query("ROLLBACK TRANSACTION");
4079  throw;
4080  }
4081  sqliteConnector_.query("END TRANSACTION");
4082  addLinkToMap(ld);
4083  return ld.link;
4084 }
4085 
4086 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4087  const TableDescriptor* td) const {
4088  cat_read_lock read_lock(this);
4089 
4090  const auto column_descriptors =
4091  getAllColumnMetadataForTable(td->tableId, false, true, true);
4092 
4093  const ColumnDescriptor* shard_cd{nullptr};
4094  int i = 1;
4095  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4096  ++cd_itr, ++i) {
4097  if (i == td->shardedColumnId) {
4098  shard_cd = *cd_itr;
4099  }
4100  }
4101  return shard_cd;
4102 }
4103 
4104 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4105  const TableDescriptor* logical_table_desc,
4106  bool populate_fragmenter) const {
4107  cat_read_lock read_lock(this);
4108  const auto physicalTableIt =
4109  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4110  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4111  return {logical_table_desc};
4112  }
4113  const auto physicalTablesIds = physicalTableIt->second;
4114  CHECK(!physicalTablesIds.empty());
4115  read_lock.unlock();
4116  std::vector<const TableDescriptor*> physicalTables;
4117  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4118  physicalTables.push_back(
4119  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4120  }
4121  return physicalTables;
4122 }
4123 
4124 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4125  cat_read_lock read_lock(this);
4126 
4127  std::map<int, const ColumnDescriptor*> mapping;
4128 
4129  const auto tables = getAllTableMetadata();
4130  for (const auto td : tables) {
4131  if (td->shard >= 0) {
4132  // skip shards, they're not standalone tables
4133  continue;
4134  }
4135 
4136  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4137  const auto& ti = cd->columnType;
4138  if (ti.is_string()) {
4139  if (ti.get_compression() == kENCODING_DICT) {
4140  // if foreign reference, get referenced tab.col
4141  const auto dict_id = ti.get_comp_param();
4142 
4143  // ignore temp (negative) dictionaries
4144  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4145  mapping[dict_id] = cd;
4146  }
4147  }
4148  }
4149  }
4150  }
4151 
4152  return mapping;
4153 }
4154 
// Decides whether table `td` should be listed for `user_metadata`:
//   - shard tables (td->shard >= 0) are never listed;
//   - GET_PHYSICAL_TABLES excludes views, GET_VIEWS excludes non-views, and any
//     other get_tables_type keeps both;
//   - finally, the user must hold at least one privilege on the table's DB object
//     (SysCatalog::hasAnyPrivileges).
// Returns true only if the table passes every filter.
4155 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4156  const UserMetadata& user_metadata,
4157  const GetTablesType get_tables_type) const {
4158  if (td->shard >= 0) {
4159  // skip shards, they're not standalone tables
4160  return false;
4161  }
4162  switch (get_tables_type) {
4163  case GET_PHYSICAL_TABLES: {
4164  if (td->isView) {
4165  return false;
4166  }
4167  break;
4168  }
4169  case GET_VIEWS: {
4170  if (!td->isView) {
4171  return false;
4172  }
4173  break;
4174  }
4175  default:
4176  break;
4177  }
// Resolve the table's DB-object key against this catalog, then require that the
// user holds any privilege on it.
4179  dbObject.loadKey(*this);
4180  std::vector<DBObject> privObjects = {dbObject};
4181  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4182  // skip table, as there are no privileges to access it
4183  return false;
4184  }
4185  return true;
4186 }
4187 
4188 std::vector<std::string> Catalog::getTableNamesForUser(
4189  const UserMetadata& user_metadata,
4190  const GetTablesType get_tables_type) const {
4191  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4192  cat_read_lock read_lock(this);
4193  std::vector<std::string> table_names;
4194  const auto tables = getAllTableMetadata();
4195  for (const auto td : tables) {
4196  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4197  table_names.push_back(td->tableName);
4198  }
4199  }
4200  return table_names;
4201 }
4202 
4203 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4204  const UserMetadata& user_metadata,
4205  const GetTablesType get_tables_type,
4206  const std::string& filter_table_name) const {
4207  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4208  cat_read_lock read_lock(this);
4209 
4210  std::vector<TableMetadata> tables_metadata;
4211  const auto tables = getAllTableMetadata();
4212  for (const auto td : tables) {
4213  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4214  if (!filter_table_name.empty()) {
4215  if (td->tableName != filter_table_name) {
4216  continue;
4217  }
4218  }
4219  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4220  // descriptor outside catalog lock
4221  tables_metadata.emplace_back(table_metadata);
4222  }
4223  }
4224  return tables_metadata;
4225 }
4226 
4227 int Catalog::getLogicalTableId(const int physicalTableId) const {
4228  cat_read_lock read_lock(this);
4229  for (const auto& l : logicalToPhysicalTableMapById_) {
4230  if (l.second.end() != std::find_if(l.second.begin(),
4231  l.second.end(),
4232  [&](decltype(*l.second.begin()) tid) -> bool {
4233  return physicalTableId == tid;
4234  })) {
4235  return l.first;
4236  }
4237  }
4238  return physicalTableId;
4239 }
4240 
4241 void Catalog::checkpoint(const int logicalTableId) const {
4242  const auto td = getMetadataForTable(logicalTableId);
4243  const auto shards = getPhysicalTablesDescriptors(td);
4244  for (const auto shard : shards) {
4245  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4246  }
4247 }
4248 
4249 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4250  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4251  try {
4252  checkpoint(logical_table_id);
4253  } catch (...) {
4254  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4255  throw;
4256  }
4257 }
4258 
4259 void Catalog::eraseDbMetadata() {
4260  const auto tables = getAllTableMetadata();
4261  for (const auto table : tables) {
4262  eraseTableMetadata(table);
4263  }
4264  // Physically erase database metadata
4265  boost::filesystem::remove(basePath_ + "/mapd_catalogs/" + currentDB_.dbName);
4266  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4267 }
4268 
4269 void Catalog::eraseDbPhysicalData() {
4270  const auto tables = getAllTableMetadata();
4271  for (const auto table : tables) {
4272  eraseTablePhysicalData(table);
4273  }
4274 }
4275 
4276 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4277  const int tableId = td->tableId;
4278  // must destroy fragmenter before deleteChunks is called.
4279  removeFragmenterForTable(tableId);
4280 
4281  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4282  {
4283  INJECT_TIMER(deleteChunksWithPrefix);
4284  // assuming deleteChunksWithPrefix is atomic
4285  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4286  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4287  }
4288  if (!td->isView) {
4289  INJECT_TIMER(Remove_Table);
4290  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4291  }
4292 }
4293 
4294 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4295  const int32_t& shardNumber) {
4296  std::string physicalTableName =
4297  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4298  return (physicalTableName);
4299 }
4300 
4301 void Catalog::buildForeignServerMap() {
4302  sqliteConnector_.query(
4303  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4304  "omnisci_foreign_servers");
4305  auto num_rows = sqliteConnector_.getNumRows();
4306  for (size_t row = 0; row < num_rows; row++) {
4307  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4308  sqliteConnector_.getData<int>(row, 0),
4309  sqliteConnector_.getData<std::string>(row, 1),
4310  sqliteConnector_.getData<std::string>(row, 2),
4311  sqliteConnector_.getData<std::string>(row, 3),
4312  sqliteConnector_.getData<std::int32_t>(row, 4),
4313  sqliteConnector_.getData<std::int32_t>(row, 5));
4314  foreignServerMap_[foreign_server->name] = foreign_server;
4315  foreignServerMapById_[foreign_server->id] = foreign_server;
4316  }
4317 }
4318 
4319 void Catalog::addForeignTableDetails() {
4320  sqliteConnector_.query(
4321  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4322  "omnisci_foreign_tables");
4323  auto num_rows = sqliteConnector_.getNumRows();
4324  for (size_t r = 0; r < num_rows; r++) {
4325  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4326  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4327  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4328  const auto last_refresh_time = sqliteConnector_.getData<int>(r, 3);
4329  const auto next_refresh_time = sqliteConnector_.getData<int>(r, 4);
4330 
4331  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4332  auto foreign_table =
4333  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4334  CHECK(foreign_table);
4335  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4336  CHECK(foreign_table->foreign_server);
4337  foreign_table->populateOptionsMap(options);
4338  foreign_table->last_refresh_time = last_refresh_time;
4339  foreign_table->next_refresh_time = next_refresh_time;
4340  }
4341 }
4342 
4343 void Catalog::setForeignServerProperty(const std::string& server_name,
4344  const std::string& property,
4345  const std::string& value) {
4346  cat_sqlite_lock sqlite_lock(getObjForLock());
4347  sqliteConnector_.query_with_text_params(
4348  "SELECT id from omnisci_foreign_servers where name = ?",
4349  std::vector<std::string>{server_name});
4350  auto num_rows = sqliteConnector_.getNumRows();
4351  if (num_rows > 0) {
4352  CHECK_EQ(size_t(1), num_rows);
4353  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4354  sqliteConnector_.query_with_text_params(
4355  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4356  std::vector<std::string>{value, std::to_string(server_id)});
4357  } else {
4358  throw std::runtime_error{"Can not change property \"" + property +
4359  "\" for foreign server." + " Foreign server \"" +
4360  server_name + "\" is not found."};
4361  }
4362 }
4363 
// Registers the built-in local foreign servers omnisci_local_csv,
// omnisci_local_parquet and omnisci_local_regex_parser. Each one is:
//   1. constructed with the shared `options` object;
//   2. checked with validate();
//   3. handed to createForeignServerNoLocks(..., true).
// NOTE(review): the `true` argument is presumably an "if not exists" flag,
// matching this function's name — confirm. The callee's "NoLocks" suffix suggests
// the caller must already hold the catalog locks — confirm.
4364 void Catalog::createDefaultServersIfNotExists() {
4368 
4369  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4370  "omnisci_local_csv",
4372  options,
4374  local_csv_server->validate();
4375  createForeignServerNoLocks(std::move(local_csv_server), true);
4376 
4377  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4378  "omnisci_local_parquet",
4380  options,
4382  local_parquet_server->validate();
4383  createForeignServerNoLocks(std::move(local_parquet_server), true);
4384 
4385  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
4386  "omnisci_local_regex_parser",
4388  options,
4390  local_regex_parser_server->validate();
4391  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
4392 }
4393 
4394 // prepare a fresh file reload on next table access
4395 void Catalog::setForReload(const int32_t tableId) {
4396  const auto td = getMetadataForTable(tableId);
4397  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4398  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4399  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4400  }
4401 }
4402 
4403 // get a table's data dirs
4404 std::vector<std::string> Catalog::getTableDataDirectories(
4405  const TableDescriptor* td) const {
4406  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4407  std::vector<std::string> file_paths;
4408  for (auto shard : getPhysicalTablesDescriptors(td)) {
4409  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4410  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4411  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4412  file_paths.push_back(file_path.filename().string());
4413  }
4414  return file_paths;
4415 }
4416 
// Returns the base name (last path component) of the dictionary folder backing
// column `cd`, or an empty string when the `if` below does not match. Among the
// `if`'s clauses: the column must be a string or string-array column with a
// positive comp_param (dictionary id).
// CHECK-fails if that dictionary is not registered in dictDescriptorMapByRef_
// under (current db id, dict id).
// NOTE(review): reads dictDescriptorMapByRef_ without taking cat_read_lock;
// presumably callers hold the catalog lock — confirm.
4417 // get a column's dict dir basename
4418 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd) const {
4419  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4421  cd->columnType.get_comp_param() > 0) {
4422  const auto dictId = cd->columnType.get_comp_param();
4423  const DictRef dictRef(currentDB_.dbId, dictId);
4424  const auto dit = dictDescriptorMapByRef_.find(dictRef);
4425  CHECK(dit != dictDescriptorMapByRef_.end());
4426  CHECK(dit->second);
4427  boost::filesystem::path file_path(dit->second->dictFolderPath);
4428  return file_path.filename().string();
4429  }
4430  return std::string();
4431 }
4432 
4433 // get a table's dict dirs
4434 std::vector<std::string> Catalog::getTableDictDirectories(
4435  const TableDescriptor* td) const {
4436  std::vector<std::string> file_paths;
4437  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4438  auto file_base = getColumnDictDirectory(cd);
4439  if (!file_base.empty() &&
4440  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4441  file_paths.push_back(file_base);
4442  }
4443  }
4444  return file_paths;
4445 }
4446 
4447 // returns table schema in a string
4448 // NOTE(sy): Might be able to replace dumpSchema() later with
4449 // dumpCreateTable() after a deeper review of the TableArchiver code.
4450 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
4451  CHECK(!td->is_system_table);
4452  cat_read_lock read_lock(this);
4453 
4454  std::ostringstream os;
4455  os << "CREATE TABLE @T (";
4456  // gather column defines
4457  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4458  std::string comma;
4459  std::vector<std::string> shared_dicts;
4460  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4461  for (const auto cd : cds) {
4462  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4463  const auto& ti = cd->columnType;
4464  os << comma << cd->columnName;
4465  // CHAR is perculiar... better dump it as TEXT(32) like \d does
4466  if (ti.get_type() == SQLTypes::kCHAR) {
4467  os << " "
4468  << "TEXT";
4469  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4470  os << " "
4471  << "TEXT[]";
4472  } else {
4473  os << " " << ti.get_type_name();
4474  }
4475  os << (ti.get_notnull() ? " NOT NULL" : "");
4476  if (cd->default_value.has_value()) {
4477  os << " DEFAULT " << cd->getDefaultValueLiteral();
4478  }
4479  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4480  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4481  if (ti.get_compression() == kENCODING_DICT) {
4482  // if foreign reference, get referenced tab.col
4483  const auto dict_id = ti.get_comp_param();
4484  const DictRef dict_ref(currentDB_.dbId, dict_id);
4485  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4486  CHECK(dict_it != dictDescriptorMapByRef_.end());
4487  const auto dict_name = dict_it->second->dictName;
4488  // when migrating a table, any foreign dict ref will be dropped
4489  // and the first cd of a dict will become root of the dict
4490  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
4491  dict_root_cds[dict_name] = cd;
4492  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4493  } else {
4494  const auto dict_root_cd = dict_root_cds[dict_name];
4495  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
4496  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
4497  // "... shouldn't specify an encoding, it borrows from the referenced
4498  // column"
4499  }
4500  } else {
4501  os << " ENCODING NONE";
4502  }
4503  } else if (ti.is_date_in_days() ||
4504  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
4505  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4506  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4507  } else if (ti.is_geometry()) {
4508  if (ti.get_compression() == kENCODING_GEOINT) {
4509  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4510  << ")";
4511  } else {
4512  os << " ENCODING NONE";
4513  }
4514  }
4515  comma = ", ";
4516  }
4517  }
4518  // gather SHARED DICTIONARYs
4519  if (shared_dicts.size()) {
4520  os << ", " << boost::algorithm::join(shared_dicts, ", ");
4521  }
4522  // gather WITH options ...
4523  std::vector<std::string> with_options;
4524  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4525  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4526  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4527  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4528  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4529  : "VACUUM='IMMEDIATE'");
4530  if (!td->partitions.empty()) {
4531  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4532  }
4533  if (td->nShards > 0) {
4534  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4535  CHECK(shard_cd);
4536  os << ", SHARD KEY(" << shard_cd->columnName << ")";
4537  with_options.push_back(
4538  "SHARD_COUNT=" +
4539  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4540  }
4541  if (td->sortedColumnId > 0) {
4542  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4543  CHECK(sort_cd);
4544  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4545  }
4547  td->maxRollbackEpochs != -1) {
4548  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4550  }
4551  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
4552  return os.str();
4553 }
4554 
4555 #include "Parser/ReservedKeywords.h"
4556 
// Returns true if any character of `str` is whitespace according to std::isspace.
// Each char is converted to unsigned char first, because passing a negative char
// value to std::isspace is undefined behavior.
inline bool contains_spaces(std::string_view str) {
  return std::any_of(str.begin(), str.end(), [](unsigned char ch) {
    return std::isspace(ch) != 0;
  });
}
4563 
4566  std::string_view str,
4567  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
4568  return str.find_first_of(chars) != std::string_view::npos;
4569 }
4570 
4572 inline bool is_reserved_sql_keyword(std::string_view str) {
4573  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
4574 }
4575 
4576 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
4577 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
4578  bool multiline_formatting,
4579  bool dump_defaults) const {
4580  cat_read_lock read_lock(this);
4581 
4582  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
4583  std::ostringstream os;
4584 
4585  if (foreign_table && !td->is_system_table) {
4586  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
4587  } else if (!td->isView) {
4588  os << "CREATE ";
4590  os << "TEMPORARY ";
4591  }
4592  os << "TABLE " + td->tableName + " (";
4593  } else {
4594  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
4595  return os.str();
4596  }
4597  // scan column defines
4598  std::vector<std::string> additional_info;
4599  std::set<std::string> shared_dict_column_names;
4600 
4601  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
4602 
4603  // gather column defines
4604  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4605  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4606  bool first = true;
4607  for (const auto cd : cds) {
4608  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4609  const auto& ti = cd->columnType;
4610  if (!first) {
4611  os << ",";
4612  if (!multiline_formatting) {
4613  os << " ";
4614  }
4615  } else {
4616  first = false;
4617  }
4618  if (multiline_formatting) {
4619  os << "\n ";
4620  }
4621  // column name
4622  os << quoteIfRequired(cd->columnName);
4623  // CHAR is perculiar... better dump it as TEXT(32) like \d does
4624  if (ti.get_type() == SQLTypes::kCHAR) {
4625  os << " "
4626  << "TEXT";
4627  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4628  os << " "
4629  << "TEXT[]";
4630  } else {
4631  os << " " << ti.get_type_name();
4632  }
4633  os << (ti.get_notnull() ? " NOT NULL" : "");
4634  if (cd->default_value.has_value()) {
4635  os << " DEFAULT " << cd->getDefaultValueLiteral();
4636  }
4637  if (shared_dict_column_names.find(cd->columnName) ==
4638  shared_dict_column_names.end()) {
4639  // avoids "Column ... shouldn't specify an encoding, it borrows it
4640  // from the referenced column"
4641  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4642  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4643  if (ti.get_compression() == kENCODING_DICT) {
4644  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4645  } else {
4646  os << " ENCODING NONE";
4647  }
4648  } else if (ti.is_date_in_days() ||
4649  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
4650  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4651  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4652  } else if (ti.is_geometry()) {
4653  if (ti.get_compression() == kENCODING_GEOINT) {
4654  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4655  << ")";
4656  } else {
4657  os << " ENCODING NONE";
4658  }
4659  }
4660  }
4661  }
4662  }
4663  // gather SHARED DICTIONARYs
4664  if (additional_info.size()) {
4665  std::string comma;
4666  if (!multiline_formatting) {
4667  comma = ", ";
4668  } else {
4669  comma = ",\n ";
4670  }
4671  os << comma;
4672  os << boost::algorithm::join(additional_info, comma);
4673  }
4674  os << ")";
4675 
4676  std::vector<std::string> with_options;
4677  if (foreign_table && !td->is_system_table) {
4678  if (multiline_formatting) {
4679  os << "\n";
4680  } else {
4681  os << " ";
4682  }
4683  os << "SERVER " << foreign_table->foreign_server->name;
4684 
4685  // gather WITH options ...
4686  for (const auto& [option, value] : foreign_table->options) {
4687  with_options.emplace_back(option + "='" + value + "'");
4688  }
4689  }
4690 
4691  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
4692  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4693  }
4694  if (!foreign_table && (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE)) {
4695  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4696  }
4697  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
4698  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4699  }
4700  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
4701  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4702  }
4703  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
4704  td->maxRollbackEpochs != -1) {
4705  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4707  }
4708  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
4709  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4710  : "VACUUM='IMMEDIATE'");
4711  }
4712  if (!foreign_table && !td->partitions.empty()) {
4713  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4714  }
4715  if (!foreign_table && td->nShards > 0) {
4716  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4717  CHECK(shard_cd);
4718  with_options.push_back(
4719  "SHARD_COUNT=" +
4720  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4721  }
4722  if (!foreign_table && td->sortedColumnId > 0) {
4723  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4724  CHECK(sort_cd);
4725  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4726  }
4727 
4728  if (!with_options.empty()) {
4729  if (!multiline_formatting) {
4730  os << " ";
4731  } else {
4732  os << "\n";
4733  }
4734  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
4735  }
4736  os << ";";
4737  return os.str();
4738 }
4739 
4740 bool Catalog::validateNonExistentTableOrView(const std::string& name,
4741  const bool if_not_exists) {
4742  if (getMetadataForTable(name, false)) {
4743  if (if_not_exists) {
4744  return false;
4745  }
4746  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
4747  }
4748  return true;
4749 }
4750 
4751 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
4752  cat_read_lock read_lock(this);
4753  std::vector<const TableDescriptor*> tables;
4754  for (auto entry : tableDescriptorMapById_) {
4755  auto table_descriptor = entry.second;
4756  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
4757  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4758  CHECK(foreign_table);
4759  auto timing_type_entry = foreign_table->options.find(
4761  CHECK(timing_type_entry != foreign_table->options.end());
4762  int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
4763  std::chrono::system_clock::now().time_since_epoch())
4764  .count();
4765  if (timing_type_entry->second ==
4767  foreign_table->next_refresh_time <= current_time) {
4768  tables.emplace_back(foreign_table);
4769  }
4770  }
4771  }
4772  return tables;
4773 }
4774 
4775 void Catalog::updateForeignTableRefreshTimes(const int32_t table_id) {
4776  cat_write_lock write_lock(this);
4777  cat_sqlite_lock sqlite_lock(getObjForLock());
4778  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4779  auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
4780  CHECK(table_descriptor);
4781  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4782  CHECK(foreign_table);
4783  int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
4784  std::chrono::system_clock::now().time_since_epoch())
4785  .count();
4786  auto last_refresh_time = current_time;
4787  auto next_refresh_time = get_next_refresh_time(*foreign_table);
4788  sqliteConnector_.query_with_text_params(
4789  "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
4790  "WHERE table_id = ?",
4791  std::vector<std::string>{std::to_string(last_refresh_time),
4792  std::to_string(next_refresh_time),
4793  std::to_string(foreign_table->tableId)});
4794  foreign_table->last_refresh_time = last_refresh_time;
4795  foreign_table->next_refresh_time = next_refresh_time;
4796 }
4797 
4798 // TODO(Misiu): This function should be merged with setForeignServerOptions via
4799 // inheritance rather than replication similar functions.
4800 void Catalog::setForeignTableOptions(const std::string& table_name,
4801  foreign_storage::OptionsMap& options_map,
4802  bool clear_existing_options) {
4803  cat_write_lock write_lock(this);
4804  // update in-memory table
4805  auto foreign_table = getForeignTableUnlocked(table_name);
4806  auto saved_options = foreign_table->options;
4807  foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
4808  try {
4809  foreign_table->validateOptionValues();
4810  } catch (const std::exception& e) {
4811  // validation did not succeed:
4812  // revert to saved options & throw exception
4813  foreign_table->options = saved_options;
4814  throw;
4815  }
4816  setForeignTableProperty(
4817  foreign_table, "options", foreign_table->getOptionsAsJsonString());
4818 }
4819 
4820 void Catalog::setForeignTableProperty(const foreign_storage::ForeignTable* table,
4821  const std::string& property,
4822  const std::string& value) {
4823  cat_sqlite_lock sqlite_lock(getObjForLock());
4824  sqliteConnector_.query_with_text_params(
4825  "SELECT table_id from omnisci_foreign_tables where table_id = ?",
4826  std::vector<std::string>{std::to_string(table->tableId)});
4827  auto num_rows = sqliteConnector_.getNumRows();
4828  if (num_rows > 0) {
4829  CHECK_EQ(size_t(1), num_rows);
4830  sqliteConnector_.query_with_text_params(
4831  "UPDATE omnisci_foreign_tables SET " + property + " = ? WHERE table_id = ?",
4832  std::vector<std::string>{value, std::to_string(table->tableId)});
4833  } else {
4834  throw std::runtime_error{"Can not change property \"" + property +
4835  "\" for foreign table." + " Foreign table \"" +
4836  table->tableName + "\" is not found."};
4837  }
4838 }
4839 
4840 std::string Catalog::quoteIfRequired(const std::string& column_name) const {
4841  if (is_reserved_sql_keyword(column_name) || contains_spaces(column_name) ||
4842  contains_sql_reserved_chars(column_name)) {
4843  return get_quoted_string(column_name, '"', '"');
4844  } else {
4845  return column_name;
4846  }
4847 }
4848 
4849 // this will gather information that represents the shared dictionary columns
4850 // as they are on the table NOW not at original creation
4851 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
4852  std::set<std::string>& shared_dict_column_names,
4853  const TableDescriptor* td) const {
4854  if (td->nShards > 0) {
4855  ColumnIdKey columnIdKey(td->tableId, td->shardedColumnId);
4856  auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
4857  CHECK(scd);
4858  std::string txt = "SHARD KEY (" + quoteIfRequired(scd->columnName) + ")";
4859  additional_info.emplace_back(txt);
4860  }
4861  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4862  for (const auto cd : cds) {
4863  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4864  const SQLTypeInfo& ti = cd->columnType;
4865  if (ti.get_compression() != kENCODING_DICT) {
4866  continue;
4867  }
4868  auto dictId = ti.get_comp_param();
4869 
4870  // now we need to check how many other users have this dictionary
4871 
4872  DictRef dict_ref(currentDB_.dbId, dictId);
4873  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4874  if (dictIt == dictDescriptorMapByRef_.end()) {
4875  LOG(ERROR) << "missing dictionary " << dictId << " for table " << td->tableName;
4876  continue;
4877  }
4878 
4879  const auto& dd = dictIt->second;
4880  if (dd->refcount > 1) {
4881  auto lowest_table = td->tableId;
4882  auto lowest_column = cd->columnId;
4883  std::string lowest_column_name;
4884  // we have multiple tables using this dictionary
4885  // find the other occurances and keep the "lowest"
4886  for (auto const& [key, val] : columnDescriptorMap_) {
4887  if (val->columnType.get_compression() == kENCODING_DICT &&
4888  val->columnType.get_comp_param() == dictId &&
4889  !(val->tableId == td->tableId && val->columnId == cd->columnId)) {
4890  if (val->tableId < lowest_table) {
4891  lowest_table = val->tableId;
4892  lowest_column = val->columnId;
4893  lowest_column_name = val->columnName;
4894  }
4895  if (val->columnId < lowest_column) {
4896  lowest_column = val->columnId;
4897  lowest_column_name = val->columnName;
4898  }
4899  }
4900  }
4901  if (lowest_table != td->tableId || lowest_column != cd->columnId) {
4902  // we are referencing a different tables dictionary
4903  auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
4904  CHECK(lowest_td);
4905  std::string txt = "SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
4906  ") REFERENCES " + lowest_td->tableName + "(" +
4907  quoteIfRequired(lowest_column_name) + ")";
4908 
4909  additional_info.emplace_back(txt);
4910  shared_dict_column_names.insert(cd->columnName);
4911  }
4912  }
4913  }
4914  }
4915 }
4916 
4917 int32_t Catalog::createCustomExpression(
4918  std::unique_ptr<CustomExpression> custom_expression) {
4919  cat_write_lock write_lock(this);
4920  cat_sqlite_lock sqlite_lock(getObjForLock());
4921  sqliteConnector_.query("BEGIN TRANSACTION");
4922  int32_t custom_expression_id{-1};
4923  try {
4924  auto data_source_type_str =
4925  CustomExpression::dataSourceTypeToString(custom_expression->data_source_type);
4926  auto data_source_id_str = std::to_string(custom_expression->data_source_id);
4927  std::string custom_expr_select_query{
4928  "SELECT id FROM omnisci_custom_expressions WHERE name = ? and data_source_type = "
4929  "? and data_source_id = ? and is_deleted = ?"};
4930  std::vector<std::string> custom_expr_select_params{custom_expression->name,
4931  data_source_type_str,
4932  data_source_id_str,
4933  std::to_string(false)};
4934  sqliteConnector_.query_with_text_params(custom_expr_select_query,
4935  custom_expr_select_params);
4936  if (sqliteConnector_.getNumRows() > 0) {
4937  throw std::runtime_error{
4938  "A custom expression with the given "
4939  "name and data source already exists."};
4940  }
4941  sqliteConnector_.query_with_text_params(
4942  "INSERT INTO omnisci_custom_expressions(name, expression_json, "
4943  "data_source_type, data_source_id, is_deleted) VALUES (?,?,?,?,?)",
4944  std::vector<std::string>{custom_expression->name,
4945  custom_expression->expression_json,
4946  data_source_type_str,
4947  data_source_id_str,
4948  std::to_string(false)});
4949  sqliteConnector_.query_with_text_params(custom_expr_select_query,
4950  custom_expr_select_params);
4951  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
4952  custom_expression->id = sqliteConnector_.getData<int32_t>(0, 0);
4953  custom_expression_id = custom_expression->id;
4954  CHECK(custom_expr_map_by_id_.find(custom_expression->id) ==
4955  custom_expr_map_by_id_.end());
4956  custom_expr_map_by_id_[custom_expression->id] = std::move(custom_expression);
4957  } catch (std::exception& e) {
4958  sqliteConnector_.query("ROLLBACK TRANSACTION");
4959  throw;
4960  }
4961  sqliteConnector_.query("END TRANSACTION");
4962  CHECK_GT(custom_expression_id, 0);
4963  return custom_expression_id;
4964 }
4965 
4966 const CustomExpression* Catalog::getCustomExpression(int32_t custom_expression_id) const {
4967  cat_read_lock read_lock(this);
4968  auto it = custom_expr_map_by_id_.find(custom_expression_id);
4969  if (it != custom_expr_map_by_id_.end()) {
4970  return it->second.get();
4971  }
4972  return nullptr;
4973 }
4974 
4975 const std::unique_ptr<const CustomExpression> Catalog::getCustomExpressionFromStorage(
4976  int32_t custom_expression_id) {
4977  cat_sqlite_lock sqlite_lock(getObjForLock());
4978  sqliteConnector_.query_with_text_params(
4979  "SELECT id, name, expression_json, data_source_type, data_source_id, "
4980  "is_deleted FROM omnisci_custom_expressions WHERE id = ?",
4981  std::vector<std::string>{to_string(custom_expression_id)});
4982  if (sqliteConnector_.getNumRows() > 0) {
4983  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
4984  return getCustomExpressionFromConnector(0);
4985  }
4986  return nullptr;
4987 }
4988 
4989 std::vector<const CustomExpression*> Catalog::getCustomExpressionsForUser(
4990  const UserMetadata& user) const {
4991  std::vector<const CustomExpression*> all_custom_expressions;
4992  {
4993  // Get custom expression pointers separately in order to avoid holding the catalog
4994  // read lock while checking privileges (which may cause a deadlock).
4995  cat_read_lock read_lock(this);
4996  for (const auto& [id, custom_expression] : custom_expr_map_by_id_) {
4997  all_custom_expressions.emplace_back(custom_expression.get());
4998  }
4999  }
5000 
5001  std::vector<const CustomExpression*> filtered_custom_expressions;
5002  for (const auto custom_expression : all_custom_expressions) {
5003  CHECK(custom_expression->data_source_type == DataSourceType::TABLE);
5004  DBObject db_object{custom_expression->data_source_id, TableDBObjectType};
5005  db_object.loadKey(*this);
5006  db_object.setPrivileges(AccessPrivileges::SELECT_FROM_TABLE);
5007  if (SysCatalog::instance().checkPrivileges(user, {db_object})) {
5008  filtered_custom_expressions.emplace_back(custom_expression);
5009  }
5010  }
5011  return filtered_custom_expressions;
5012 }
5013 
5014 void Catalog::updateCustomExpression(int32_t custom_expression_id,
5015  const std::string& expression_json) {
5016  cat_write_lock write_lock(this);
5017  cat_sqlite_lock sqlite_lock(getObjForLock());
5018  auto it = custom_expr_map_by_id_.find(custom_expression_id);
5019  if (it == custom_expr_map_by_id_.end() || it->second->is_deleted) {
5020  throw std::runtime_error{"Custom expression with id \"" +
5021  std::to_string(custom_expression_id) + "\" does not exist."};
5022  }
5023  auto old_expression_json = it->second->expression_json;
5024  sqliteConnector_.query("BEGIN TRANSACTION");
5025  try {
5026  sqliteConnector_.query_with_text_params(
5027  "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
5028  std::vector<std::string>{std::to_string(custom_expression_id)});
5029  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
5030  sqliteConnector_.query_with_text_params(
5031  "UPDATE omnisci_custom_expressions SET expression_json = ? WHERE id = ?",
5032  std::vector<std::string>{expression_json, std::to_string(custom_expression_id)});
5033  it->second->expression_json = expression_json;
5034  } catch (std::exception& e) {
5035  sqliteConnector_.query("ROLLBACK TRANSACTION");
5036  it->second->expression_json = old_expression_json;
5037  throw;
5038  }
5039  sqliteConnector_.query("END TRANSACTION");
5040 }
5041 
5042 void Catalog::deleteCustomExpressions(const std::vector<int32_t>& custom_expression_ids,
5043  bool do_soft_delete) {
5044  cat_write_lock write_lock(this);
5045  cat_sqlite_lock sqlite_lock(getObjForLock());
5046 
5047  std::vector<int32_t> invalid_ids;
5048  for (const auto id : custom_expression_ids) {
5049  if (custom_expr_map_by_id_.find(id) == custom_expr_map_by_id_.end()) {
5050  invalid_ids.emplace_back(id);
5051  }
5052  }
5053  if (!invalid_ids.empty()) {
5054  throw std::runtime_error{"Custom expressions with ids: " + join(invalid_ids, ",") +
5055  " do not exist."};
5056  }
5057  sqliteConnector_.query("BEGIN TRANSACTION");
5058  try {
5059  for (const auto id : custom_expression_ids) {
5060  sqliteConnector_.query_with_text_params(
5061  "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
5062  std::vector<std::string>{std::to_string(id)});
5063  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
5064  if (do_soft_delete) {
5065  sqliteConnector_.query_with_text_params(
5066  "UPDATE omnisci_custom_expressions SET is_deleted = ? WHERE id = ?",
5067  std::vector<std::string>{std::to_string(true), std::to_string(id)});
5068  } else {
5069  sqliteConnector_.query_with_text_params(
5070  "DELETE FROM omnisci_custom_expressions WHERE id = ?",
5071  std::vector<std::string>{std::to_string(id)});
5072  }
5073  }
5074 
5075  for (const auto id : custom_expression_ids) {
5076  if (do_soft_delete) {
5077  au