OmniSciDB  16c4e035a1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Catalog/Catalog.h"
26 
27 #include <algorithm>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/range/adaptor/map.hpp>
31 #include <boost/version.hpp>
32 #include <cassert>
33 #include <cerrno>
34 #include <cstdio>
35 #include <cstring>
36 #include <exception>
37 #include <fstream>
38 #include <list>
39 #include <memory>
40 #include <random>
41 #include <regex>
42 #include <sstream>
43 
44 #if BOOST_VERSION >= 106600
45 #include <boost/uuid/detail/sha1.hpp>
46 #else
47 #include <boost/uuid/sha1.hpp>
48 #endif
49 #include <rapidjson/document.h>
50 #include <rapidjson/istreamwrapper.h>
51 #include <rapidjson/ostreamwrapper.h>
52 #include <rapidjson/writer.h>
53 
54 #include "Catalog/SysCatalog.h"
55 
56 #include "QueryEngine/Execute.h"
58 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
68 #include "Parser/ParserNode.h"
69 #include "QueryEngine/Execute.h"
71 #include "RefreshTimeCalculator.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/File.h"
74 #include "Shared/StringTransform.h"
75 #include "Shared/measure.h"
76 #include "Shared/misc.h"
78 
79 #include "MapDRelease.h"
80 #include "RWLocks.h"
82 
83 using Chunk_NS::Chunk;
86 using std::list;
87 using std::map;
88 using std::pair;
89 using std::runtime_error;
90 using std::string;
91 using std::vector;
92 
94 bool g_enable_fsi{false};
95 bool g_enable_s3_fsi{false};
96 extern bool g_cache_string_hash;
97 extern bool g_enable_system_tables;
98 
99 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
100 // under unit testing.
102 
103 namespace Catalog_Namespace {
104 
105 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
107  1073741824; // 2^30, give room for over a billion non-temp tables
109  1073741824; // 2^30, give room for over a billion non-temp dictionaries
110 
111 const std::string Catalog::physicalTableNameTag_("_shard_#");
112 
113 thread_local bool Catalog::thread_holds_read_lock = false;
114 
119 
120 // The migration is done as a two-step process: this release
121 // will create and use the new table;
122 // the next release will remove the old table. This keeps a fallback path
123 // in case of migration failure.
126  sqliteConnector_.query("BEGIN TRANSACTION");
127  try {
129  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
130  if (sqliteConnector_.getNumRows() != 0) {
131  // already done
132  sqliteConnector_.query("END TRANSACTION");
133  return;
134  }
136  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
137  "userid integer references mapd_users, state text, image_hash text, update_time "
138  "timestamp, "
139  "metadata text, UNIQUE(userid, name) )");
140  // now copy content from old table to new table
142  "insert into mapd_dashboards (id, name , "
143  "userid, state, image_hash, update_time , "
144  "metadata) "
145  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
146  "view_metadata "
147  "from mapd_frontend_views");
148  } catch (const std::exception& e) {
149  sqliteConnector_.query("ROLLBACK TRANSACTION");
150  throw;
151  }
152  sqliteConnector_.query("END TRANSACTION");
153 }
154 
155 namespace {
156 
157 inline auto table_json_filepath(const std::string& base_path,
158  const std::string& db_name) {
159  return boost::filesystem::path(base_path + "/mapd_catalogs/" + db_name +
160  "_temp_tables.json");
161 }
162 
163 } // namespace
164 
// Constructs the catalog for database `curDB` stored under `basePath`.
//
// The member initializers open the SQLite connector on `curDB.dbName` inside
// `basePath + "/mapd_catalogs/"`, store the data manager, string dictionary
// hosts and Calcite handle, and start the temp table / temp dictionary id
// counters at MAPD_TEMP_TABLE_START_ID / MAPD_TEMP_DICT_START_ID.
//
// For an existing database (`is_new_db == false`) migrations run both before
// and after buildMaps(); a new database only runs buildMaps().
//
// NOTE(review): this listing omits source line 190, which presumably opens the
// conditional closed on line 192 around the temp-table JSON removal -- confirm
// against the full source before editing this function.
165 Catalog::Catalog(const string& basePath,
166  const DBMetadata& curDB,
167  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
168  const std::vector<LeafHostInfo>& string_dict_hosts,
169  std::shared_ptr<Calcite> calcite,
170  bool is_new_db)
171  : basePath_(basePath)
172  , sqliteConnector_(curDB.dbName, basePath + "/mapd_catalogs/")
173  , currentDB_(curDB)
174  , dataMgr_(dataMgr)
175  , string_dict_hosts_(string_dict_hosts)
176  , calciteMgr_(calcite)
177  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
178  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
179  , sqliteMutex_()
180  , sharedMutex_()
181  , thread_holding_sqlite_lock()
182  , thread_holding_write_lock() {
// Schema migrations that must run before the in-memory maps are built.
183  if (!is_new_db) {
184  CheckAndExecuteMigrations();
185  }
186  buildMaps();
// Migrations that run only after buildMaps() has completed.
187  if (!is_new_db) {
188  CheckAndExecuteMigrationsPostBuildMaps();
189  }
// Deletes this database's temp-table JSON file (see table_json_filepath).
191  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
192  }
193  conditionallyInitializeSystemObjects();
194  // once all initialized use real object
195  initialized_ = true;
196 }
197 
199 
202  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
203  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
204  tableDescIt != tableDescriptorMap_.end();
205  ++tableDescIt) {
206  tableDescIt->second->fragmenter = nullptr;
207  delete tableDescIt->second;
208  }
209 
210  // TableDescriptorMapById points to the same descriptors. No need to delete
211 
212  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
213  columnDescIt != columnDescriptorMap_.end();
214  ++columnDescIt) {
215  delete columnDescIt->second;
216  }
217 
218  // ColumnDescriptorMapById points to the same descriptors. No need to delete
219 
221  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
222  }
223 }
224 
226  if (initialized_) {
227  return this;
228  } else {
229  return SysCatalog::instance().getDummyCatalog().get();
230  }
231 }
232 
235  sqliteConnector_.query("BEGIN TRANSACTION");
236  try {
237  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
238  std::vector<std::string> cols;
239  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
240  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
241  }
242  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
243  cols.end()) {
244  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
246  sqliteConnector_.query(queryString);
247  }
248  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
249  cols.end()) {
250  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
251  std::to_string(0));
252  sqliteConnector_.query(queryString);
253  }
254  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
255  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
256  std::to_string(-1));
257  sqliteConnector_.query(queryString);
258  }
259  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
260  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
261  std::to_string(0));
262  sqliteConnector_.query(queryString);
263  }
264  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
265  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
266  sqliteConnector_.query(queryString);
267  }
268  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
269  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
271  sqliteConnector_.query(queryString);
272  }
273  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
274  cols.end()) {
276  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
277  }
278  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
279  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
280  sqliteConnector_.query(queryString);
281  }
282  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
283  cols.end()) {
284  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
285  std::to_string(-1));
286  sqliteConnector_.query(queryString);
287  }
288  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
289  cols.end()) {
290  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
291  sqliteConnector_.query(queryString);
292  }
293  } catch (std::exception& e) {
294  sqliteConnector_.query("ROLLBACK TRANSACTION");
295  throw;
296  }
297  sqliteConnector_.query("END TRANSACTION");
298 }
299 
302  sqliteConnector_.query("BEGIN TRANSACTION");
303  try {
305  "select name from sqlite_master WHERE type='table' AND "
306  "name='mapd_version_history'");
307  if (sqliteConnector_.getNumRows() == 0) {
309  "CREATE TABLE mapd_version_history(version integer, migration_history text "
310  "unique)");
311  } else {
313  "select * from mapd_version_history where migration_history = "
314  "'notnull_fixlen_arrays'");
315  if (sqliteConnector_.getNumRows() != 0) {
316  // legacy fixlen arrays have already been migrated
317  // no need for further execution
318  sqliteConnector_.query("END TRANSACTION");
319  return;
320  }
321  }
322  // Insert check for migration
324  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
325  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
326  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
327  // Updating all fixlen array columns
328  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
329  std::to_string(kARRAY) + " AND size>0;");
330  sqliteConnector_.query(queryString);
331  } catch (std::exception& e) {
332  sqliteConnector_.query("ROLLBACK TRANSACTION");
333  throw;
334  }
335  sqliteConnector_.query("END TRANSACTION");
336 }
337 
340  sqliteConnector_.query("BEGIN TRANSACTION");
341  try {
343  "select name from sqlite_master WHERE type='table' AND "
344  "name='mapd_version_history'");
345  if (sqliteConnector_.getNumRows() == 0) {
347  "CREATE TABLE mapd_version_history(version integer, migration_history text "
348  "unique)");
349  } else {
351  "select * from mapd_version_history where migration_history = "
352  "'notnull_geo_columns'");
353  if (sqliteConnector_.getNumRows() != 0) {
354  // legacy geo columns have already been migrated
355  // no need for further execution
356  sqliteConnector_.query("END TRANSACTION");
357  return;
358  }
359  }
360  // Insert check for migration
362  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
363  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
364  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
365  // Updating all geo columns
366  string queryString(
367  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
368  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
369  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
370  sqliteConnector_.query(queryString);
371  } catch (std::exception& e) {
372  sqliteConnector_.query("ROLLBACK TRANSACTION");
373  throw;
374  }
375  sqliteConnector_.query("END TRANSACTION");
376 }
377 
380  sqliteConnector_.query("BEGIN TRANSACTION");
381  try {
382  // check table still exists
384  "SELECT name FROM sqlite_master WHERE type='table' AND "
385  "name='mapd_frontend_views'");
386  if (sqliteConnector_.getNumRows() == 0) {
387  // table does not exist
388  // no need to migrate
389  sqliteConnector_.query("END TRANSACTION");
390  return;
391  }
392  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
393  std::vector<std::string> cols;
394  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
395  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
396  }
397  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
398  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
399  }
400  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
401  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
402  }
403  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
404  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
405  }
406  } catch (std::exception& e) {
407  sqliteConnector_.query("ROLLBACK TRANSACTION");
408  throw;
409  }
410  sqliteConnector_.query("END TRANSACTION");
411 }
412 
415  sqliteConnector_.query("BEGIN TRANSACTION");
416  try {
418  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
419  "integer references mapd_users, "
420  "link text unique, view_state text, update_time timestamp, view_metadata text)");
421  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
422  std::vector<std::string> cols;
423  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
424  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
425  }
426  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
427  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
428  }
429  } catch (const std::exception& e) {
430  sqliteConnector_.query("ROLLBACK TRANSACTION");
431  throw;
432  }
433  sqliteConnector_.query("END TRANSACTION");
434 }
435 
438  sqliteConnector_.query("BEGIN TRANSACTION");
439  try {
440  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
441  // check table still exists
443  "SELECT name FROM sqlite_master WHERE type='table' AND "
444  "name='mapd_frontend_views'");
445  if (sqliteConnector_.getNumRows() == 0) {
446  // table does not exist
447  // no need to migrate
448  sqliteConnector_.query("END TRANSACTION");
449  return;
450  }
452  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
453  } catch (const std::exception& e) {
454  sqliteConnector_.query("ROLLBACK TRANSACTION");
455  throw;
456  }
457  sqliteConnector_.query("END TRANSACTION");
458 }
459 
460 // introduce DB version into the tables table
461 // if the DB does not have a version reset all pagesizes to 2097152 to be compatible with
462 // old value
463 
466  if (currentDB_.dbName.length() == 0) {
467  // updateDictionaryNames dbName length is zero nothing to do here
468  return;
469  }
470  sqliteConnector_.query("BEGIN TRANSACTION");
471  try {
472  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
473  std::vector<std::string> cols;
474  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
475  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
476  }
477  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
478  LOG(INFO) << "Updating mapd_tables updatePageSize";
479  // No version number
480  // need to update the default page size to the old correct value
481  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
482  // need to add new version info
483  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
485  sqliteConnector_.query(queryString);
486  }
487  } catch (std::exception& e) {
488  sqliteConnector_.query("ROLLBACK TRANSACTION");
489  throw;
490  }
491  sqliteConnector_.query("END TRANSACTION");
492 }
493 
496  sqliteConnector_.query("BEGIN TRANSACTION");
497  try {
498  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
499  std::vector<std::string> cols;
500  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
501  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
502  }
503  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
504  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
505  // need to add new version info
506  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
508  sqliteConnector_.query(queryString);
509  // need to add new column to table definition to indicate deleted column, column used
510  // as bitmap for deleted rows.
512  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
513  }
514  } catch (std::exception& e) {
515  sqliteConnector_.query("ROLLBACK TRANSACTION");
516  throw;
517  }
518  sqliteConnector_.query("END TRANSACTION");
519 }
520 
523  sqliteConnector_.query("BEGIN TRANSACTION");
524  try {
525  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
526  std::vector<std::string> cols;
527  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
528  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
529  }
530  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
531  LOG(INFO) << "Adding support for default values to mapd_columns";
532  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
533  }
534  } catch (std::exception& e) {
535  LOG(ERROR) << "Failed to make metadata update for default values` support";
536  sqliteConnector_.query("ROLLBACK TRANSACTION");
537  throw;
538  }
539  sqliteConnector_.query("END TRANSACTION");
540 }
541 
542 // introduce DB version into the dictionary tables
543 // if the DB does not have a version rename all dictionary tables
544 
547  if (currentDB_.dbName.length() == 0) {
548  // updateDictionaryNames dbName length is zero nothing to do here
549  return;
550  }
551  sqliteConnector_.query("BEGIN TRANSACTION");
552  try {
553  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
554  std::vector<std::string> cols;
555  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
556  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
557  }
558  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
559  // No version number
560  // need to rename dictionaries
561  string dictQuery("SELECT dictid, name from mapd_dictionaries");
562  sqliteConnector_.query(dictQuery);
563  size_t numRows = sqliteConnector_.getNumRows();
564  for (size_t r = 0; r < numRows; ++r) {
565  int dictId = sqliteConnector_.getData<int>(r, 0);
566  std::string dictName = sqliteConnector_.getData<string>(r, 1);
567 
568  std::string oldName =
569  g_base_path + "/mapd_data/" + currentDB_.dbName + "_" + dictName;
570  std::string newName = g_base_path + "/mapd_data/DB_" +
571  std::to_string(currentDB_.dbId) + "_DICT_" +
572  std::to_string(dictId);
573 
574  int result = rename(oldName.c_str(), newName.c_str());
575 
576  if (result == 0) {
577  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
578  << newName;
579  } else {
580  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
581  << newName + " dbname '" << currentDB_.dbName << "' error code "
582  << std::to_string(result);
583  }
584  }
585  // need to add new version info
586  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
588  sqliteConnector_.query(queryString);
589  }
590  } catch (std::exception& e) {
591  sqliteConnector_.query("ROLLBACK TRANSACTION");
592  throw;
593  }
594  sqliteConnector_.query("END TRANSACTION");
595 }
596 
599  sqliteConnector_.query("BEGIN TRANSACTION");
600  try {
602  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
603  "logical_table_id integer, physical_table_id integer)");
604  } catch (const std::exception& e) {
605  sqliteConnector_.query("ROLLBACK TRANSACTION");
606  throw;
607  }
608  sqliteConnector_.query("END TRANSACTION");
609 }
610 
// Persists the logical-to-physical table mapping for one logical table.
//
// Looks up `logical_tb_id` in logicalToPhysicalTableMapById_. When an entry
// exists, one "INSERT OR REPLACE" into mapd_logical_to_physical is issued per
// physical table id, with both ids bound as text parameters. Nothing is
// written when the logical table has no entry in the map.
//
// All statements run inside a single SQLite transaction: any std::exception
// triggers "ROLLBACK TRANSACTION" and is rethrown to the caller.
//
// NOTE(review): this listing omits source lines 616 and 625. Line 625 is
// presumably the connector call that receives the SQL text and parameter
// vector on lines 626-629 -- confirm against the full source.
611 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
612  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
613  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
614  */
615 
617  sqliteConnector_.query("BEGIN TRANSACTION");
618  try {
619  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
620  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// `const auto` (no reference) takes a copy of the mapped container.
621  const auto physicalTables = physicalTableIt->second;
// A map entry with no physical tables is treated as an invariant violation.
622  CHECK(!physicalTables.empty());
623  for (size_t i = 0; i < physicalTables.size(); i++) {
624  int32_t physical_tb_id = physicalTables[i];
626  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
627  "physical_table_id) VALUES (?1, ?2)",
628  std::vector<std::string>{std::to_string(logical_tb_id),
629  std::to_string(physical_tb_id)});
630  }
631  }
632  } catch (std::exception& e) {
// Undo any partial inserts before propagating the original exception.
633  sqliteConnector_.query("ROLLBACK TRANSACTION");
634  throw;
635  }
636  sqliteConnector_.query("END TRANSACTION");
637 }
638 
641  sqliteConnector_.query("BEGIN TRANSACTION");
642  try {
643  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
644  std::vector<std::string> cols;
645  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
646  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
647  }
648  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
649  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
650  }
651  } catch (std::exception& e) {
652  sqliteConnector_.query("ROLLBACK TRANSACTION");
653  throw;
654  }
655  sqliteConnector_.query("END TRANSACTION");
656 }
657 
660  sqliteConnector_.query("BEGIN TRANSACTION");
661  try {
664  } catch (std::exception& e) {
665  sqliteConnector_.query("ROLLBACK TRANSACTION");
666  throw;
667  }
668  sqliteConnector_.query("END TRANSACTION");
669 }
670 
673  sqliteConnector_.query("BEGIN TRANSACTION");
674  try {
676  } catch (const std::exception& e) {
677  sqliteConnector_.query("ROLLBACK TRANSACTION");
678  throw;
679  }
680  sqliteConnector_.query("END TRANSACTION");
681 }
682 
683 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
684  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
685  "omnisci_foreign_servers(id integer primary key, name text unique, " +
686  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
687  "options text)";
688 }
689 
690 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
691  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
692  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
693  "options text, last_refresh_time integer, next_refresh_time integer, " +
694  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
695  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
696 }
697 
698 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
699  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
700  "omnisci_custom_expressions(id integer primary key, name text, " +
701  "expression_json text, data_source_type text, " +
702  "data_source_id integer, is_deleted boolean)";
703 }
704 
707  sqliteConnector_.query("BEGIN TRANSACTION");
708  std::vector<DBObject> objects;
709  try {
711  "SELECT name FROM sqlite_master WHERE type='table' AND "
712  "name='mapd_record_ownership_marker'");
713  // check if mapd catalog - marker exists
714  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
715  // already done
716  sqliteConnector_.query("END TRANSACTION");
717  return;
718  }
719  // check if different catalog - marker exists
720  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
721  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
722  // Check if migration is being performed on existing non mapd catalogs
723  // Older non mapd dbs will have table but no record in them
724  if (sqliteConnector_.getNumRows() != 0) {
725  // already done
726  sqliteConnector_.query("END TRANSACTION");
727  return;
728  }
729  }
730  // marker not exists - create one
731  else {
732  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
733  }
734 
735  DBMetadata db;
736  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
737  // place dbId as a reference for migration being performed
739  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
740  std::vector<std::string>{std::to_string(db.dbOwner)});
741 
742  static const std::map<const DBObjectType, const AccessPrivileges>
743  object_level_all_privs_lookup{
749 
750  // grant owner all permissions on DB
751  DBObjectKey key;
752  key.dbId = currentDB_.dbId;
753  auto _key_place = [&key](auto type) {
754  key.permissionType = type;
755  return key;
756  };
757  for (auto& it : object_level_all_privs_lookup) {
758  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
759  objects.back().setName(currentDB_.dbName);
760  }
761 
762  {
763  // other users tables and views
764  string tableQuery(
765  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
766  sqliteConnector_.query(tableQuery);
767  size_t numRows = sqliteConnector_.getNumRows();
768  for (size_t r = 0; r < numRows; ++r) {
769  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
770  std::string tableName = sqliteConnector_.getData<string>(r, 1);
771  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
772  bool isview = sqliteConnector_.getData<bool>(r, 3);
773 
776  DBObjectKey key;
777  key.dbId = currentDB_.dbId;
778  key.objectId = tableid;
779  key.permissionType = type;
780 
781  DBObject obj(tableName, type);
782  obj.setObjectKey(key);
783  obj.setOwner(ownerid);
786 
787  objects.push_back(obj);
788  }
789  }
790 
791  {
792  // other users dashboards
793  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
794  sqliteConnector_.query(tableQuery);
795  size_t numRows = sqliteConnector_.getNumRows();
796  for (size_t r = 0; r < numRows; ++r) {
797  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
798  std::string dashName = sqliteConnector_.getData<string>(r, 1);
799  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
800 
802  DBObjectKey key;
803  key.dbId = currentDB_.dbId;
804  key.objectId = dashId;
805  key.permissionType = type;
806 
807  DBObject obj(dashName, type);
808  obj.setObjectKey(key);
809  obj.setOwner(ownerid);
811 
812  objects.push_back(obj);
813  }
814  }
815  } catch (const std::exception& e) {
816  sqliteConnector_.query("ROLLBACK TRANSACTION");
817  throw;
818  }
819  sqliteConnector_.query("END TRANSACTION");
820 
821  // now apply the objects to the syscat to track the permissions
822  // moved outside transaction to avoid lock in sqlite
823  try {
825  } catch (const std::exception& e) {
826  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
827  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
828  e.what());
829  // will need to remove the mapd_record_ownership_marker table and retry
830  }
831 }
832 
837 }
838 
840  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
841  std::vector<std::string> dashboard_ids;
842  static const std::string migration_name{"dashboard_roles_migration"};
843  {
845  sqliteConnector_.query("BEGIN TRANSACTION");
846  try {
847  // migration_history should be present in all catalogs by now
848  // if not then would be created before this migration
850  "select * from mapd_version_history where migration_history = '" +
851  migration_name + "'");
852  if (sqliteConnector_.getNumRows() != 0) {
853  // no need for further execution
854  sqliteConnector_.query("END TRANSACTION");
855  return;
856  }
857  LOG(INFO) << "Performing dashboard internal roles Migration.";
858  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
859  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
862  sqliteConnector_.getData<string>(i, 0)))) {
863  // Successfully created roles during previous migration/crash
864  // No need to include them
865  continue;
866  }
867  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
868  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
869  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
870  }
871  } catch (const std::exception& e) {
872  sqliteConnector_.query("ROLLBACK TRANSACTION");
873  throw;
874  }
875  sqliteConnector_.query("END TRANSACTION");
876  }
877  // All current grantees with shared dashboards.
878  const auto active_grantees =
880 
881  try {
882  // NOTE(wamsi): Transactionally unsafe
883  for (auto dash : dashboards) {
884  createOrUpdateDashboardSystemRole(dash.second.second,
885  dash.second.first,
887  std::to_string(currentDB_.dbId), dash.first));
888  auto result = active_grantees.find(dash.first);
889  if (result != active_grantees.end()) {
892  dash.first)},
893  result->second);
894  }
895  }
897  // check if this has already been completed
899  "select * from mapd_version_history where migration_history = '" +
900  migration_name + "'");
901  if (sqliteConnector_.getNumRows() != 0) {
902  return;
903  }
905  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
906  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
907  } catch (const std::exception& e) {
908  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
909  << e.what();
910  throw;
911  }
912  LOG(INFO) << "Successfully created dashboard system roles during migration.";
913 }
914 
925  updatePageSize();
929  if (g_enable_fsi) {
931  }
934 }
935 
939 }
940 
941 namespace {
942 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
943  auto users = SysCatalog::instance().getAllUserMetadata();
944  std::map<int32_t, std::string> user_name_by_user_id;
945  for (const auto& user : users) {
946  user_name_by_user_id[user.userId] = user.userName;
947  }
948  return user_name_by_user_id;
949 }
950 
952  int32_t id,
953  const std::map<int32_t, std::string>& user_name_by_user_id) {
954  auto entry = user_name_by_user_id.find(id);
955  if (entry != user_name_by_user_id.end()) {
956  return entry->second;
957  }
958  // a user could be deleted and a dashboard still exist?
959  return "Unknown";
960 }
961 } // namespace
962 
// Populates the catalog's in-memory descriptor maps (dictionaries, tables, columns,
// views, dashboards, links, logical->physical table ids) from the SQLite catalog tables
// queried below.
// NOTE(review): the function header (listing line 963) is missing from this listing,
// as are listing lines 969-970, 1023, 1043, 1048-1050, 1068-1070, 1072-1074, 1141,
// 1159 and 1186. Statements adjacent to those gaps are therefore incomplete here.
964  // Get all user id to username mapping here in order to avoid making a call to
965  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
966  // catalog.
967  const auto user_name_by_user_id = get_user_id_to_user_name_map();
968 
971 
// Dictionaries: one DictDescriptor per mapd_dictionaries row, stored under the
// (current db id, dict id) reference. The folder path is derived from g_base_path.
972  string dictQuery(
973  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
974  sqliteConnector_.query(dictQuery);
975  size_t numRows = sqliteConnector_.getNumRows();
976  for (size_t r = 0; r < numRows; ++r) {
977  int dictId = sqliteConnector_.getData<int>(r, 0);
978  std::string dictName = sqliteConnector_.getData<string>(r, 1);
979  int dictNBits = sqliteConnector_.getData<int>(r, 2);
980  bool is_shared = sqliteConnector_.getData<bool>(r, 3);
981  int refcount = sqliteConnector_.getData<int>(r, 4);
982  std::string fname = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
983  "_DICT_" + std::to_string(dictId);
984  DictRef dict_ref(currentDB_.dbId, dictId);
985  DictDescriptor* dd = new DictDescriptor(
986  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
987  dictDescriptorMapByRef_[dict_ref].reset(dd);
988  }
989 
// Tables: one descriptor per mapd_tables row. The column indexes passed to getData()
// below follow the order of the SELECT list (0 = tableid ... 19 = is_system_table).
990  string tableQuery(
991  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
992  "max_chunk_size, frag_page_size, "
993  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
994  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
995  "from mapd_tables");
996  sqliteConnector_.query(tableQuery);
997  numRows = sqliteConnector_.getNumRows();
998  for (size_t r = 0; r < numRows; ++r) {
999  TableDescriptor* td;
1000  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
// Any non-empty storage type other than FOREIGN_TABLE is reported via LOG(FATAL).
1001  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1002  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1003  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1004  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1005  "supported table option (table "
1006  << table_name << " [" << table_id << "] in database "
1007  << currentDB_.dbName << ").";
1008  }
1009 
// Foreign tables get the ForeignTable subclass; everything else a plain descriptor.
1010  if (storage_type == StorageType::FOREIGN_TABLE) {
1011  td = new foreign_storage::ForeignTable();
1012  } else {
1013  td = new TableDescriptor();
1014  }
1015 
1016  td->storageType = storage_type;
1017  td->tableId = sqliteConnector_.getData<int>(r, 0);
1018  td->tableName = sqliteConnector_.getData<string>(r, 1);
1019  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1020  td->isView = sqliteConnector_.getData<bool>(r, 3);
1021  td->fragments = sqliteConnector_.getData<string>(r, 4);
// NOTE(review): the right-hand side of this assignment (listing line 1023) is missing.
1022  td->fragType =
1024  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1025  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1026  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1027  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1028  td->partitions = sqliteConnector_.getData<string>(r, 10);
1029  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1030  td->shard = sqliteConnector_.getData<int>(r, 12);
1031  td->nShards = sqliteConnector_.getData<int>(r, 13);
1032  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1033  td->userId = sqliteConnector_.getData<int>(r, 15);
// A NULL sort_column_id is stored as 0.
1034  td->sortedColumnId =
1035  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1036  if (!td->isView) {
1037  td->fragmenter = nullptr;
1038  }
1039  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1040  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
// hasDeletedCol starts false and is flipped to true by the column loop below.
1041  td->hasDeletedCol = false;
1042 
1044  tableDescriptorMapById_[td->tableId] = td;
1045  }
1046 
// NOTE(review): the body of this g_enable_fsi branch (listing lines 1048-1050) is
// missing from the listing.
1047  if (g_enable_fsi) {
1051  }
1052 
// Columns: rows arrive ordered by (tableid, columnid). skip_physical_cols is set from
// columnType.get_physical_cols() when it is not already positive, and a positive value
// marks the current row as a geo physical column (isGeoPhyCol).
1053  string columnQuery(
1054  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1055  "is_notnull, compression, comp_param, "
1056  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1057  "default_value from "
1058  "mapd_columns ORDER BY tableid, "
1059  "columnid");
1060  sqliteConnector_.query(columnQuery);
1061  numRows = sqliteConnector_.getNumRows();
1062  int32_t skip_physical_cols = 0;
1063  for (size_t r = 0; r < numRows; ++r) {
1064  ColumnDescriptor* cd = new ColumnDescriptor();
1065  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1066  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1067  cd->columnName = sqliteConnector_.getData<string>(r, 2);
// NOTE(review): listing lines 1068-1070 and 1072-1074 (between the reads of result
// columns 2, 6 and 10) are missing.
1071  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1075  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1076  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1077  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1078  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1079  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1080  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
// A NULL default_value maps to std::nullopt rather than an empty string.
1081  if (sqliteConnector_.isNull(r, 16)) {
1082  cd->default_value = std::nullopt;
1083  } else {
1084  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1085  }
1086  cd->isGeoPhyCol = skip_physical_cols > 0;
// Register the column under both its (table id, upper-cased name) and
// (table id, column id) keys.
1087  ColumnKey columnKey(cd->tableId, to_upper(cd->columnName));
1088  columnDescriptorMap_[columnKey] = cd;
1089  ColumnIdKey columnIdKey(cd->tableId, cd->columnId);
1090  columnDescriptorMapById_[columnIdKey] = cd;
1091 
1092  if (skip_physical_cols <= 0) {
1093  skip_physical_cols = cd->columnType.get_physical_cols();
1094  }
1095 
// Every column row must belong to a table loaded above.
1096  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1097  CHECK(td_itr != tableDescriptorMapById_.end());
1098 
// Deleted-marker columns are recorded on the table; otherwise the column id is appended
// to columnIdBySpi_ for geometry columns or once skip_physical_cols has run out (the
// counter is decremented as a side effect of the condition).
1099  if (cd->isDeletedCol) {
1100  td_itr->second->hasDeletedCol = true;
1101  setDeletedColumnUnlocked(td_itr->second, cd);
1102  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1103  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1104  }
1105  }
1106  // sort columnIdBySpi_ based on columnId
1107  for (auto& tit : tableDescriptorMapById_) {
1108  std::sort(tit.second->columnIdBySpi_.begin(),
1109  tit.second->columnIdBySpi_.end(),
1110  [](const size_t a, const size_t b) -> bool { return a < b; });
1111  }
1112 
// Views: attach the stored SQL to the matching table descriptor.
// NOTE(review): operator[] on tableDescriptorMapById_ would insert a null entry for an
// unknown tableid and the next line would dereference it - confirm mapd_views rows
// always reference an existing table.
1113  string viewQuery("SELECT tableid, sql FROM mapd_views");
1114  sqliteConnector_.query(viewQuery);
1115  numRows = sqliteConnector_.getNumRows();
1116  for (size_t r = 0; r < numRows; ++r) {
1117  int32_t tableId = sqliteConnector_.getData<int>(r, 0);
1118  TableDescriptor* td = tableDescriptorMapById_[tableId];
1119  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1120  td->fragmenter = nullptr;
1121  }
1122 
// Dashboards: keyed in dashboardDescriptorMap_ by "<userid>:<dashboard name>".
1123  string frontendViewQuery(
1124  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1125  "userid, "
1126  "metadata "
1127  "FROM mapd_dashboards");
1128  sqliteConnector_.query(frontendViewQuery);
1129  numRows = sqliteConnector_.getNumRows();
1130  for (size_t r = 0; r < numRows; ++r) {
1131  std::shared_ptr<DashboardDescriptor> vd = std::make_shared<DashboardDescriptor>();
1132  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1133  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1134  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1135  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1136  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1137  vd->userId = sqliteConnector_.getData<int>(r, 5);
1138  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
// Uses the map fetched at the top of the function instead of calling SysCatalog here.
1139  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
// NOTE(review): the argument list of this call (listing line 1141) is missing.
1140  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1142  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1143  }
1144 
// Links: registered by link id; listing line 1159 is missing between the last field
// read and the by-id insertion.
1145  string linkQuery(
1146  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1147  "update_time), view_metadata "
1148  "FROM mapd_links");
1149  sqliteConnector_.query(linkQuery);
1150  numRows = sqliteConnector_.getNumRows();
1151  for (size_t r = 0; r < numRows; ++r) {
1152  LinkDescriptor* ld = new LinkDescriptor();
1153  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1154  ld->userId = sqliteConnector_.getData<int>(r, 1);
1155  ld->link = sqliteConnector_.getData<string>(r, 2);
1156  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1157  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1158  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1160  linkDescriptorMapById_[ld->linkId] = ld;
1161  }
1162 
1163  /* rebuild map linking logical tables to corresponding physical ones */
1164  string logicalToPhysicalTableMapQuery(
1165  "SELECT logical_table_id, physical_table_id "
1166  "FROM mapd_logical_to_physical");
1167  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1168  numRows = sqliteConnector_.getNumRows();
1169  for (size_t r = 0; r < numRows; ++r) {
1170  int32_t logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1171  int32_t physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1172  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1173  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1174  /* add new entity to the map logicalToPhysicalTableMapById_ */
1175  std::vector<int32_t> physicalTables;
1176  physicalTables.push_back(physical_tb_id);
1177  const auto it_ok =
1178  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1179  CHECK(it_ok.second);
1180  } else {
1181  /* update map logicalToPhysicalTableMapById_ */
1182  physicalTableIt->second.push_back(physical_tb_id);
1183  }
1184  }
1185 
1187 }
1188 
// Fills custom_expr_map_by_id_ with one CustomExpression per result row, keyed by the
// expression's id. Each row is converted by getCustomExpressionFromConnector().
// NOTE(review): the function header and the call that issues this SELECT (listing
// lines 1189-1190) are missing from this listing; the block name is inferred.
1191  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1192  "is_deleted "
1193  "FROM omnisci_custom_expressions");
1194  auto num_rows = sqliteConnector_.getNumRows();
1195  for (size_t row = 0; row < num_rows; row++) {
1196  auto custom_expr = getCustomExpressionFromConnector(row);
1197  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1198  }
1199 }
1200 
1201 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1202  auto id = sqliteConnector_.getData<int>(row, 0);
1203  auto name = sqliteConnector_.getData<string>(row, 1);
1204  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1205  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1206  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1207  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1208  return std::make_unique<CustomExpression>(
1209  id,
1210  name,
1211  expression_json,
1212  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1213  data_source_id,
1214  is_deleted);
1215 }
1216 
// Registers a copy of table descriptor `td`, copies of its `columns`, and its `dicts`
// in the catalog's in-memory maps while holding the catalog write lock.
// NOTE(review): the first line of the signature (listing line 1217: return type,
// function name and the `td` parameter) is missing from this listing; the block name
// is inferred.
1218  const list<ColumnDescriptor>& columns,
1219  const list<DictDescriptor>& dicts) {
1220  cat_write_lock write_lock(this);
1221  TableDescriptor* new_td;
1222 
// Copy as ForeignTable when td is one, so the derived members survive the copy.
1223  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1224  if (foreign_table) {
1225  auto new_foreign_table = new foreign_storage::ForeignTable();
1226  *new_foreign_table = *foreign_table;
1227  new_td = new_foreign_table;
1228  } else {
1229  new_td = new TableDescriptor();
1230  *new_td = *td;
1231  }
1232 
// The copy gets its own mutex instead of sharing the one copied from td.
1233  new_td->mutex_ = std::make_shared<std::mutex>();
1234  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1235  tableDescriptorMapById_[td->tableId] = new_td;
// Each column is copied (the loop variable is itself a copy) and registered under both
// its (table id, upper-cased name) and (table id, column id) keys.
1236  for (auto cd : columns) {
1237  ColumnDescriptor* new_cd = new ColumnDescriptor();
1238  *new_cd = cd;
1239  ColumnKey columnKey(new_cd->tableId, to_upper(new_cd->columnName));
1240  columnDescriptorMap_[columnKey] = new_cd;
1241  ColumnIdKey columnIdKey(new_cd->tableId, new_cd->columnId);
1242  columnDescriptorMapById_[columnIdKey] = new_cd;
1243 
1244  // Add deleted column to the map
1245  if (cd.isDeletedCol) {
1246  CHECK(new_td->hasDeletedCol);
1247  setDeletedColumnUnlocked(new_td, new_cd);
1248  }
1249  }
1250 
1251  std::sort(new_td->columnIdBySpi_.begin(),
1252  new_td->columnIdBySpi_.end(),
1253  [](const size_t a, const size_t b) -> bool { return a < b; });
1254 
// A StringDictionaryClient is only created when string dictionary hosts are configured.
1255  std::unique_ptr<StringDictionaryClient> client;
1256  DictRef dict_ref(currentDB_.dbId, -1);
1257  if (!string_dict_hosts_.empty()) {
1258  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1259  }
1260  for (auto dd : dicts) {
1261  if (!dd.dictRef.dictId) {
1262  // Dummy entry created for a shard of a logical table, nothing to do.
1263  continue;
1264  }
1265  dict_ref.dictId = dd.dictRef.dictId;
1266  if (client) {
1267  client->create(dict_ref, dd.dictIsTemp);
1268  }
1269  DictDescriptor* new_dd = new DictDescriptor(dd);
1270  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
// Only non-temporary dictionaries get an on-disk folder.
1271  if (!dd.dictIsTemp) {
1272  boost::filesystem::create_directory(new_dd->dictFolderPath);
1273  }
1274  }
1275 }
1276 
// Removes a table, its column descriptors and (when their refcount drops to zero) its
// string dictionaries from the catalog's in-memory maps, under the catalog write lock.
//
// tableName    Name used for the by-name map erase and for the error message.
// tableId      Id used to locate the table and its columns.
// is_on_error  When true, a dictionary-encoded column whose dictionary is not in
//              dictDescriptorMapByRef_ is tolerated instead of failing a CHECK.
// Throws runtime_error when no table with `tableId` is registered.
// NOTE(review): listing line 1297 is missing from this listing; `isTemp`, used further
// down, has no visible declaration here.
1277 void Catalog::removeTableFromMap(const string& tableName,
1278  const int tableId,
1279  const bool is_on_error) {
1280  cat_write_lock write_lock(this);
1281  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1282  if (tableDescIt == tableDescriptorMapById_.end()) {
1283  throw runtime_error("Table " + tableName + " does not exist.");
1284  }
1285 
1286  TableDescriptor* td = tableDescIt->second;
1287 
// A table flagged hasDeletedCol must have exactly one deletedColumnPerTable_ entry.
1288  if (td->hasDeletedCol) {
1289  const auto ret = deletedColumnPerTable_.erase(td);
1290  CHECK_EQ(ret, size_t(1));
1291  }
1292 
1293  tableDescriptorMapById_.erase(tableDescIt);
1294  tableDescriptorMap_.erase(to_upper(tableName));
1295  td->fragmenter = nullptr;
1296 
1298  delete td;
1299 
// On an aggregator a StringDictionaryClient is created so dropped dictionaries can be
// dropped through it as well.
1300  std::unique_ptr<StringDictionaryClient> client;
1301  if (SysCatalog::instance().isAggregator()) {
1302  CHECK(!string_dict_hosts_.empty());
1303  DictRef dict_ref(currentDB_.dbId, -1);
1304  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1305  }
1306 
1307  // delete all column descriptors for the table
1308  // no more link columnIds to sequential indexes!
// The loop header has no increment: `cit` is advanced explicitly in each branch, and in
// the erase branch it is advanced before the entry it pointed at is erased.
1309  for (auto cit = columnDescriptorMapById_.begin();
1310  cit != columnDescriptorMapById_.end();) {
1311  if (tableId != std::get<0>(cit->first)) {
1312  ++cit;
1313  } else {
1314  int i = std::get<1>(cit++->first);
1315  ColumnIdKey cidKey(tableId, i);
1316  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1317  ColumnDescriptor* cd = colDescIt->second;
1318  columnDescriptorMapById_.erase(colDescIt);
1319  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1320  columnDescriptorMap_.erase(cnameKey);
1321  const int dictId = cd->columnType.get_comp_param();
1322  // Dummy dictionaries created for a shard of a logical table have the id set to
1323  // zero.
1324  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1325  INJECT_TIMER(removingDicts);
1326  DictRef dict_ref(currentDB_.dbId, dictId);
1327  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1328  // If we're removing this table due to an error, it is possible that the string
1329  // dictionary reference was never populated. Don't crash, just continue cleaning
1330  // up the TableDescriptor and ColumnDescriptors
1331  if (!is_on_error) {
1332  CHECK(dictIt != dictDescriptorMapByRef_.end());
1333  } else {
1334  if (dictIt == dictDescriptorMapByRef_.end()) {
// NOTE(review): this `continue` jumps to the next loop iteration and therefore skips
// the `delete cd` at the bottom of this branch, although cd has already been removed
// from both column maps - looks like a leak of this ColumnDescriptor; confirm.
1335  continue;
1336  }
1337  }
1338  const auto& dd = dictIt->second;
1339  CHECK_GE(dd->refcount, 1);
1340  --dd->refcount;
// Last reference gone: release the dictionary, rename its folder for deletion unless
// isTemp, drop it through the client if there is one, then forget the descriptor.
1341  if (!dd->refcount) {
1342  dd->stringDict.reset();
1343  if (!isTemp) {
1344  File_Namespace::renameForDelete(dd->dictFolderPath);
1345  }
1346  if (client) {
1347  client->drop(dict_ref);
1348  }
1349  dictDescriptorMapByRef_.erase(dictIt);
1350  }
1351  }
1352 
1353  delete cd;
1354  }
1355  }
1356 }
1357 
// Fragment of a Catalog member that takes the catalog write lock.
// NOTE(review): the function header (listing line 1358) and the statement executed
// under the lock (listing line 1360) are missing from this listing; the block name is
// inferred.
1359  cat_write_lock write_lock(this);
1361 }
1362 
// Fragment of a Catalog member that, under the catalog write lock, stores a freshly
// allocated shared_ptr copy of the DashboardDescriptor `vd`.
// NOTE(review): the function header (listing line 1363) and the left-hand side of the
// assignment (listing line 1365) are missing from this listing; the block name is
// inferred.
1364  cat_write_lock write_lock(this);
1366  std::make_shared<DashboardDescriptor>(vd);
1367 }
1368 
// Builds one DBObject per table/view named in the dashboard metadata `view_meta`.
//
// view_meta  Dashboard metadata, parsed by parse_underlying_dashboard_objects().
// user_id    Passed through to each DBObject that is constructed.
// Returns the objects for every source that still exists in this catalog; sources that
// cannot be resolved are logged at INFO level and skipped.
// NOTE(review): listing line 1391 (the remainder of the `priv` initializer, i.e. the
// non-view alternative of the conditional) is missing from this listing.
1369 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1370  const int& user_id) {
1371  std::vector<DBObject> objects;
1372  DBObjectKey key;
1373  key.dbId = currentDB_.dbId;
// Helper that stamps the shared `key` (captured by reference) with a type and id and
// returns a copy of it.
1374  auto _key_place = [&key](auto type, auto id) {
1375  key.permissionType = type;
1376  key.objectId = id;
1377  return key;
1378  };
1379  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
// Metadata-only lookup: the fragmenter is not populated (second argument false).
1380  auto td = getMetadataForTable(object_name, false);
1381  if (!td) {
1382  // Parsed object source is not present in current database
1383  // LOG the info and ignore
1384  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1385  << " no longer exists in current DB.";
1386  continue;
1387  }
1388  // Dashboard source can be Table or View
1389  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1390  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1392  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1393  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1394  objects.back().setName(td->tableName);
1395  }
1396  return objects;
1397 }
1398 
// Brings the dashboard system role `dash_role_name` in line with the tables/views the
// dashboard metadata `view_meta` refers to.
//
// When the role does not exist, privileges on the parsed objects are granted to it
// (the creation call itself is on a line missing from this listing). When it exists,
// table/view privileges the role holds on objects no longer referenced are collected
// for revocation, and the current objects are granted again.
// NOTE(review): listing lines 1408 and 1436 (the starts of the two SysCatalog calls
// whose argument lists follow them) are missing from this listing.
1399 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1400  const int32_t& user_id,
1401  const std::string& dash_role_name) {
1402  auto objects = parseDashboardObjects(view_meta, user_id);
1403  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1404  if (!rl) {
1405  // Dashboard role does not exist
1406  // create role and grant privileges
1407  // NOTE(wamsi): Transactionally unsafe
1409  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1410  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1411  } else {
1412  // Dashboard system role already exists
1413  // Add/remove privileges on objects
1414  std::set<DBObjectKey> revoke_keys;
1415  auto ex_objects = rl->getDbObjects(true);
1416  for (auto key : *ex_objects | boost::adaptors::map_keys) {
// Only table and view keys are candidates for revocation.
1417  if (key.permissionType != TableDBObjectType &&
1418  key.permissionType != ViewDBObjectType) {
1419  continue;
1420  }
// Linear search of the freshly parsed objects for this existing key.
1421  bool found = false;
1422  for (auto obj : objects) {
1423  found = key == obj.getObjectKey() ? true : false;
1424  if (found) {
1425  break;
1426  }
1427  }
1428  if (!found) {
1429  revoke_keys.insert(key);
1430  }
1431  }
1432  for (auto& key : revoke_keys) {
1433  // revoke privs on object since the object is no
1434  // longer used by the dashboard as source
1435  // NOTE(wamsi): Transactionally unsafe
1437  dash_role_name, *rl->findDbObject(key, true), *this);
1438  }
1439  // Update privileges on remaining objects
1440  // NOTE(wamsi): Transactionally unsafe
1441  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1442  }
1443 }
1444 
// Under the catalog write lock, heap-allocates a copy of link descriptor `ld` and
// registers it in linkDescriptorMapById_ under ld.linkId.
// NOTE(review): the function header (listing line 1445) and listing line 1449 are
// missing from this listing; the block name is inferred.
1446  cat_write_lock write_lock(this);
1447  LinkDescriptor* new_ld = new LinkDescriptor();
1448  *new_ld = ld;
1450  linkDescriptorMapById_[ld.linkId] = new_ld;
1451 }
1452 
// Creates the fragmenter for table descriptor `td` and logs how long that took.
// A SortedOrderFragmenter is used when td->sortedColumnId > 0, otherwise an
// InsertOrderFragmenter; the latter additionally receives !td->storageType.empty().
// NOTE(review): the function header (listing line 1453) and listing line 1457 are
// missing from this listing; the block name is inferred.
1454  auto time_ms = measure<>::execution([&]() {
1455  // instanciate table fragmenter upon first use
1456  // assume only insert order fragmenter is supported
1458  vector<Chunk> chunkVec;
// Arguments (true, false, true) are fetchSystemColumns, fetchVirtualColumns and
// fetchPhysicalColumns of getAllColumnMetadataForTable.
1459  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1460  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1461  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1462  if (td->sortedColumnId > 0) {
1463  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1464  chunkVec,
1465  dataMgr_.get(),
1466  const_cast<Catalog*>(this),
1467  td->tableId,
1468  td->shard,
1469  td->maxFragRows,
1470  td->maxChunkSize,
1471  td->fragPageSize,
1472  td->maxRows,
1473  td->persistenceLevel);
1474  } else {
1475  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1476  chunkVec,
1477  dataMgr_.get(),
1478  const_cast<Catalog*>(this),
1479  td->tableId,
1480  td->shard,
1481  td->maxFragRows,
1482  td->maxChunkSize,
1483  td->fragPageSize,
1484  td->maxRows,
1485  td->persistenceLevel,
1486  !td->storageType.empty());
1487  }
1488  });
1489  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1490  << time_ms << "ms";
1491 }
1492 
// Looks up `tableName` (case-insensitively, via to_upper) in tableDescriptorMap_ and
// returns it as a ForeignTable. Returns nullptr when the table is unknown; the
// dynamic_cast also yields nullptr when the table is not a ForeignTable.
// Takes no catalog lock itself.
// NOTE(review): the first signature line (listing line 1493) is missing from this
// listing; the locked wrapper that follows calls this as getForeignTableUnlocked.
1494  const std::string& tableName) const {
1495  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1496  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1497  return nullptr;
1498  }
1499  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1500 }
1501 
// Locked wrapper: takes the catalog read lock and delegates to
// getForeignTableUnlocked(tableName).
// NOTE(review): the first signature line (listing line 1502) is missing from this
// listing; the block name is inferred.
1503  const std::string& tableName) const {
1504  cat_read_lock read_lock(this);
1505  return getForeignTableUnlocked(tableName);
1506 }
1507 
// Returns the descriptor of the table named `tableName` (matched via to_upper), or
// nullptr when no such table is registered.
//
// populateFragmenter  When true and the table is not a view and has no fragmenter yet,
//                     the fragmenter is set up under the table's own mutex.
// The catalog read lock is released before the per-table mutex is taken.
// NOTE(review): listing line 1523 (the statement inside the innermost `if`) is missing
// from this listing.
1508 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1509  const bool populateFragmenter) const {
1510  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1511  // pure metadata calls
1512  cat_read_lock read_lock(this);
1513  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1514  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1515  return nullptr;
1516  }
1517  TableDescriptor* td = tableDescIt->second;
1518  CHECK(td);
1519  read_lock.unlock();
1520  if (populateFragmenter) {
1521  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1522  if (td->fragmenter == nullptr && !td->isView) {
1524  }
1525  }
1526  return td; // returns pointer to table descriptor
1527 }
1528 
// Id-based table lookup that returns the descriptor obtained from
// getMutableMetadataForTableUnlocked(table_id), or nullptr when there is none.
// As in the name-based overload, the catalog read lock is released before the
// per-table mutex is taken for the optional fragmenter setup.
// NOTE(review): the first signature line (listing line 1529) and listing line 1540
// (the statement inside the innermost `if`) are missing from this listing; the block
// name is inferred.
1530  bool populateFragmenter) const {
1531  cat_read_lock read_lock(this);
1532  auto td = getMutableMetadataForTableUnlocked(table_id);
1533  if (!td) {
1534  return nullptr;
1535  }
1536  read_lock.unlock();
1537  if (populateFragmenter) {
1538  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1539  if (td->fragmenter == nullptr && !td->isView) {
1541  }
1542  }
1543  return td;
1544 }
1545 
// Returns the entry for `table_id` in tableDescriptorMapById_, or nullptr when the id
// is not registered. Takes no catalog lock itself.
// NOTE(review): the function header (listing line 1546) is missing from this listing;
// callers in this file refer to it as getMutableMetadataForTableUnlocked.
1547  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1548  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1549  return nullptr;
1550  }
1551  return tableDescIt->second;
1552 }
1553 
// Returns the DictDescriptor registered for (current db id, dict_id), or nullptr when
// there is none. Holds the catalog read lock for the whole call.
//
// load_dict  When true and the descriptor has no StringDictionary yet, one is created
//            under the descriptor's string_dict_mutex and the load time is logged.
// NOTE(review): the first signature line (listing line 1554) is missing from this
// listing; the block name is inferred.
1555  const bool load_dict) const {
1556  cat_read_lock read_lock(this);
1557  const DictRef dictRef(currentDB_.dbId, dict_id);
1558  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1559  if (dictDescIt ==
1560  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1561  return nullptr;
1562  }
1563  auto& dd = dictDescIt->second;
1564 
1565  if (load_dict) {
1566  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1567  if (!dd->stringDict) {
1568  auto time_ms = measure<>::execution([&]() {
// No dictionary hosts configured: build the dictionary from dd->dictFolderPath. The
// two branches differ only in the second constructor argument (true for temporary
// dictionaries, false otherwise).
1569  if (string_dict_hosts_.empty()) {
1570  if (dd->dictIsTemp) {
1571  dd->stringDict = std::make_shared<StringDictionary>(
1572  dd->dictFolderPath, true, true, g_cache_string_hash);
1573  } else {
1574  dd->stringDict = std::make_shared<StringDictionary>(
1575  dd->dictFolderPath, false, true, g_cache_string_hash);
1576  }
1577  } else {
// Otherwise the dictionary is constructed from the first configured host and the
// dictionary reference.
1578  dd->stringDict =
1579  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1580  }
1581  });
1582  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1583  << dd->dictRef.dictId << " was " << time_ms << "ms";
1584  }
1585  }
1586 
1587  return dd.get();
1588 }
1589 
1590 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1591  return string_dict_hosts_;
1592 }
1593 
// Name-based column lookup: under the catalog read lock, finds the column keyed by
// (tableId, upper-cased columnName) in columnDescriptorMap_ and returns its descriptor,
// or nullptr when there is no such column.
// NOTE(review): the first signature line (listing line 1594) is missing from this
// listing; the block name is inferred.
1595  const string& columnName) const {
1596  cat_read_lock read_lock(this);
1597 
1598  ColumnKey columnKey(tableId, to_upper(columnName));
1599  auto colDescIt = columnDescriptorMap_.find(columnKey);
1600  if (colDescIt ==
1601  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1602  return nullptr;
1603  }
1604  return colDescIt->second;
1605 }
1606 
1607 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1608  cat_read_lock read_lock(this);
1609  ColumnIdKey columnIdKey(table_id, column_id);
1610  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1611  if (colDescIt == columnDescriptorMapById_
1612  .end()) { // need to check to make sure column exists for table
1613  return nullptr;
1614  }
1615  return colDescIt->second;
1616 }
1617 
1618 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1619  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1620  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1621  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1622 
1623  auto spx = spi;
1624  int phi = 0;
1625  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1626  {
1627  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1628  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1629  }
1630 
1631  CHECK(0 < spx && spx <= columnIdBySpi.size());
1632  return columnIdBySpi[spx - 1] + phi;
1633 }
1634 
1635 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
1636  cat_read_lock read_lock(this);
1637  return getColumnIdBySpiUnlocked(table_id, spi);
1638 }
1639 
// SPI-based column lookup: under the catalog read lock, resolves `spi` to a column id
// with getColumnIdBySpiUnlocked() and returns the descriptor registered under
// (tableId, column id), or nullptr when there is none.
// NOTE(review): the first signature line (listing line 1640) is missing from this
// listing; the block name is inferred.
1641  const size_t spi) const {
1642  cat_read_lock read_lock(this);
1643 
1644  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
1645  ColumnIdKey columnIdKey(tableId, columnId);
1646  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1647  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
1648 }
1649 
// Deletes the dashboards identified by `dashboard_ids` on behalf of `user`.
//
// Phase 1 validates every id: ids with no dashboard and ids for which `user` fails the
// DELETE_DASHBOARD privilege check are collected, and a std::runtime_error listing both
// groups is thrown if either is non-empty, before anything is modified.
// Phase 2 removes each dashboard from dashboardDescriptorMap_ and from mapd_dashboards
// inside a SQLite transaction, under the catalog write lock; any exception rolls the
// transaction back and is rethrown.
// NOTE(review): listing lines 1688, 1691 and 1708 are missing from this listing, so the
// statement that consumes `dash_objs` and the call that issues the DELETE are not
// visible here.
// NOTE(review): `dashboard_ids` is taken by const value, so the vector is copied at
// each call.
1650 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
1651  const UserMetadata& user) {
1652  std::stringstream invalid_ids, restricted_ids;
1653 
1654  for (int32_t dashboard_id : dashboard_ids) {
1655  if (!getMetadataForDashboard(dashboard_id)) {
// Ids are joined with ", "; the separator is emitted only once the stream is non-empty.
1656  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
1657  continue;
1658  }
1659  DBObject object(dashboard_id, DashboardDBObjectType);
1660  object.loadKey(*this);
1661  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
1662  std::vector<DBObject> privs = {object};
1663  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
1664  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
1665  }
1666  }
1667 
1668  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
1669  std::stringstream error_message;
1670  error_message << "Delete dashboard(s) failed with error(s):";
1671  if (invalid_ids.str().size() > 0) {
1672  error_message << "\nDashboard id: " << invalid_ids.str()
1673  << " - Dashboard id does not exist";
1674  }
1675  if (restricted_ids.str().size() > 0) {
1676  error_message
1677  << "\nDashboard id: " << restricted_ids.str()
1678  << " - User should be either owner of dashboard or super user to delete it";
1679  }
1680  throw std::runtime_error(error_message.str());
1681  }
1682  std::vector<DBObject> dash_objs;
1683 
1684  for (int32_t dashboard_id : dashboard_ids) {
1685  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
1686  }
1687  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
1689  {
1690  cat_write_lock write_lock(this);
1692 
1693  sqliteConnector_.query("BEGIN TRANSACTION");
1694  try {
1695  for (int32_t dashboard_id : dashboard_ids) {
1696  auto dash = getMetadataForDashboard(dashboard_id);
1697  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
1698  // rollback if already deleted
1699  if (!dash) {
1700  throw std::runtime_error(
1701  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
1702  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
1703  }
1704  std::string user_id = std::to_string(dash->userId);
1705  std::string dash_name = dash->dashboardName;
// NOTE(review): viewDescIt is not compared against end() before being erased. `dash`
// was just found under the same key scheme, so a miss is not expected - confirm.
// NOTE(review): the in-memory erase is not undone when the transaction is rolled back.
1706  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
1707  dashboardDescriptorMap_.erase(viewDescIt);
1709  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
1710  std::vector<std::string>{dash_name, user_id});
1711  }
1712  } catch (std::exception& e) {
1713  sqliteConnector_.query("ROLLBACK TRANSACTION");
1714  throw;
1715  }
1716  sqliteConnector_.query("END TRANSACTION");
1717  }
1718 }
1719 
// Looks up a dashboard by owner and name: under the catalog read lock, finds the key
// "<userId>:<dashName>" in dashboardDescriptorMap_ and returns the raw pointer held by
// that entry, or nullptr when there is no such dashboard.
// NOTE(review): the first signature line (listing line 1720) is missing from this
// listing; callers in this file refer to it as getMetadataForDashboard.
1721  const string& userId,
1722  const string& dashName) const {
1723  cat_read_lock read_lock(this);
1724 
1725  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
1726  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
1727  return nullptr;
1728  }
1729  return viewDescIt->second.get(); // returns pointer to view descriptor
1730 }
1731 
// Looks up a dashboard by numeric id: scans dashboardDescriptorMap_ for the first entry
// whose dashboardId equals `id`, then resolves it again through the (userId, name)
// overload. Returns nullptr when no entry matches.
// NOTE(review): the function header (listing line 1732) is missing from this listing;
// callers in this file refer to it as getMetadataForDashboard.
// NOTE(review): the scan copies each map entry (`auto descp`), and the final call takes
// the catalog read lock again while this function still holds it.
1733  cat_read_lock read_lock(this);
1734  std::string userId;
1735  std::string name;
1736  bool found{false};
1737  {
1738  for (auto descp : dashboardDescriptorMap_) {
1739  auto dash = descp.second.get();
1740  if (dash->dashboardId == id) {
1741  userId = std::to_string(dash->userId);
1742  name = dash->dashboardName;
1743  found = true;
1744  break;
1745  }
1746  }
1747  }
1748  if (found) {
1749  return getMetadataForDashboard(userId, name);
1750  }
1751  return nullptr;
1752 }
1753 
1754 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
1755  cat_read_lock read_lock(this);
1756  auto linkDescIt = linkDescriptorMap_.find(link);
1757  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
1758  return nullptr;
1759  }
1760  return linkDescIt->second; // returns pointer to view descriptor
1761 }
1762 
// Looks up a link descriptor by `linkId` in linkDescriptorMapById_ under the catalog
// read lock; returns nullptr when the id is not registered.
// NOTE(review): the function header (listing line 1763) is missing from this listing;
// the block name is inferred.
1764  cat_read_lock read_lock(this);
1765  auto linkDescIt = linkDescriptorMapById_.find(linkId);
1766  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
1767  return nullptr;
1768  }
1769  return linkDescIt->second;
1770 }
1771 
// Returns the table registered under `table_id` as a ForeignTable. Unlike the
// name-based lookups, this CHECK-fails when the table is unknown or is not a
// ForeignTable instead of returning nullptr.
// NOTE(review): the function header (listing line 1772) is missing from this listing;
// the block name is inferred.
1773  cat_read_lock read_lock(this);
1774  const auto table = getMutableMetadataForTableUnlocked(table_id);
1775  CHECK(table);
1776  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
1777  CHECK(foreign_table);
1778  return foreign_table;
1779 }
1780 
// Appends to `columnDescriptors` every column descriptor of table `td` that passes the
// three fetch* filters. Walks the whole of columnDescriptorMapById_ and filters by
// table id; takes no catalog lock itself.
//
// fetchSystemColumns    When false, columns with isSystemCol set are skipped.
// fetchVirtualColumns   When false, columns with isVirtualCol set are skipped.
// fetchPhysicalColumns  When false, after a column is accepted the next
//                       get_physical_cols() map entries are skipped.
// NOTE(review): the first signature line (listing line 1781) is missing from this
// listing; the block name is inferred.
// NOTE(review): the physical-column countdown runs before the table-id filter, so it
// relies on the skipped entries directly following their parent column in map order.
1782  const TableDescriptor* td,
1783  list<const ColumnDescriptor*>& columnDescriptors,
1784  const bool fetchSystemColumns,
1785  const bool fetchVirtualColumns,
1786  const bool fetchPhysicalColumns) const {
1787  int32_t skip_physical_cols = 0;
1788  for (const auto& columnDescriptor : columnDescriptorMapById_) {
1789  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
1790  --skip_physical_cols;
1791  continue;
1792  }
1793  auto cd = columnDescriptor.second;
1794  if (cd->tableId != td->tableId) {
1795  continue;
1796  }
1797  if (!fetchSystemColumns && cd->isSystemCol) {
1798  continue;
1799  }
1800  if (!fetchVirtualColumns && cd->isVirtualCol) {
1801  continue;
1802  }
1803  if (!fetchPhysicalColumns) {
1804  const auto& col_ti = cd->columnType;
1805  skip_physical_cols = col_ti.get_physical_cols();
1806  }
1807  columnDescriptors.push_back(cd);
1808  }
1809 }
1810 
// Returns the column descriptors of table `tableId`, filtered by the three fetch*
// flags, while holding the catalog read lock.
//
// fetchSystemColumns / fetchVirtualColumns / fetchPhysicalColumns are forwarded
// unchanged to the helper that fills `columnDescriptors`.
// NOTE(review): listing lines 1818-1819 (the start of the helper call, including its
// first argument) are missing from this listing.
1811 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
1812  const int tableId,
1813  const bool fetchSystemColumns,
1814  const bool fetchVirtualColumns,
1815  const bool fetchPhysicalColumns) const {
1816  cat_read_lock read_lock(this);
1817  std::list<const ColumnDescriptor*> columnDescriptors;
1820  columnDescriptors,
1821  fetchSystemColumns,
1822  fetchVirtualColumns,
1823  fetchPhysicalColumns);
1824  return columnDescriptors;
1825 }
1826 
1827 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
1828  cat_read_lock read_lock(this);
1829  list<const TableDescriptor*> table_list;
1830  for (auto p : tableDescriptorMapById_) {
1831  table_list.push_back(p.second);
1832  }
1833  return table_list;
1834 }
1835 
1836 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
1837  cat_read_lock read_lock(this);
1838  std::vector<TableDescriptor> tables;
1839  tables.reserve(tableDescriptorMapById_.size());
1840  for (auto table_entry : tableDescriptorMapById_) {
1841  tables.emplace_back(*table_entry.second);
1842  }
1843  return tables;
1844 }
1845 
1846 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
1847  cat_read_lock read_lock(this);
1848  list<const DashboardDescriptor*> dashboards;
1849  for (auto dashboard_entry : dashboardDescriptorMap_) {
1850  dashboards.push_back(dashboard_entry.second.get());
1851  }
1852  return dashboards;
1853 }
1854 
1855 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
1856  cat_read_lock read_lock(this);
1857  std::vector<DashboardDescriptor> dashboards;
1858  dashboards.reserve(dashboardDescriptorMap_.size());
1859  for (auto dashboard_entry : dashboardDescriptorMap_) {
1860  dashboards.emplace_back(*dashboard_entry.second);
1861  }
1862  return dashboards;
1863 }
1864 
// Creates the dictionary for column `cd` under the catalog write lock and returns its
// DictRef. setColumnDictionary() appends the new descriptor to `dds`; a copy of it is
// then registered in dictDescriptorMapByRef_, created through a
// StringDictionaryClient when string dictionary hosts are configured, and given an
// on-disk folder unless it is temporary.
// NOTE(review): the function header (listing line 1865) is missing from this listing;
// the block name is inferred.
// NOTE(review): operator[] on tableDescriptorMapById_ is dereferenced without checking
// that cd.tableId is registered - confirm callers guarantee it.
1866  cat_write_lock write_lock(this);
1867  const auto& td = *tableDescriptorMapById_[cd.tableId];
1868  list<DictDescriptor> dds;
1869  setColumnDictionary(cd, dds, td, true);
1870  auto& dd = dds.back();
1871  CHECK(dd.dictRef.dictId);
1872 
1873  std::unique_ptr<StringDictionaryClient> client;
1874  if (!string_dict_hosts_.empty()) {
1875  client.reset(new StringDictionaryClient(
1876  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
1877  }
1878  if (client) {
1879  client->create(dd.dictRef, dd.dictIsTemp);
1880  }
1881 
1882  DictDescriptor* new_dd = new DictDescriptor(dd);
1883  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
1884  if (!dd.dictIsTemp) {
1885  boost::filesystem::create_directory(new_dd->dictFolderPath);
1886  }
1887  return dd.dictRef;
1888 }
1889 
// Releases the dictionary used by column `cd`: decrements its reference count in
// mapd_dictionaries and, once the count is no longer positive, deletes the catalog
// row, renames the on-disk dictionary folder for deletion, drops it on the remote
// dictionary host (if one is configured) and erases it from dictDescriptorMapByRef_.
// NOTE(review): the signature line and the lines that open the two SQL calls below
// are absent from this listing; presumably this is Catalog::delDictionary -- confirm
// against the full file.
1891  cat_write_lock write_lock(this);
      // Nothing to do unless the column is a dictionary-encoded string or string array.
1893  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
1894  return;
1895  }
1896  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
1897  return;
1898  }
      // For these columns the compression parameter is used as the dictionary id.
1899  const auto dictId = cd.columnType.get_comp_param();
1900  CHECK_GT(dictId, 0);
1901  // decrement and zero check dict ref count
1902  const auto td = getMetadataForTable(cd.tableId, false);
1903  CHECK(td);
1905  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
1906  std::to_string(dictId));
1908  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
1909  const auto refcount = sqliteConnector_.getData<int>(0, 0);
      // NOTE(review): there is no space between the id and "from" in the log text below.
1910  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
1911  << refcount;
      // The dictionary is still referenced elsewhere: keep it.
1912  if (refcount > 0) {
1913  return;
1914  }
1915  const DictRef dictRef(currentDB_.dbId, dictId);
1916  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
1917  std::to_string(dictId));
      // The folder name is built as <g_base_path>/mapd_data/DB_<dbId>_DICT_<dictId>.
1918  File_Namespace::renameForDelete(g_base_path + "/mapd_data/DB_" +
1919  std::to_string(currentDB_.dbId) + "_DICT_" +
1920  std::to_string(dictId));
1921 
      // Only when string dictionary hosts are configured: drop the dictionary on the
      // first host as well.
1922  std::unique_ptr<StringDictionaryClient> client;
1923  if (!string_dict_hosts_.empty()) {
1924  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
1925  }
1926  if (client) {
1927  client->drop(dictRef);
1928  }
1929 
1930  dictDescriptorMapByRef_.erase(dictRef);
1931 }
1932 
// Looks up the string dictionary backing column `cd` and stores a non-owning pointer
// to it in `stringDicts`, keyed by the committed column's id. Returns without touching
// `stringDicts` when the committed column is not a dictionary-encoded string (or
// string array) with a positive dictionary id.
// NOTE(review): the first line of the signature is absent from this listing;
// presumably this is Catalog::getDictionary -- confirm against the full file.
1934  std::map<int, StringDictionary*>& stringDicts) {
1935  // learn 'committed' ColumnDescriptor of this column
1936  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
1937  CHECK(cit != columnDescriptorMap_.end());
1938  auto& ccd = *cit->second;
1939 
1940  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
1941  return;
1942  }
1943  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
1944  return;
1945  }
1946  if (!(ccd.columnType.get_comp_param() > 0)) {
1947  return;
1948  }
1949 
1950  auto dictId = ccd.columnType.get_comp_param();
      // The return value is discarded; the call is presumably made so that the
      // dictionary is loaded into dictDescriptorMapByRef_ before the lookup below --
      // TODO confirm.
1951  getMetadataForDict(dictId);
1952 
1953  const DictRef dictRef(currentDB_.dbId, dictId);
1954  auto dit = dictDescriptorMapByRef_.find(dictRef);
1955  CHECK(dit != dictDescriptorMapByRef_.end());
1956  CHECK(dit->second);
1957  CHECK(dit->second.get()->stringDict);
      // Stored as a raw pointer; the descriptor in the map keeps owning the dictionary.
1958  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
1959 }
1960 
// Adds column `cd` to table `td`: recurses into each physical shard table when `td`
// is the logical table of a sharded table, inserts a row into mapd_columns, bumps the
// table's column count in storage and in memory, and queues the new descriptor in
// columnDescriptorsForRoll.
// NOTE(review): the signature and several statement lines are absent from this
// listing; presumably this is Catalog::addColumn (it calls itself per shard) -- confirm
// against the full file.
1962  // caller must handle sqlite/chunk transaction TOGETHER
1963  cd.tableId = td.tableId;
      // Each shard receives its own copy of the descriptor.
1964  if (td.nShards > 0 && td.shard < 0) {
1965  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
1966  auto shard_cd = cd;
1967  addColumn(*shard, shard_cd);
1968  }
1969  }
1971  addDictionary(cd);
1972  }
1973 
      // All 17 parameters are bound as text, except that an absent default value is
      // bound as NULL (index 16 is the last parameter, default_value).
1974  using BindType = SqliteConnector::BindType;
1975  std::vector<BindType> types(17, BindType::TEXT);
1976  if (!cd.default_value.has_value()) {
1977  types[16] = BindType::NULL_TYPE;
1978  }
      // The new column id is computed inside the INSERT as max(columnid) + 1 for the
      // table.
1980  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
1981  "colscale, is_notnull, "
1982  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
1983  "is_deletedcol, default_value) "
1984  "VALUES (?, "
1985  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
1986  "?, ?, ?, "
1987  "?, "
1988  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1989  std::vector<std::string>{std::to_string(td.tableId),
1990  std::to_string(td.tableId),
1991  cd.columnName,
2000  "",
2003  cd.virtualExpr,
2005  cd.default_value.value_or("NULL")},
2006  types);
2007 
2009  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2010  std::vector<std::string>{std::to_string(td.tableId)});
2011 
      // Read back the column id that the INSERT assigned.
2013  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2014  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
2015  cd.columnId = sqliteConnector_.getData<int>(0, 0);
2016 
2017  ++tableDescriptorMapById_[td.tableId]->nColumns;
2018  auto ncd = new ColumnDescriptor(cd);
      // Queued as (old = nullptr, new = ncd); roll() later finalizes or undoes it.
2021  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2022 }
2023 
// Drops column `cd` from table `td`: deletes its mapd_columns row, decrements the
// table's column count in storage and in memory, removes the descriptor from
// columnDescriptorMap_ and queues it in columnDescriptorsForRoll; afterwards the same
// is done for every physical shard when `td` is the logical table of a sharded table.
// NOTE(review): the signature and several statement lines are absent from this
// listing; presumably this is Catalog::dropColumn (it calls itself per shard) -- confirm
// against the full file.
      // The write lock lives only inside this inner scope, so it has been released by
      // the time the per-shard calls below run.
2025  {
2026  cat_write_lock write_lock(this);
2028  // caller must handle sqlite/chunk transaction TOGETHER
2030  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2031  std::vector<std::string>{std::to_string(td.tableId),
2032  std::to_string(cd.columnId)});
2033 
2035  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2036  std::vector<std::string>{std::to_string(td.tableId)});
2037 
2038  ColumnDescriptorMap::iterator columnDescIt =
2040  CHECK(columnDescIt != columnDescriptorMap_.end());
2041 
      // Queued as (old = descriptor, new = nullptr); roll() later deletes the
      // descriptor or puts it back into the maps.
2042  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2043 
2044  columnDescriptorMap_.erase(columnDescIt);
2046  --tableDescriptorMapById_[td.tableId]->nColumns;
2047  }
2048 
2049  // for each shard
2050  if (td.nShards > 0 && td.shard < 0) {
2051  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
      // The shard's column is looked up by the same column id as the logical column.
2052  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2053  CHECK(shard_cd);
2054  dropColumn(*shard, *shard_cd);
2055  }
2056  }
2057 }
2058 
2059 void Catalog::roll(const bool forward) {
2060  cat_write_lock write_lock(this);
2061  std::set<const TableDescriptor*> tds;
2062 
2063  for (const auto& cdr : columnDescriptorsForRoll) {
2064  auto ocd = cdr.first;
2065  auto ncd = cdr.second;
2066  CHECK(ocd || ncd);
2067  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2068  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2069  auto td = tabDescIt->second;
2070  auto& vc = td->columnIdBySpi_;
2071  if (forward) {
2072  if (ocd) {
2073  if (nullptr == ncd ||
2074  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2075  delDictionary(*ocd);
2076  }
2077 
2078  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2079 
2080  delete ocd;
2081  }
2082  if (ncd) {
2083  // append columnId if its new and not phy geo
2084  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2085  if (!ncd->isGeoPhyCol) {
2086  vc.push_back(ncd->columnId);
2087  }
2088  }
2089  }
2090  tds.insert(td);
2091  } else {
2092  if (ocd) {
2093  columnDescriptorMap_[ColumnKey(ocd->tableId, to_upper(ocd->columnName))] = ocd;
2094  columnDescriptorMapById_[ColumnIdKey(ocd->tableId, ocd->columnId)] = ocd;
2095  }
2096  // roll back the dict of new column
2097  if (ncd) {
2098  columnDescriptorMap_.erase(ColumnKey(ncd->tableId, to_upper(ncd->columnName)));
2099  columnDescriptorMapById_.erase(ColumnIdKey(ncd->tableId, ncd->columnId));
2100  if (nullptr == ocd ||
2101  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2102  delDictionary(*ncd);
2103  }
2104  delete ncd;
2105  }
2106  }
2107  }
2108  columnDescriptorsForRoll.clear();
2109 
2110  if (forward) {
2111  for (const auto td : tds) {
2112  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2113  }
2114  }
2115 }
2116 
// Appends to `columns` the physical columns that back geo column `cd`. Nothing is
// appended when the column type is not a geo type; an unhandled geo type throws
// runtime_error("Unrecognized geometry type.").
//
// Physical columns appended, in order, named <cd.columnName> + suffix:
//   kPOINT:        _coords
//   kLINESTRING:   _coords, _bounds
//   kPOLYGON:      _coords, _ring_sizes, _bounds, _render_group
//   kMULTIPOLYGON: _coords, _ring_sizes, _poly_rings, _bounds, _render_group
// Every physical column inherits the NOT NULL flag of `cd`.
// NOTE(review): the first line of the signature is absent from this listing;
// presumably this is Catalog::expandGeoColumn -- confirm against the full file.
2118  list<ColumnDescriptor>& columns) {
2119  const auto& col_ti = cd.columnType;
2120  if (IS_GEO(col_ti.get_type())) {
2121  switch (col_ti.get_type()) {
2122  case kPOINT: {
2123  ColumnDescriptor physical_cd_coords(true);
2124  physical_cd_coords.columnName = cd.columnName + "_coords";
2125  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2126  // Raw data: compressed/uncompressed coords
2127  coords_ti.set_subtype(kTINYINT);
      // Point coords get an explicit size of two units: 4 bytes per unit with 32-bit
      // GEOINT compression, otherwise 8 bytes per unit (no compression is the only
      // other combination accepted).
2128  size_t unit_size;
2129  if (col_ti.get_compression() == kENCODING_GEOINT &&
2130  col_ti.get_comp_param() == 32) {
2131  unit_size = 4 * sizeof(int8_t);
2132  } else {
2133  CHECK(col_ti.get_compression() == kENCODING_NONE);
2134  unit_size = 8 * sizeof(int8_t);
2135  }
2136  coords_ti.set_size(2 * unit_size);
2137  physical_cd_coords.columnType = coords_ti;
2138  columns.push_back(physical_cd_coords);
2139 
2140  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2141 
2142  break;
2143  }
2144  case kLINESTRING: {
      // Unlike the point case, no explicit size is set on the coords array here.
2145  ColumnDescriptor physical_cd_coords(true);
2146  physical_cd_coords.columnName = cd.columnName + "_coords";
2147  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2148  // Raw data: compressed/uncompressed coords
2149  coords_ti.set_subtype(kTINYINT);
2150  physical_cd_coords.columnType = coords_ti;
2151  columns.push_back(physical_cd_coords);
2152 
      // Bounds: array of doubles with an explicit size of four doubles.
2153  ColumnDescriptor physical_cd_bounds(true);
2154  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2155  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2156  bounds_ti.set_subtype(kDOUBLE);
2157  bounds_ti.set_size(4 * sizeof(double));
2158  physical_cd_bounds.columnType = bounds_ti;
2159  columns.push_back(physical_cd_bounds);
2160 
2161  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2162 
2163  break;
2164  }
2165  case kPOLYGON: {
2166  ColumnDescriptor physical_cd_coords(true);
2167  physical_cd_coords.columnName = cd.columnName + "_coords";
2168  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2169  // Raw data: compressed/uncompressed coords
2170  coords_ti.set_subtype(kTINYINT);
2171  physical_cd_coords.columnType = coords_ti;
2172  columns.push_back(physical_cd_coords);
2173 
      // Ring sizes: array of ints.
2174  ColumnDescriptor physical_cd_ring_sizes(true);
2175  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2176  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2177  ring_sizes_ti.set_subtype(kINT);
2178  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2179  columns.push_back(physical_cd_ring_sizes);
2180 
      // Bounds: array of doubles with an explicit size of four doubles.
2181  ColumnDescriptor physical_cd_bounds(true);
2182  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2183  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2184  bounds_ti.set_subtype(kDOUBLE);
2185  bounds_ti.set_size(4 * sizeof(double));
2186  physical_cd_bounds.columnType = bounds_ti;
2187  columns.push_back(physical_cd_bounds);
2188 
      // Render group: a plain int column (not an array).
2189  ColumnDescriptor physical_cd_render_group(true);
2190  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2191  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2192  physical_cd_render_group.columnType = render_group_ti;
2193  columns.push_back(physical_cd_render_group);
2194 
2195  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2196 
2197  break;
2198  }
2199  case kMULTIPOLYGON: {
      // Same columns as the polygon case, plus _poly_rings between _ring_sizes and
      // _bounds.
2200  ColumnDescriptor physical_cd_coords(true);
2201  physical_cd_coords.columnName = cd.columnName + "_coords";
2202  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2203  // Raw data: compressed/uncompressed coords
2204  coords_ti.set_subtype(kTINYINT);
2205  physical_cd_coords.columnType = coords_ti;
2206  columns.push_back(physical_cd_coords);
2207 
2208  ColumnDescriptor physical_cd_ring_sizes(true);
2209  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2210  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2211  ring_sizes_ti.set_subtype(kINT);
2212  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2213  columns.push_back(physical_cd_ring_sizes);
2214 
      // Poly rings: array of ints.
2215  ColumnDescriptor physical_cd_poly_rings(true);
2216  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2217  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2218  poly_rings_ti.set_subtype(kINT);
2219  physical_cd_poly_rings.columnType = poly_rings_ti;
2220  columns.push_back(physical_cd_poly_rings);
2221 
2222  ColumnDescriptor physical_cd_bounds(true);
2223  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2224  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2225  bounds_ti.set_subtype(kDOUBLE);
2226  bounds_ti.set_size(4 * sizeof(double));
2227  physical_cd_bounds.columnType = bounds_ti;
2228  columns.push_back(physical_cd_bounds);
2229 
2230  ColumnDescriptor physical_cd_render_group(true);
2231  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2232  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2233  physical_cd_render_group.columnType = render_group_ti;
2234  columns.push_back(physical_cd_render_group);
2235 
2236  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2237 
2238  break;
2239  }
2240  default:
2241  throw runtime_error("Unrecognized geometry type.");
2242  break;
2243  }
2244  }
2245 }
2246 
// File-local helper(s).
// NOTE(review): most lines of the helper defined in this namespace are absent from
// this listing. From what is visible it looks up a timing-type entry in
// foreign_table.options, requires that entry to exist, and branches on its value;
// presumably this is get_next_refresh_time(), which the table-creation code calls to
// fill next_refresh_time -- confirm against the full file.
2247 namespace {
2249  auto timing_type_entry =
2251  CHECK(timing_type_entry != foreign_table.options.end());
2252  if (timing_type_entry->second ==
2255  foreign_table.options);
2256  }
2258 }
2259 } // namespace
2260 
// Creates table `td` with the columns in `cols`.
//
// Visible steps: the column list is expanded (physical geo columns, the system
// "rowid" column and, when td.hasDeletedCol is set, the "$deleted$" column); the table
// and its columns are then either written to the sqlite catalog tables or, on the
// temporary-table path, only assigned in-memory ids; finally the table is added to the
// in-memory maps and Calcite's metadata is updated. On return td.tableId, td.nColumns
// and td.columnIdBySpi_ have been filled in.
//
// Throws std::runtime_error when a user column is named "rowid" or when the
// storage-type check near the top fails; exceptions raised while persisting or
// registering the table are rethrown after a sqlite ROLLBACK.
// NOTE(review): the first signature line and a number of statement lines (including
// several `if` headers) are absent from this listing; presumably this is
// Catalog::createTable -- confirm against the full file before relying on these notes.
2262  TableDescriptor& td,
2263  const list<ColumnDescriptor>& cols,
2264  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2265  bool isLogicalTable) {
2266  cat_write_lock write_lock(this);
      // `cds` starts as a working copy of the caller's columns; it is cleared after the
      // expansion loop below and refilled with the final descriptors, ids assigned.
2267  list<ColumnDescriptor> cds = cols;
2268  list<DictDescriptor> dds;
2269  std::set<std::string> toplevel_column_names;
2270  list<ColumnDescriptor> columns;
2271 
2272  if (!td.storageType.empty() &&
2275  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2276  }
2277  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2278  }
2279 
      // Build the full column list: each user column is followed by its physical geo
      // columns, if any. Only the user columns are recorded as top-level names.
2280  for (auto cd : cds) {
2281  if (cd.columnName == "rowid") {
2282  throw std::runtime_error(
2283  "Cannot create column with name rowid. rowid is a system defined column.");
2284  }
2285  columns.push_back(cd);
2286  toplevel_column_names.insert(cd.columnName);
2287  if (cd.columnType.is_geometry()) {
2288  expandGeoColumn(cd, columns);
2289  }
2290  }
2291  cds.clear();
2292 
2293  ColumnDescriptor cd;
2294  // add row_id column -- Must be last column in the table
2295  cd.columnName = "rowid";
2296  cd.isSystemCol = true;
2297  cd.columnType = SQLTypeInfo(kBIGINT, true);
2298 #ifdef MATERIALIZED_ROWID
2299  cd.isVirtualCol = false;
2300 #else
2301  cd.isVirtualCol = true;
2302  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2303 #endif
2304  columns.push_back(cd);
2305  toplevel_column_names.insert(cd.columnName);
2306 
      // The delete-marker column is appended after "rowid" and is not added to the
      // top-level names.
2307  if (td.hasDeletedCol) {
2308  ColumnDescriptor cd_del;
2309  cd_del.columnName = "$deleted$";
2310  cd_del.isSystemCol = true;
2311  cd_del.isVirtualCol = false;
2312  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2313  cd_del.isDeletedCol = true;
2314 
2315  columns.push_back(cd_del);
2316  }
2317 
2318  td.nColumns = columns.size();
      // NOTE(review): the matching "END TRANSACTION" is issued only near the end of the
      // function, after the in-memory maps have been updated.
2320  sqliteConnector_.query("BEGIN TRANSACTION");
2322  try {
2324  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2325  std::vector<std::string>{td.tableName,
2326  std::to_string(td.userId),
2328  std::to_string(td.isView),
2329  "",
2334  std::to_string(td.maxRows),
2335  td.partitions,
2337  std::to_string(td.shard),
2338  std::to_string(td.nShards),
2340  td.storageType,
2343  td.keyMetainfo});
2344 
2345  // now get the auto generated tableid
2347  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2348  td.tableId = sqliteConnector_.getData<int>(0, 0);
2349  int colId = 1;
2350  for (auto cd : columns) {
      // A column either reuses a shared dictionary (setColumnSharedDictionary returns
      // true) or gets a dictionary of its own.
2352  const bool is_foreign_col =
2353  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2354  if (!is_foreign_col) {
2355  setColumnDictionary(cd, dds, td, isLogicalTable);
2356  }
2357  }
2358 
2359  if (toplevel_column_names.count(cd.columnName)) {
2360  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2361  if (colId > 1) {
2362  colId += g_test_against_columnId_gap;
2363  }
2364  if (!cd.isGeoPhyCol) {
2365  td.columnIdBySpi_.push_back(colId);
2366  }
2367  }
2368 
      // All 17 parameters are bound as text, except that an absent default value is
      // bound as NULL (index 16 is the last parameter, default_value).
2369  using BindType = SqliteConnector::BindType;
2370  std::vector<BindType> types(17, BindType::TEXT);
2371  if (!cd.default_value.has_value()) {
2372  types[16] = BindType::NULL_TYPE;
2373  }
2375  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2376  "coldim, colscale, is_notnull, "
2377  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2378  "virtual_expr, is_deletedcol, default_value) "
2379  "VALUES (?, ?, ?, ?, ?, "
2380  "?, "
2381  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2382  std::vector<std::string>{std::to_string(td.tableId),
2383  std::to_string(colId),
2384  cd.columnName,
2393  "",
2396  cd.virtualExpr,
2398  cd.default_value.value_or("NULL")},
2399  types);
2400  cd.tableId = td.tableId;
2401  cd.columnId = colId++;
2402  cds.push_back(cd);
2403  }
      // Views additionally store their defining SQL.
2404  if (td.isView) {
2406  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2407  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2408  }
      // Foreign tables: `td` is downcast by reference (dynamic_cast throws
      // std::bad_cast on a type mismatch) and a row is added to omnisci_foreign_tables.
2410  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2411  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2413  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2414  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2415  std::vector<std::string>{std::to_string(foreign_table.tableId),
2416  std::to_string(foreign_table.foreign_server->id),
2417  foreign_table.getOptionsAsJsonString(),
2418  std::to_string(foreign_table.last_refresh_time),
2419  std::to_string(foreign_table.next_refresh_time)});
2420  }
2421  } catch (std::exception& e) {
2422  sqliteConnector_.query("ROLLBACK TRANSACTION");
2423  throw;
2424  }
2425  } else { // Temporary table
      // Temporary tables take their table id and dictionary ids from in-memory counters.
2426  td.tableId = nextTempTableId_++;
2427  int colId = 1;
2428  for (auto cd : columns) {
2430  const bool is_foreign_col =
2431  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2432 
2433  if (!is_foreign_col) {
2434  // Create a new temporary dictionary
2435  std::string fileName("");
2436  std::string folderPath("");
2438  nextTempDictId_++;
2439  DictDescriptor dd(dict_ref,
2440  fileName,
2442  false,
2443  1,
2444  folderPath,
2445  true); // Is dictName (2nd argument) used?
2446  dds.push_back(dd);
2447  if (!cd.columnType.is_array()) {
2449  }
2450  cd.columnType.set_comp_param(dict_ref.dictId);
2451  }
2452  }
2453  if (toplevel_column_names.count(cd.columnName)) {
2454  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2455  if (colId > 1) {
2456  colId += g_test_against_columnId_gap;
2457  }
2458  if (!cd.isGeoPhyCol) {
2459  td.columnIdBySpi_.push_back(colId);
2460  }
2461  }
2462  cd.tableId = td.tableId;
2463  cd.columnId = colId++;
2464  cds.push_back(cd);
2465  }
2466 
2468  serializeTableJsonUnlocked(&td, cds);
2469  }
2470  }
2471 
2472  try {
      // A new table must not find metadata for its (db id, table id) key prefix
      // already present in the disk cache.
2473  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2474  if (cache) {
2475  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2476  << "Disk cache at " + cache->getCacheDirectory()
2477  << " contains preexisting data for new table. Please "
2478  "delete or clear cache before continuing";
2479  }
2480 
2481  addTableToMap(&td, cds, dds);
2482  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2483  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2484  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2485  }
2486  } catch (std::exception& e) {
      // Undo both the sqlite changes and the in-memory registration before rethrowing.
2487  sqliteConnector_.query("ROLLBACK TRANSACTION");
2488  removeTableFromMap(td.tableName, td.tableId, true);
2489  throw;
2490  }
2491  sqliteConnector_.query("END TRANSACTION");
2492 
      // Both locks are released explicitly before the table is looked up again.
2494  write_lock.unlock();
2495  sqlite_lock.unlock();
2496  getMetadataForTable(td.tableName,
2497  true); // cause instantiateFragmenter() to be called
2498  }
2499 }
2500 
2501 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2502  const std::list<ColumnDescriptor>& cds) const {
2503  // relies on the catalog write lock
2504  using namespace rapidjson;
2505 
2506  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2507 
2508  const auto db_name = currentDB_.dbName;
2509  const auto file_path = table_json_filepath(basePath_, db_name);
2510 
2511  Document d;
2512  if (boost::filesystem::exists(file_path)) {
2513  // look for an existing file for this database
2514  std::ifstream reader(file_path.string());
2515  CHECK(reader.is_open());
2516  IStreamWrapper json_read_wrapper(reader);
2517  d.ParseStream(json_read_wrapper);
2518  } else {
2519  d.SetObject();
2520  }
2521  CHECK(d.IsObject());
2522  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2523 
2524  Value table(kObjectType);
2525  table.AddMember(
2526  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2527  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2528  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2529 
2530  for (const auto& cd : cds) {
2531  Value column(kObjectType);
2532  column.AddMember(
2533  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2534  column.AddMember("coltype",
2535  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2536  d.GetAllocator());
2537  column.AddMember("colsubtype",
2538  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2539  d.GetAllocator());
2540  column.AddMember(
2541  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2542  column.AddMember(
2543  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2544  column.AddMember(
2545  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2546  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2547  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2548  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2549  table["columns"].PushBack(column, d.GetAllocator());
2550  }
2551  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2552 
2553  // Overwrite the existing file
2554  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2555  CHECK(writer.is_open());
2556  OStreamWrapper json_wrapper(writer);
2557 
2558  Writer<OStreamWrapper> json_writer(json_wrapper);
2559  d.Accept(json_writer);
2560  writer.close();
2561 }
2562 
2563 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2564  // relies on the catalog write lock
2565  using namespace rapidjson;
2566 
2567  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2568 
2569  const auto db_name = currentDB_.dbName;
2570  const auto file_path = table_json_filepath(basePath_, db_name);
2571 
2572  CHECK(boost::filesystem::exists(file_path));
2573  Document d;
2574 
2575  std::ifstream reader(file_path.string());
2576  CHECK(reader.is_open());
2577  IStreamWrapper json_read_wrapper(reader);
2578  d.ParseStream(json_read_wrapper);
2579 
2580  CHECK(d.IsObject());
2581  auto table_name_ref = StringRef(table_name.c_str());
2582  CHECK(d.HasMember(table_name_ref));
2583  CHECK(d.RemoveMember(table_name_ref));
2584 
2585  // Overwrite the existing file
2586  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2587  CHECK(writer.is_open());
2588  OStreamWrapper json_wrapper(writer);
2589 
2590  Writer<OStreamWrapper> json_writer(json_wrapper);
2591  d.Accept(json_writer);
2592  writer.close();
2593 }
2594 
2595 void Catalog::createForeignServer(
2596  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2597  bool if_not_exists) {
2598  cat_write_lock write_lock(this);
2599  cat_sqlite_lock sqlite_lock(getObjForLock());
2600  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
2601 }
2602 
2603 void Catalog::createForeignServerNoLocks(
2604  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2605  bool if_not_exists) {
2606  sqliteConnector_.query_with_text_params(
2607  "SELECT name from omnisci_foreign_servers where name = ?",
2608  std::vector<std::string>{foreign_server->name});
2609 
2610  if (sqliteConnector_.getNumRows() == 0) {
2611  foreign_server->creation_time = std::time(nullptr);
2612  sqliteConnector_.query_with_text_params(
2613  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
2614  "creation_time, "
2615  "options) "
2616  "VALUES (?, ?, ?, ?, ?)",
2617  std::vector<std::string>{foreign_server->name,
2618  foreign_server->data_wrapper_type,
2619  std::to_string(foreign_server->user_id),
2620  std::to_string(foreign_server->creation_time),
2621  foreign_server->getOptionsAsJsonString()});
2622  sqliteConnector_.query_with_text_params(
2623  "SELECT id from omnisci_foreign_servers where name = ?",
2624  std::vector<std::string>{foreign_server->name});
2625  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
2626  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
2627  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
2628  std::move(foreign_server);
2629  CHECK(foreignServerMap_.find(foreign_server_shared->name) == foreignServerMap_.end())
2630  << "Attempting to insert a foreign server into foreign server map that already "
2631  "exists.";
2632  foreignServerMap_[foreign_server_shared->name] = foreign_server_shared;
2633  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
2634  } else if (!if_not_exists) {
2635  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
2636  "\" already exists."};
2637  }
2638 }
2639 
2640 const foreign_storage::ForeignServer* Catalog::getForeignServer(
2641  const std::string& server_name) const {
2642  foreign_storage::ForeignServer* foreign_server = nullptr;
2643  cat_read_lock read_lock(this);
2644 
2645  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
2646  foreign_server = foreignServerMap_.find(server_name)->second.get();
2647  }
2648  return foreign_server;
2649 }
2650 
2651 const std::unique_ptr<const foreign_storage::ForeignServer>
2652 Catalog::getForeignServerFromStorage(const std::string& server_name) {
2653  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
2654  cat_sqlite_lock sqlite_lock(getObjForLock());
2655  sqliteConnector_.query_with_text_params(
2656  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
2657  "FROM omnisci_foreign_servers WHERE name = ?",
2658  std::vector<std::string>{server_name});
2659  if (sqliteConnector_.getNumRows() > 0) {
2660  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
2661  sqliteConnector_.getData<int>(0, 0),
2662  sqliteConnector_.getData<std::string>(0, 1),
2663  sqliteConnector_.getData<std::string>(0, 2),
2664  sqliteConnector_.getData<std::string>(0, 3),
2665  sqliteConnector_.getData<std::int32_t>(0, 4),
2666  sqliteConnector_.getData<std::int32_t>(0, 5));
2667  }
2668  return foreign_server;
2669 }
2670 
2671 const std::unique_ptr<const foreign_storage::ForeignTable>
2672 Catalog::getForeignTableFromStorage(int table_id) {
2673  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
2674  cat_sqlite_lock sqlite_lock(getObjForLock());
2675  sqliteConnector_.query_with_text_params(
2676  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
2677  "omnisci_foreign_tables WHERE table_id = ?",
2678  std::vector<std::string>{to_string(table_id)});
2679  auto num_rows = sqliteConnector_.getNumRows();
2680  if (num_rows > 0) {
2681  CHECK_EQ(size_t(1), num_rows);
2682  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
2683  sqliteConnector_.getData<int>(0, 0),
2684  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
2685  sqliteConnector_.getData<std::string>(0, 2),
2686  sqliteConnector_.getData<int>(0, 3),
2687  sqliteConnector_.getData<int>(0, 4));
2688  }
2689  return foreign_table;
2690 }
2691 
2692 void Catalog::changeForeignServerOwner(const std::string& server_name,
2693  const int new_owner_id) {
2694  cat_write_lock write_lock(this);
2695  foreign_storage::ForeignServer* foreign_server =
2696  foreignServerMap_.find(server_name)->second.get();
2697  CHECK(foreign_server);
2698  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
2699  // update in-memory server
2700  foreign_server->user_id = new_owner_id;
2701 }
2702 
2703 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
2704  const std::string& data_wrapper) {
2705  cat_write_lock write_lock(this);
2706  auto data_wrapper_type = to_upper(data_wrapper);
2707  // update in-memory server
2708  foreign_storage::ForeignServer* foreign_server =
2709  foreignServerMap_.find(server_name)->second.get();
2710  CHECK(foreign_server);
2711  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
2712  foreign_server->data_wrapper_type = data_wrapper_type;
2713  try {
2714  foreign_server->validate();
2715  } catch (const std::exception& e) {
2716  // validation did not succeed:
2717  // revert to saved data_wrapper_type & throw exception
2718  foreign_server->data_wrapper_type = saved_data_wrapper_type;
2719  throw;
2720  }
2721  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
2722 }
2723 
2724 void Catalog::setForeignServerOptions(const std::string& server_name,
2725  const std::string& options) {
2726  cat_write_lock write_lock(this);
2727  // update in-memory server
2728  foreign_storage::ForeignServer* foreign_server =
2729  foreignServerMap_.find(server_name)->second.get();
2730  CHECK(foreign_server);
2731  auto saved_options = foreign_server->options;
2732  foreign_server->populateOptionsMap(options, true);
2733  try {
2734  foreign_server->validate();
2735  } catch (const std::exception& e) {
2736  // validation did not succeed:
2737  // revert to saved options & throw exception
2738  foreign_server->options = saved_options;
2739  throw;
2740  }
2741  setForeignServerProperty(server_name, "options", options);
2742 }
2743 
2744 void Catalog::renameForeignServer(const std::string& server_name,
2745  const std::string& name) {
2746  cat_write_lock write_lock(this);
2747  auto foreign_server_it = foreignServerMap_.find(server_name);
2748  CHECK(foreign_server_it != foreignServerMap_.end());
2749  setForeignServerProperty(server_name, "name", name);
2750  auto foreign_server_shared = foreign_server_it->second;
2751  foreign_server_shared->name = name;
2752  foreignServerMap_[name] = foreign_server_shared;
2753  foreignServerMap_.erase(foreign_server_it);
2754 }
2755 
2756 void Catalog::dropForeignServer(const std::string& server_name) {
2757  cat_write_lock write_lock(this);
2758  cat_sqlite_lock sqlite_lock(getObjForLock());
2759 
2760  sqliteConnector_.query_with_text_params(
2761  "SELECT id from omnisci_foreign_servers where name = ?",
2762  std::vector<std::string>{server_name});
2763  auto num_rows = sqliteConnector_.getNumRows();
2764  if (num_rows > 0) {
2765  CHECK_EQ(size_t(1), num_rows);
2766  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
2767  sqliteConnector_.query_with_text_param(
2768  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
2769  std::to_string(server_id));
2770  if (sqliteConnector_.getNumRows() > 0) {
2771  throw std::runtime_error{"Foreign server \"" + server_name +
2772  "\" is referenced "
2773  "by existing foreign tables and cannot be dropped."};
2774  }
2775  sqliteConnector_.query("BEGIN TRANSACTION");
2776  try {
2777  sqliteConnector_.query_with_text_params(
2778  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
2779  std::vector<std::string>{server_name});
2780  } catch (const std::exception& e) {
2781  sqliteConnector_.query("ROLLBACK TRANSACTION");
2782  throw;
2783  }
2784  sqliteConnector_.query("END TRANSACTION");
2785  foreignServerMap_.erase(server_name);
2786  foreignServerMapById_.erase(server_id);
2787  }
2788 }
2789 
2790 void Catalog::getForeignServersForUser(
2791  const rapidjson::Value* filters,
2792  const UserMetadata& user,
2793  std::vector<const foreign_storage::ForeignServer*>& results) {
2794  sys_read_lock syscat_read_lock(&SysCatalog::instance());
2795  cat_read_lock read_lock(this);
2796  cat_sqlite_lock sqlite_lock(getObjForLock());
2797  // Customer facing and internal SQlite names
2798  std::map<std::string, std::string> col_names{{"server_name", "name"},
2799  {"data_wrapper", "data_wrapper_type"},
2800  {"created_at", "creation_time"},
2801  {"options", "options"}};
2802 
2803  // TODO add "owner" when FSI privilege is implemented
2804  std::stringstream filter_string;
2805  std::vector<std::string> arguments;
2806 
2807  if (filters != nullptr) {
2808  // Create SQL WHERE clause for SQLite query
2809  int num_filters = 0;
2810  filter_string << " WHERE";
2811  for (auto& filter_def : filters->GetArray()) {
2812  if (num_filters > 0) {
2813  filter_string << " " << std::string(filter_def["chain"].GetString());
2814  ;
2815  }
2816 
2817  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
2818  col_names.end()) {
2819  throw std::runtime_error{"Attribute with name \"" +
2820  std::string(filter_def["attribute"].GetString()) +
2821  "\" does not exist."};
2822  }
2823 
2824  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
2825 
2826  bool equals_operator = false;
2827  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
2828  filter_string << " = ? ";
2829  equals_operator = true;
2830  } else {
2831  filter_string << " LIKE ? ";
2832  }
2833 
2834  bool timestamp_column =
2835  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
2836 
2837  if (timestamp_column && !equals_operator) {
2838  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
2839  }
2840 
2841  if (timestamp_column && equals_operator) {
2842  arguments.push_back(std::to_string(
2843  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
2844  } else {
2845  arguments.emplace_back(filter_def["value"].GetString());
2846  }
2847 
2848  num_filters++;
2849  }
2850  }
2851  // Create select query for the omnisci_foreign_servers table
2852  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
2853  query += filter_string.str();
2854 
2855  sqliteConnector_.query_with_text_params(query, arguments);
2856  auto num_rows = sqliteConnector_.getNumRows();
2857 
2858  if (sqliteConnector_.getNumRows() == 0) {
2859  return;
2860  }
2861 
2862  CHECK(sqliteConnector_.getNumCols() == 1);
2863  // Return pointers to objects
2864  results.reserve(num_rows);
2865  for (size_t row = 0; row < num_rows; ++row) {
2866  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
2867  if (shared::contains(INTERNAL_SERVERS, server_name)) {
2868  continue;
2869  }
2870  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
2871  CHECK(foreign_server != nullptr);
2872 
2873  DBObject dbObject(foreign_server->name, ServerDBObjectType);
2874  dbObject.loadKey(*this);
2875  std::vector<DBObject> privObjects = {dbObject};
2876  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
2877  // skip server, as there are no privileges to access it
2878  continue;
2879  }
2880  results.push_back(foreign_server);
2881  }
2882 }
2883 
2884 // returns the table epoch or -1 if there is something wrong with the shared epoch
2885 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
2886  cat_read_lock read_lock(this);
2887  const auto td = getMetadataForTable(table_id, false);
2888  if (!td) {
2889  std::stringstream table_not_found_error_message;
2890  table_not_found_error_message << "Table (" << db_id << "," << table_id
2891  << ") not found";
2892  throw std::runtime_error(table_not_found_error_message.str());
2893  }
2894  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2895  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2896  // check all shards have same checkpoint
2897  const auto physicalTables = physicalTableIt->second;
2898  CHECK(!physicalTables.empty());
2899  size_t curr_epoch{0}, first_epoch{0};
2900  int32_t first_table_id{0};
2901  bool are_epochs_inconsistent{false};
2902  for (size_t i = 0; i < physicalTables.size(); i++) {
2903  int32_t physical_tb_id = physicalTables[i];
2904  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2905  CHECK(phys_td);
2906 
2907  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
2908  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
2909  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
2910  if (i == 0) {
2911  first_epoch = curr_epoch;
2912  first_table_id = physical_tb_id;
2913  } else if (first_epoch != curr_epoch) {
2914  are_epochs_inconsistent = true;
2915  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
2916  << ", db id: " << db_id
2917  << ". First table (table id: " << first_table_id
2918  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
2919  << ", has inconsistent epoch: " << curr_epoch
2920  << ". See previous INFO logs for all epochs and their table ids.";
2921  }
2922  }
2923  if (are_epochs_inconsistent) {
2924  // oh dear the shards do not agree on the epoch for this table
2925  return -1;
2926  }
2927  return curr_epoch;
2928  } else {
2929  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
2930  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
2931  << ", epoch: " << epoch;
2932  return epoch;
2933  }
2934 }
2935 
// Sets the epoch of a table and of all its physical tables to new_epoch.
//
// db_id     - database id passed through to the global file manager
// table_id  - id of the table whose epoch is changed
// new_epoch - epoch value stored in the file manager parameters
//
// Throws std::runtime_error when no metadata exists for table_id, and (in the
// guarded block below) with the message "Cannot set epoch on temporary table".
2936  void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
2937  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
2938  << " back to new epoch " << new_epoch;
2939  const auto td = getMetadataForTable(table_id, false);
2940  if (!td) {
2941  std::stringstream table_not_found_error_message;
2942  table_not_found_error_message << "Table (" << db_id << "," << table_id
2943  << ") not found";
2944  throw std::runtime_error(table_not_found_error_message.str());
2945  }
// NOTE(review): the listing jumps from line 2945 to 2947 here, so the line that
// opens this block (presumably an `if` testing for a temporary table) is not
// visible - confirm against the original file.
2947  std::stringstream is_temp_table_error_message;
2948  is_temp_table_error_message << "Cannot set epoch on temporary table";
2949  throw std::runtime_error(is_temp_table_error_message.str());
2950  }
2951 
// The same parameters are applied to every physical table: the requested epoch
// plus the table's configured maxRollbackEpochs.
2952  File_Namespace::FileMgrParams file_mgr_params;
2953  file_mgr_params.epoch = new_epoch;
2954  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
2955 
2956  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
2957  CHECK(!physical_tables.empty());
2958  for (const auto table : physical_tables) {
// This local table_id shadows the function parameter of the same name.
2959  auto table_id = table->tableId;
2960  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
2961  << " back to new epoch " << new_epoch;
2962  // Should have table lock from caller so safe to do this after, avoids
2963  // having to repopulate data on error
2964  removeChunks(table_id);
2965  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
2966  }
2967  }
2968 
2969 void Catalog::alterPhysicalTableMetadata(
2970  const TableDescriptor* td,
2971  const TableDescriptorUpdateParams& table_update_params) {
2972  // Only called from parent alterTableParamMetadata, expect already to have catalog and
2973  // sqlite write locks
2974 
2975  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
2976 
2977  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
2978  CHECK(mutable_td);
2979  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
2980  sqliteConnector_.query_with_text_params(
2981  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
2982  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
2983  std::to_string(td->tableId)});
2984  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
2985  }
2986 
2987  if (td->maxRows != table_update_params.max_rows) {
2988  sqliteConnector_.query_with_text_params(
2989  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
2990  std::vector<std::string>{std::to_string(table_update_params.max_rows),
2991  std::to_string(td->tableId)});
2992  mutable_td->maxRows = table_update_params.max_rows;
2993  }
2994 }
2995 
2996 void Catalog::alterTableMetadata(const TableDescriptor* td,
2997  const TableDescriptorUpdateParams& table_update_params) {
2998  cat_write_lock write_lock(this);
2999  cat_sqlite_lock sqlite_lock(getObjForLock());
3000  sqliteConnector_.query("BEGIN TRANSACTION");
3001  try {
3002  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3003  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3004  const auto physical_tables = physical_table_it->second;
3005  CHECK(!physical_tables.empty());
3006  for (size_t i = 0; i < physical_tables.size(); i++) {
3007  int32_t physical_tb_id = physical_tables[i];
3008  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3009  CHECK(phys_td);
3010  alterPhysicalTableMetadata(phys_td, table_update_params);
3011  }
3012  }
3013  alterPhysicalTableMetadata(td, table_update_params);
3014  } catch (std::exception& e) {
3015  sqliteConnector_.query("ROLLBACK TRANSACTION");
3016  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3017  }
3018  sqliteConnector_.query("END TRANSACTION");
3019 }
3020 
3021 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3022  const int32_t max_rollback_epochs) {
3023  // Must be called from AlterTableParamStmt or other method that takes executor and
3024  // TableSchema locks
3025  if (max_rollback_epochs <= -1) {
3026  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3027  }
3028  const auto td = getMetadataForTable(
3029  table_id, false); // Deep copy as there will be gap between read and write locks
3030  CHECK(td); // Existence should have already been checked in
3031  // ParserNode::AlterTableParmStmt
3032  TableDescriptorUpdateParams table_update_params(td);
3033  table_update_params.max_rollback_epochs = max_rollback_epochs;
3034  if (table_update_params == td) { // Operator is overloaded to test for equality
3035  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3036  << " to existing value, skipping operation";
3037  return;
3038  }
3039  File_Namespace::FileMgrParams file_mgr_params;
3040  file_mgr_params.epoch = -1; // Use existing epoch
3041  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3042  setTableFileMgrParams(table_id, file_mgr_params);
3043  alterTableMetadata(td, table_update_params);
3044 }
3045 
3046 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3047  if (max_rows < 0) {
3048  throw std::runtime_error("Max rows cannot be a negative number.");
3049  }
3050  const auto td = getMetadataForTable(table_id);
3051  CHECK(td);
3052  TableDescriptorUpdateParams table_update_params(td);
3053  table_update_params.max_rows = max_rows;
3054  if (table_update_params == td) {
3055  LOG(INFO) << "Max rows value of " << max_rows
3056  << " is the same as the existing value. Skipping update.";
3057  return;
3058  }
3059  alterTableMetadata(td, table_update_params);
3060  CHECK(td->fragmenter);
3061  td->fragmenter->dropFragmentsToSize(max_rows);
3062 }
3063 
3064 // For testing purposes only
3065 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3066  cat_write_lock write_lock(this);
3067  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3068  CHECK(td_entry != tableDescriptorMap_.end());
3069  auto td = td_entry->second;
3070  TableDescriptorUpdateParams table_update_params(td);
3071  table_update_params.max_rollback_epochs = -1;
3072  write_lock.unlock();
3073 
3074  alterTableMetadata(td, table_update_params);
3075  File_Namespace::FileMgrParams file_mgr_params;
3076  file_mgr_params.max_rollback_epochs = -1;
3077  setTableFileMgrParams(td->tableId, file_mgr_params);
3078 }
3079 
// Applies the given file manager parameters to every physical table of a table.
//
// table_id        - id of the table whose storage parameters are changed
// file_mgr_params - parameters handed to the global file manager per table
//
// Throws std::runtime_error when no metadata exists for table_id, and (in the
// guarded block below) with the message "Cannot set storage params on temporary
// table".
3080  void Catalog::setTableFileMgrParams(
3081  const int table_id,
3082  const File_Namespace::FileMgrParams& file_mgr_params) {
3083  // Expects parent to have write lock
3084  const auto td = getMetadataForTable(table_id, false);
3085  const auto db_id = this->getDatabaseId();
3086  if (!td) {
3087  std::stringstream table_not_found_error_message;
3088  table_not_found_error_message << "Table (" << db_id << "," << table_id
3089  << ") not found";
3090  throw std::runtime_error(table_not_found_error_message.str());
3091  }
// NOTE(review): the listing jumps from line 3091 to 3093 here, so the line that
// opens this block (presumably an `if` testing for a temporary table) is not
// visible - confirm against the original file.
3093  std::stringstream is_temp_table_error_message;
3094  is_temp_table_error_message << "Cannot set storage params on temporary table";
3095  throw std::runtime_error(is_temp_table_error_message.str());
3096  }
3097 
3098  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3099  CHECK(!physical_tables.empty());
3100  for (const auto table : physical_tables) {
// This local table_id shadows the function parameter of the same name.
3101  auto table_id = table->tableId;
// Chunks are removed first, then the new parameters are set for the table.
3102  removeChunks(table_id);
3103  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3104  }
3105  }
3106 
3107 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3108  const int32_t table_id) const {
3109  cat_read_lock read_lock(this);
3110  std::vector<TableEpochInfo> table_epochs;
3111  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3112  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3113  const auto physical_tables = physical_table_it->second;
3114  CHECK(!physical_tables.empty());
3115 
3116  for (const auto physical_tb_id : physical_tables) {
3117  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3118  CHECK(phys_td);
3119 
3120  auto table_id = phys_td->tableId;
3121  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3122  table_epochs.emplace_back(table_id, epoch);
3123  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3124  << ", table id: " << table_id << ", epoch: " << epoch;
3125  }
3126  } else {
3127  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3128  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3129  << ", epoch: " << epoch;
3130  table_epochs.emplace_back(table_id, epoch);
3131  }
3132  return table_epochs;
3133 }
3134 
3135 void Catalog::setTableEpochs(const int32_t db_id,
3136  const std::vector<TableEpochInfo>& table_epochs) const {
3137  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3138  CHECK(td);
3139  File_Namespace::FileMgrParams file_mgr_params;
3140  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3141 
3142  for (const auto& table_epoch_info : table_epochs) {
3143  removeChunks(table_epoch_info.table_id);
3144  file_mgr_params.epoch = table_epoch_info.table_epoch;
3145  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3146  db_id, table_epoch_info.table_id, file_mgr_params);
3147  LOG(INFO) << "Set table epoch for db id: " << db_id
3148  << ", table id: " << table_epoch_info.table_id
3149  << ", back to epoch: " << table_epoch_info.table_epoch;
3150  }
3151 }
3152 
3153 namespace {
3154 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3155  std::string table_epochs_str{"["};
3156  bool first_entry{true};
3157  for (const auto& table_epoch : table_epochs) {
3158  if (first_entry) {
3159  first_entry = false;
3160  } else {
3161  table_epochs_str += ", ";
3162  }
3163  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3164  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3165  }
3166  table_epochs_str += "]";
3167  return table_epochs_str;
3168 }
3169 } // namespace
3170 
3171 void Catalog::setTableEpochsLogExceptions(
3172  const int32_t db_id,
3173  const std::vector<TableEpochInfo>& table_epochs) const {
3174  try {
3175  setTableEpochs(db_id, table_epochs);
3176  } catch (std::exception& e) {
3177  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3178  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3179  << ", Error: " << e.what();
3180  }
3181 }
3182 
3183 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3184  cat_read_lock read_lock(this);
3185  const auto it = deletedColumnPerTable_.find(td);
3186  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3187 }
3188 
3189 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3190  int delete_column_id) const {
3191  // check if there are rows deleted by examining the deletedColumn metadata
3192  CHECK(td);
3193  auto fragmenter = td->fragmenter;
3194  if (fragmenter) {
3195  return fragmenter->hasDeletedRows(delete_column_id);
3196  } else {
3197  return false;
3198  }
3199 }
3200 
3201 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3202  const TableDescriptor* td) const {
3203  std::vector<const TableDescriptor*> tds;
3204  const ColumnDescriptor* cd;
3205  {
3206  cat_read_lock read_lock(this);
3207 
3208  const auto it = deletedColumnPerTable_.find(td);
3209  // if not a table that supports delete return nullptr, nothing more to do
3210  if (it == deletedColumnPerTable_.end()) {
3211  return nullptr;
3212  }
3213  cd = it->second;
3214  tds = getPhysicalTablesDescriptors(td, false);
3215  }
3216  // individual tables are still protected by higher level locks
3217  for (auto tdd : tds) {
3218  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3219  return cd;
3220  }
3221  }
3222  // no deletes so far recorded in metadata
3223  return nullptr;
3224 }
3225 
3226 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3227  cat_write_lock write_lock(this);
3228  setDeletedColumnUnlocked(td, cd);
3229 }
3230 
3231 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3232  const ColumnDescriptor* cd) {
3233  cat_write_lock write_lock(this);
3234  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3235  CHECK(it_ok.second);
3236 }
3237 
3238  namespace {
3239 
// Resolves the column referenced by a shared dictionary definition: looks up
// the definition's foreign table in the catalog (it must exist) and returns the
// metadata of the foreign column within that table.
// NOTE(review): the listing jumps from line 3239 to 3241, so the line carrying
// the function's return type and name is not visible here. A caller in this
// file invokes it as get_foreign_col(*this, shared_dict_def) - confirm the
// exact signature against the original file.
3241  const Catalog& cat,
3242  const Parser::SharedDictionaryDef& shared_dict_def) {
3243  const auto& table_name = shared_dict_def.get_foreign_table();
3244  const auto td = cat.getMetadataForTable(table_name, false);
3245  CHECK(td);
3246  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3247  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3248  }
3249 
3250  } // namespace
3251 
3252 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3253  Parser::SharedDictionaryDef shared_dict_def,
3254  const bool persist_reference) {
3255  cat_write_lock write_lock(this);
3256  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3257  CHECK(foreign_ref_col);
3258  referencing_column.columnType = foreign_ref_col->columnType;
3259  const int dict_id = referencing_column.columnType.get_comp_param();
3260  const DictRef dict_ref(currentDB_.dbId, dict_id);
3261  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3262  CHECK(dictIt != dictDescriptorMapByRef_.end());
3263  const auto& dd = dictIt->second;
3264  CHECK_GE(dd->refcount, 1);
3265  ++dd->refcount;
3266  if (persist_reference) {
3267  cat_sqlite_lock sqlite_lock(getObjForLock());
3268  sqliteConnector_.query_with_text_params(
3269  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3270  {std::to_string(dict_id)});
3271  }
3272 }
3273 
3274 bool Catalog::setColumnSharedDictionary(
3275  ColumnDescriptor& cd,
3276  std::list<ColumnDescriptor>& cdd,
3277  std::list<DictDescriptor>& dds,
3278  const TableDescriptor td,
3279  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3280  cat_write_lock write_lock(this);
3281  cat_sqlite_lock sqlite_lock(getObjForLock());
3282 
3283  if (shared_dict_defs.empty()) {
3284  return false;
3285  }
3286  for (const auto& shared_dict_def : shared_dict_defs) {
3287  // check if the current column is a referencing column
3288  const auto& column = shared_dict_def.get_column();
3289  if (cd.columnName == column) {
3290  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3291  // Dictionaries are being shared in table to be created
3292  const auto& ref_column = shared_dict_def.get_foreign_column();
3293  auto colIt =
3294  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3295  return !ref_column.compare(it.columnName);
3296  });
3297  CHECK(colIt != cdd.end());
3298  cd.columnType = colIt->columnType;
3299 
3300  const int dict_id = colIt->columnType.get_comp_param();
3301  CHECK_GE(dict_id, 1);
3302  auto dictIt = std::find_if(
3303  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3304  return it.dictRef.dbId == this->currentDB_.dbId &&
3305  it.dictRef.dictId == dict_id;
3306  });
3307  if (dictIt != dds.end()) {
3308  // There exists dictionary definition of a dictionary column
3309  CHECK_GE(dictIt->refcount, 1);
3310  ++dictIt->refcount;
3311  if (!table_is_temporary(&td)) {
3312  // Persist reference count
3313  sqliteConnector_.query_with_text_params(
3314  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3315  {std::to_string(dict_id)});
3316  }
3317  } else {
3318  // The dictionary is referencing a column which is referencing a column in
3319  // diffrent table
3320  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3321  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3322  }
3323  } else {
3324  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3325  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3326  if (table_is_temporary(foreign_td)) {
3327  if (!table_is_temporary(&td)) {
3328  throw std::runtime_error(
3329  "Only temporary tables can share dictionaries with other temporary "
3330  "tables.");
3331  }
3332  addReferenceToForeignDict(cd, shared_dict_def, false);
3333  } else {
3334  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3335  }
3336  }
3337  return true;
3338  }
3339  }
3340  return false;
3341 }
3342 
// Creates a dictionary descriptor for column cd, appends it to dds and stores
// the dictionary id in the column type's comp_param.
//
// cd             - column the dictionary is created for
// dds            - list that receives the new dictionary descriptor
// td             - table the column belongs to (its name is used in the dict name)
// isLogicalTable - when true, a row is inserted into mapd_dictionaries and the
//                  dictionary gets a real id and folder path; otherwise the
//                  descriptor keeps id 0, the name "Initial_key" and an empty path
3343  void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3344  std::list<DictDescriptor>& dds,
3345  const TableDescriptor& td,
3346  const bool isLogicalTable) {
3347  cat_write_lock write_lock(this);
3348 
3349  std::string dictName{"Initial_key"};
3350  int dictId{0};
3351  std::string folderPath;
3352  if (isLogicalTable) {
3353  cat_sqlite_lock sqlite_lock(getObjForLock());
3354 
// The row is inserted under the placeholder name, its generated dictid is read
// back by that name, and the row is then renamed to a name containing the id.
3355  sqliteConnector_.query_with_text_params(
3356  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3357  "?, 1)",
3358  std::vector<std::string>{
3359  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3360  sqliteConnector_.query_with_text_param(
3361  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3362  dictId = sqliteConnector_.getData<int>(0, 0);
3363  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3364  sqliteConnector_.query_with_text_param(
3365  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3366  folderPath = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
3367  "_DICT_" + std::to_string(dictId);
3368  }
3369  DictDescriptor dd(currentDB_.dbId,
3370  dictId,
3371  dictName,
// NOTE(review): the listing jumps from line 3371 to 3373, so one constructor
// argument is not visible here - confirm against the original file.
3373  false,
3374  1,
3375  folderPath,
3376  false);
3377  dds.push_back(dd);
3378  if (!cd.columnType.is_array()) {
// NOTE(review): the listing jumps from line 3378 to 3380, so the body of this
// `if` is not visible here - confirm against the original file.
3380  }
3381  cd.columnType.set_comp_param(dictId);
3382  }
3383 
3384 void Catalog::createShardedTable(
3385  TableDescriptor& td,
3386  const list<ColumnDescriptor>& cols,
3387  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3388  /* create logical table */
3389  TableDescriptor* tdl = &td;
3390  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3391  int32_t logical_tb_id = tdl->tableId;
3392  std::string logical_table_name = tdl->tableName;
3393 
3394  /* create physical tables and link them to the logical table */
3395  std::vector<int32_t> physicalTables;
3396  for (int32_t i = 1; i <= td.nShards; i++) {
3397  TableDescriptor* tdp = &td;
3398  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3399  tdp->shard = i - 1;
3400  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3401  int32_t physical_tb_id = tdp->tableId;
3402 
3403  /* add physical table to the vector of physical tables */
3404  physicalTables.push_back(physical_tb_id);
3405  }
3406 
3407  if (!physicalTables.empty()) {
3408  cat_write_lock write_lock(this);
3409  /* add logical to physical tables correspondence to the map */
3410  const auto it_ok =
3411  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3412  CHECK(it_ok.second);
3413  /* update sqlite mapd_logical_to_physical in sqlite database */
3414  if (!table_is_temporary(&td)) {
3415  updateLogicalToPhysicalTableMap(logical_tb_id);
3416  }
3417  }
3418 }
3419 
3420 void Catalog::truncateTable(const TableDescriptor* td) {
3421  // truncate all corresponding physical tables
3422  const auto physical_tables = getPhysicalTablesDescriptors(td);
3423  for (const auto table : physical_tables) {
3424  doTruncateTable(table);
3425  }
3426 }
3427 
3428 void Catalog::doTruncateTable(const TableDescriptor* td) {
3429  // must destroy fragmenter before deleteChunks is called.
3430  removeFragmenterForTable(td->tableId);
3431 
3432  const int tableId = td->tableId;
3433  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3434  // assuming deleteChunksWithPrefix is atomic
3435  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3436  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3437 
3438  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3439 
3440  cat_write_lock write_lock(this);
3441  std::unique_ptr<StringDictionaryClient> client;
3442  if (SysCatalog::instance().isAggregator()) {
3443  CHECK(!string_dict_hosts_.empty());
3444  DictRef dict_ref(currentDB_.dbId, -1);
3445  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3446  }
3447  // clean up any dictionaries
3448  // delete all column descriptors for the table
3449  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3450  auto cd = columnDescriptor.second;
3451  if (cd->tableId != td->tableId) {
3452  continue;
3453  }
3454  const int dict_id = cd->columnType.get_comp_param();
3455  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3456  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3457  const DictRef dict_ref(currentDB_.dbId, dict_id);
3458  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3459  CHECK(dictIt != dictDescriptorMapByRef_.end());
3460  const auto& dd = dictIt->second;
3461  CHECK_GE(dd->refcount, 1);
3462  // if this is the only table using this dict reset the dict
3463  if (dd->refcount == 1) {
3464  // close the dictionary
3465  dd->stringDict.reset();
3466  File_Namespace::renameForDelete(dd->dictFolderPath);
3467  if (client) {
3468  client->drop(dd->dictRef);
3469  }
3470  if (!dd->dictIsTemp) {
3471  boost::filesystem::create_directory(dd->dictFolderPath);
3472  }
3473  }
3474 
3475  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3476  dd->dictName,
3477  dd->dictNBits,
3478  dd->dictIsShared,
3479  dd->refcount,
3480  dd->dictFolderPath,
3481  dd->dictIsTemp);
3482  dictDescriptorMapByRef_.erase(dictIt);
3483  // now create new Dict -- need to figure out what to do here for temp tables
3484  if (client) {
3485  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3486  }
3487  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3488  getMetadataForDict(new_dd->dictRef.dictId);
3489  }
3490  }
3491 }
3492 
3493 void Catalog::removeFragmenterForTable(const int table_id) const {
3494  cat_write_lock write_lock(this);
3495  auto td = getMetadataForTable(table_id, false);
3496  if (td->fragmenter != nullptr) {
3497  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3498  CHECK(tableDescIt != tableDescriptorMapById_.end());
3499  tableDescIt->second->fragmenter = nullptr;
3500  CHECK(td->fragmenter == nullptr);
3501  }
3502 }
3503 
3504 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3505 void Catalog::removeChunks(const int table_id) const {
3506  removeFragmenterForTable(table_id);
3507 
3508  // remove the chunks from in memory structures
3509  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3510 
3511  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3512  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3513 }
3514 
// Drops a table: revokes object privileges, erases the physical data of the
// table and of all its physical tables, then removes their catalog metadata
// inside one SQLite transaction.
//
// td - descriptor of the table to drop
//
// An exception thrown while the metadata is removed rolls the transaction back
// and is rethrown.
3515  void Catalog::dropTable(const TableDescriptor* td) {
3516  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
// NOTE(review): the listing jumps from line 3516 to 3518, so the arguments of
// the call above are not visible here - confirm against the original file.
3518  std::vector<const TableDescriptor*> tables_to_drop;
3519  {
// The read lock is only held while the list of tables to drop is assembled.
3520  cat_read_lock read_lock(this);
3521  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3522  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3523  // remove all corresponding physical tables if this is a logical table
3524  const auto physicalTables = physicalTableIt->second;
3525  CHECK(!physicalTables.empty());
3526  for (size_t i = 0; i < physicalTables.size(); i++) {
3527  int32_t physical_tb_id = physicalTables[i];
3528  const TableDescriptor* phys_td =
3529  getMutableMetadataForTableUnlocked(physical_tb_id);
3530  CHECK(phys_td);
3531  tables_to_drop.emplace_back(phys_td);
3532  }
3533  }
// The table itself is appended last, after any physical tables.
3534  tables_to_drop.emplace_back(td);
3535  }
3536 
// Physical data is erased here, outside both the lock scope above and the one
// below.
3537  for (auto table : tables_to_drop) {
3538  eraseTablePhysicalData(table);
3539  }
3540 
3541  {
3542  cat_write_lock write_lock(this);
3543  cat_sqlite_lock sqlite_lock(getObjForLock());
3544  sqliteConnector_.query("BEGIN TRANSACTION");
3545  try {
3546  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
3547  sqliteConnector_.query_with_text_param(
3548  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
3549  std::to_string(td->tableId));
3550  logicalToPhysicalTableMapById_.erase(td->tableId);
3551  for (auto table : tables_to_drop) {
3552  eraseTableMetadata(table);
3553  }
3554  } catch (std::exception& e) {
3555  sqliteConnector_.query("ROLLBACK TRANSACTION");
3556  throw;
3557  }
3558  sqliteConnector_.query("END TRANSACTION");
3559  }
3560  }
3561 
// Removes all catalog metadata of one table: runs the drop-table SQLite
// queries, notifies the Calcite manager and removes the table from the
// in-memory maps.
//
// td - descriptor of the table whose metadata is erased
3562  void Catalog::eraseTableMetadata(const TableDescriptor* td) {
3563  executeDropTableSqliteQueries(td);
// NOTE(review): the listing jumps from line 3563 to 3565, so the condition that
// opens the block below is not visible here - confirm against the original file.
3565  dropTableFromJsonUnlocked(td->tableName);
3566  }
3567  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3568  {
// The map removal sits in its own scope together with the INJECT_TIMER macro.
3569  INJECT_TIMER(removeTableFromMap_);
3570  removeTableFromMap(td->tableName, td->tableId);
3571  }
3572  }
3573 
// Runs the SQLite statements that remove a table from the catalog tables:
// its mapd_tables row, its dictionary references, its mapd_columns rows and,
// where applicable, its mapd_views / omnisci_foreign_tables rows.
//
// td - descriptor of the table being dropped
3574  void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
3575  const int tableId = td->tableId;
3576  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
3577  std::to_string(tableId));
// Collect the dictionary ids of the table's dictionary-encoded columns. They
// are copied into dict_id_list before the UPDATE statements below are issued
// through the same connector.
3578  sqliteConnector_.query_with_text_params(
3579  "select comp_param from mapd_columns where compression = ? and tableid = ?",
3580  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3581  int numRows = sqliteConnector_.getNumRows();
3582  std::vector<int> dict_id_list;
3583  for (int r = 0; r < numRows; ++r) {
3584  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
3585  }
// Each of those dictionaries loses one reference ...
3586  for (auto dict_id : dict_id_list) {
3587  sqliteConnector_.query_with_text_params(
3588  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
3589  std::vector<std::string>{std::to_string(dict_id)});
3590  }
// ... and those whose refcount reached 0 are deleted. This statement still
// reads mapd_columns, so it runs before the table's column rows are deleted.
3591  sqliteConnector_.query_with_text_params(
3592  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
3593  "mapd_columns where compression = ? "
3594  "and tableid = ?) and refcount = 0",
3595  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3596  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
3597  std::to_string(tableId));
3598  if (td->isView) {
3599  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
3600  std::to_string(tableId));
3601  }
// NOTE(review): the listing jumps from line 3601 to 3603, so the condition that
// opens the block below is not visible here - confirm against the original file.
3603  sqliteConnector_.query_with_text_param(
3604  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
3605  }
3606  }
3607 
3608 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
3609  cat_write_lock write_lock(this);
3610  cat_sqlite_lock sqlite_lock(getObjForLock());
3611 
3612  sqliteConnector_.query("BEGIN TRANSACTION");
3613  try {
3614  sqliteConnector_.query_with_text_params(
3615  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3616  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
3617  } catch (std::exception& e) {
3618  sqliteConnector_.query("ROLLBACK TRANSACTION");
3619  throw;
3620  }
3621  sqliteConnector_.query("END TRANSACTION");
3622  TableDescriptorMap::iterator tableDescIt =
3623  tableDescriptorMap_.find(to_upper(td->tableName));
3624  CHECK(tableDescIt != tableDescriptorMap_.end());
3625  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3626  // Get table descriptor to change it
3627  TableDescriptor* changeTd = tableDescIt->second;
3628  changeTd->tableName = newTableName;
3629  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3630  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3631  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3632 }
3633 
3634 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
3635  {
3636  cat_write_lock write_lock(this);
3637  cat_sqlite_lock sqlite_lock(getObjForLock());
3638  // rename all corresponding physical tables if this is a logical table
3639  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3640  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3641  const auto physicalTables = physicalTableIt->second;
3642  CHECK(!physicalTables.empty());
3643  for (size_t i = 0; i < physicalTables.size(); i++) {
3644  int32_t physical_tb_id = physicalTables[i];
3645  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3646  CHECK(phys_td);
3647  std::string newPhysTableName =
3648  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
3649  renamePhysicalTable(phys_td, newPhysTableName);
3650  }
3651  }
3652  renamePhysicalTable(td, newTableName);
3653  }
3654  {
3655  DBObject object(newTableName, TableDBObjectType);
3656  // update table name in direct and effective priv map
3657  DBObjectKey key;
3658  key.dbId = currentDB_.dbId;
3659  key.objectId = td->tableId;
3660  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3661  object.setObjectKey(key);
3662  auto objdescs = SysCatalog::instance().getMetadataForObject(
3663  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
3664  for (auto obj : objdescs) {
3665  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3666  if (grnt) {
3667  grnt->renameDbObject(object);
3668  }
3669  }
3670  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3671  }
3672 }
3673 
3674 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
3675  std::vector<int>& tableIds) {
3676  cat_write_lock write_lock(this);
3677  cat_sqlite_lock sqlite_lock(getObjForLock());
3678 
3679  // execute the SQL query
3680  try {
3681  for (size_t i = 0; i < names.size(); i++) {
3682  int tableId = tableIds[i];
3683  std::string& newTableName = names[i].second;
3684 
3685  sqliteConnector_.query_with_text_params(
3686  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3687  std::vector<std::string>{newTableName, std::to_string(tableId)});
3688  }
3689  } catch (std::exception& e) {
3690  sqliteConnector_.query("ROLLBACK TRANSACTION");
3691  throw;
3692  }
3693 
3694  // reset the table descriptors, give Calcite a kick
3695  for (size_t i = 0; i < names.size(); i++) {
3696  std::string& curTableName = names[i].first;
3697  std::string& newTableName = names[i].second;
3698 
3699  TableDescriptorMap::iterator tableDescIt =
3700  tableDescriptorMap_.find(to_upper(curTableName));
3701  CHECK(tableDescIt != tableDescriptorMap_.end());
3702  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3703 
3704  // Get table descriptor to change it
3705  TableDescriptor* changeTd = tableDescIt->second;
3706  changeTd->tableName = newTableName;
3707  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3708  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3709  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3710  }
3711 }
3712 
3713 // Collect an 'overlay' mapping of the tableNames->tableId
3714 // to account for possible chained renames
3715 // (for swap: a->b, b->c, c->d, d->a)
3716
// Resolves a table name to its descriptor, consulting the overlay map first:
//  - name present in cachedTableMap with id -1  -> NULL (name was renamed away)
//  - name present with any other id             -> descriptor looked up by that id
//  - name absent from cachedTableMap            -> descriptor looked up by name
// NOTE(review): the first line of the signature (original line 3717, carrying the return
// type, the function name and the 'cat' parameter) is absent from this listing.
3718  std::map<std::string, int>& cachedTableMap,
3719  std::string& curTableName) {
3720  auto iter = cachedTableMap.find(curTableName);
3721  if ((iter != cachedTableMap.end())) {
3722  // get the cached tableId
3723  // and use that to lookup the TableDescriptor
3724  int tableId = (*iter).second;
3725  if (tableId == -1) {
3726  return NULL;
3727  } else {
3728  return cat->getMetadataForTable(tableId);
3729  }
3730  }
3731 
3732  // else ... lookup in standard location
3733  return cat->getMetadataForTable(curTableName);
3734 }
3735 
/**
 * Records one rename step in the overlay map used while planning chained renames.
 *
 * The current name is marked as gone (id -1) and the new name is mapped to the table id.
 * The two writes happen in that order, so when both names are equal the entry ends up
 * holding tableId.
 *
 * @param cachedTableMap  overlay of table name -> table id (-1 meaning "renamed away")
 * @param curTableName    name the table has before this step
 * @param newTableName    name the table has after this step
 * @param tableId         id of the table being renamed
 */
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kRenamedAwayId = -1;
  cachedTableMap.insert_or_assign(curTableName, kRenamedAwayId);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
3746 
// Renames several tables in one call. 'names' holds (current name, new name) pairs that
// are applied in order, which allows chained renames and swaps.
// Phases, as visible below:
//  1. Setup: resolve each current name to a table id through an overlay map that
//     reflects the renames already planned, and CHECK that every source table exists.
//  2. Locking: acquire one lock per distinct table id, in ascending id order (the ids
//     are iterated out of a std::map).
//  3. Rename: inside one SQLite transaction, rename every table plus its physical
//     (shard) tables via the batch renamePhysicalTable() overload.
//  4. SysCatalog: update the table names held by grantees and the object descriptor map.
// NOTE(review): original lines 3795 and 3799-3800 are absent from this listing; they
// hold the declaration of 'tableLocks' and the lock-acquisition expression.
3747 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
3748  // tableId of all tables being renamed
3749  // ... in matching order to 'names'
3750  std::vector<int> tableIds;
3751 
3752  // (sorted & unique) list of tables ids for locking
3753  // (with names index of src in case of error)
3754  // <tableId, strIndex>
3755  // std::map is by definition/implementation sorted
3756  // std::map current usage below tests to avoid over-write
3757  std::map<int, size_t> uniqueOrderedTableIds;
3758 
3759  // mapping of modified tables names -> tableId
3760  std::map<std::string, int> cachedTableMap;
3761 
3762  // -------- Setup --------
3763 
3764  // gather tableIds pre-execute; build maps
3765  for (size_t i = 0; i < names.size(); i++) {
3766  std::string& curTableName = names[i].first;
3767  std::string& newTableName = names[i].second;
3768 
3769  // make sure the table being renamed exists,
3770  // or will exist when executed in 'name' order
3771  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
3772  CHECK(td);
3773 
3774  tableIds.push_back(td->tableId);
3775  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
3776  // don't overwrite as it should map to the first names index 'i'
3777  uniqueOrderedTableIds[td->tableId] = i;
3778  }
3779  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
3780  }
3781 
3782  CHECK_EQ(tableIds.size(), names.size());
3783 
3784  // The outer Stmt created a write lock before calling the catalog rename table
3785  // -> TODO: might want to sort out which really should set the lock :
3786  // the comment in the outer scope indicates it should be in here
3787  // but it's not clear if the access done there *requires* it out there
3788  //
3789  // Lock tables pre-execute (may/will be in different order than rename occurs)
3790  // const auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
3791  // *legacylockmgr::LockMgr<mapd_shared_mutex, bool>::getMutex(
3792  // legacylockmgr::ExecutorOuterLock, true));
3793 
3794  // acquire the locks for all tables being renamed
3796  for (auto& idPair : uniqueOrderedTableIds) {
  // idPair.second is the index of the first 'names' entry that refers to this table id,
  // so the lock is requested under the name the table has before any rename runs.
3797  std::string& tableName = names[idPair.second].first;
3798  tableLocks.emplace_back(
3801  *this, tableName, false)));
3802  }
3803 
3804  // -------- Rename --------
3805 
3806  {
3807  cat_write_lock write_lock(this);
3808  cat_sqlite_lock sqlite_lock(getObjForLock());
3809 
  // The transaction opened here spans the batch renamePhysicalTable() call below, which
  // issues the ROLLBACK itself when one of its SQLite updates throws.
3810  sqliteConnector_.query("BEGIN TRANSACTION");
3811 
3812  // collect all (tables + physical tables) into a single list
3813  std::vector<std::pair<std::string, std::string>> allNames;
3814  std::vector<int> allTableIds;
3815 
3816  for (size_t i = 0; i < names.size(); i++) {
3817  int tableId = tableIds[i];
3818  std::string& curTableName = names[i].first;
3819  std::string& newTableName = names[i].second;
3820 
3821  // rename all corresponding physical tables if this is a logical table
3822  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
3823  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3824  const auto physicalTables = physicalTableIt->second;
3825  CHECK(!physicalTables.empty());
3826  for (size_t k = 0; k < physicalTables.size(); k++) {
3827  int32_t physical_tb_id = physicalTables[k];
3828  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3829  CHECK(phys_td);
  // Shard names are derived from the new logical name with a 1-based shard number.
3830  std::string newPhysTableName =
3831  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
3832  allNames.emplace_back(phys_td->tableName, newPhysTableName);
3833  allTableIds.push_back(phys_td->tableId);
3834  }
3835  }
3836  allNames.emplace_back(curTableName, newTableName);
3837  allTableIds.push_back(tableId);
3838  }
3839  // rename all tables in one shot
3840  renamePhysicalTable(allNames, allTableIds);
3841 
3842  sqliteConnector_.query("END TRANSACTION");
3843  // cat write/sqlite locks are released when they go out scope
3844  }
3845  {
3846  // now update the SysCatalog
3847  for (size_t i = 0; i < names.size(); i++) {
3848  int tableId = tableIds[i];
3849  std::string& newTableName = names[i].second;
3850  {
3851  // update table name in direct and effective priv map
3852  DBObjectKey key;
3853  key.dbId = currentDB_.dbId;
3854  key.objectId = tableId;
3855  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3856 
3857  DBObject object(newTableName, TableDBObjectType);
3858  object.setObjectKey(key);
3859 
3860  auto objdescs = SysCatalog::instance().getMetadataForObject(
3861  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
3862  for (auto obj : objdescs) {
3863  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3864  if (grnt) {
3865  grnt->renameDbObject(object);
3866  }
3867  }
3868  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3869  }
3870  }
3871  }
3872 
3873  // -------- Cleanup --------
3874 
3875  // table locks are released when 'tableLocks' goes out of scope
3876 }
3877 
3878 void Catalog::renameColumn(const TableDescriptor* td,
3879  const ColumnDescriptor* cd,
3880  const string& newColumnName) {
3881  cat_write_lock write_lock(this);
3882  cat_sqlite_lock sqlite_lock(getObjForLock());
3883  sqliteConnector_.query("BEGIN TRANSACTION");
3884  try {
3885  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3886  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3887  CHECK(cdx);
3888  std::string new_column_name = cdx->columnName;
3889  new_column_name.replace(0, cd->columnName.size(), newColumnName);
3890  sqliteConnector_.query_with_text_params(
3891  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
3892  std::vector<std::string>{new_column_name,
3893  std::to_string(td->tableId),
3894  std::to_string(cdx->columnId)});
3895  }
3896  } catch (std::exception& e) {
3897  sqliteConnector_.query("ROLLBACK TRANSACTION");
3898  throw;
3899  }
3900  sqliteConnector_.query("END TRANSACTION");
3901  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3902  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3903  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3904  CHECK(cdx);
3905  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
3906  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
3907  CHECK(columnDescIt != columnDescriptorMap_.end());
3908  ColumnDescriptor* changeCd = columnDescIt->second;
3909  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
3910  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
3911  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
3912  changeCd;
3913  }
3914  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3915 }
3916 
// Creates a dashboard row, or updates the existing row with the same (name, userid),
// then reloads the generated id and update time into 'vd', registers the dashboard in
// the in-memory map and, unless skip_system_role_creation is set, creates or updates the
// dashboard's system role. Returns vd.dashboardId.
// NOTE(review): original lines 3969 and 3977 are absent from this listing; the
// statements that end on lines 3970 and 3976 are therefore incomplete here.
3917 int32_t Catalog::createDashboard(DashboardDescriptor& vd,
3918  bool skip_system_role_creation) {
3919  cat_write_lock write_lock(this);
3920  cat_sqlite_lock sqlite_lock(getObjForLock());
3921  sqliteConnector_.query("BEGIN TRANSACTION");
3922  try {
3923  // TODO(andrew): this should be an upsert
3924  sqliteConnector_.query_with_text_params(
3925  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
3926  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3927  if (sqliteConnector_.getNumRows() > 0) {
3928  sqliteConnector_.query_with_text_params(
3929  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
3930  "update_time = "
3931  "datetime('now') where name = ? "
3932  "and userid = ?",
3933  std::vector<std::string>{vd.dashboardState,
3934  vd.imageHash,
3935  vd.dashboardMetadata,
3936  vd.dashboardName,
3937  std::to_string(vd.userId)});
3938  } else {
3939  sqliteConnector_.query_with_text_params(
3940  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
3941  "update_time, "
3942  "userid) "
3943  "VALUES "
3944  "(?,?,?,?, "
3945  "datetime('now'), ?)",
3946  std::vector<std::string>{vd.dashboardName,
3947  vd.dashboardState,
3948  vd.imageHash,
3949  vd.dashboardMetadata,
3950  std::to_string(vd.userId)});
3951  }
3952  } catch (std::exception& e) {
3953  sqliteConnector_.query("ROLLBACK TRANSACTION");
3954  throw;
3955  }
3956  sqliteConnector_.query("END TRANSACTION");
3957 
3958  // now get the auto generated dashboardId
  // This reload runs after END TRANSACTION; the try/catch around it only rethrows.
3959  try {
3960  sqliteConnector_.query_with_text_params(
3961  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
3962  "WHERE name = ? and userid = ?",
3963  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3964  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
3965  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
3966  } catch (std::exception& e) {
3967  throw;
3968  }
3970  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
3971  addFrontendViewToMap(vd);
  // Both catalog locks are released explicitly before the system-role call below.
3972  sqlite_lock.unlock();
3973  write_lock.unlock();
3974  if (!skip_system_role_creation) {
3975  // NOTE(wamsi): Transactionally unsafe
3976  createOrUpdateDashboardSystemRole(
3978  }
3979  return vd.dashboardId;
3980 }
3981 
// Replaces an existing dashboard, identified by vd.dashboardId: updates its SQLite row,
// removes the old entry from the in-memory dashboard map, reloads the update time into
// 'vd', re-adds the dashboard to the map and then creates or updates its system role.
// Throws runtime_error when the id is missing from the database or from the map.
// NOTE(review): original lines 4001, 4047 and 4054 are absent from this listing; the
// statements around lines 4000, 4048 and 4053 are therefore incomplete here.
3982 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
3983  cat_write_lock write_lock(this);
3984  cat_sqlite_lock sqlite_lock(getObjForLock());
3985 
3986  CHECK(sqliteConnector_.getSqlitePtr());
3987  sqliteConnector_.query("BEGIN TRANSACTION");
3988  try {
3989  sqliteConnector_.query_with_text_params(
3990  "SELECT id FROM mapd_dashboards WHERE id = ?",
3991  std::vector<std::string>{std::to_string(vd.dashboardId)});
3992  if (sqliteConnector_.getNumRows() > 0) {
3993  sqliteConnector_.query_with_text_params(
3994  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
3995  "?, userid = ?, update_time = datetime('now') where id = ? ",
3996  std::vector<std::string>{vd.dashboardName,
3997  vd.dashboardState,
3998  vd.imageHash,
3999  vd.dashboardMetadata,
4000  std::to_string(vd.userId),
4002  } else {
4003  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4004  << " does not exist in db";
4005  throw runtime_error("Error replacing dashboard id " +
4006  std::to_string(vd.dashboardId) + " does not exist in db");
4007  }
4008  } catch (std::exception& e) {
4009  sqliteConnector_.query("ROLLBACK TRANSACTION");
4010  throw;
4011  }
4012  sqliteConnector_.query("END TRANSACTION");
4013 
  // Find the map entry whose dashboard id matches; the map is looked up by
  // "<userid>:<dashboard name>", so the old entry's key is rebuilt from its own fields.
4014  bool found{false};
4015  for (auto descp : dashboardDescriptorMap_) {
4016  auto dash = descp.second.get();
4017  if (dash->dashboardId == vd.dashboardId) {
4018  found = true;
4019  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4020  dash->dashboardName);
4021  if (viewDescIt ==
4022  dashboardDescriptorMap_.end()) { // check to make sure view exists
4023  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4024  << " dashboard " << dash->dashboardName << " does not exist in map";
4025  throw runtime_error("No metadata for dashboard for user " +
4026  std::to_string(dash->userId) + " dashboard " +
4027  dash->dashboardName + " does not exist in map");
4028  }
  // Erasing while range-iterating is safe here only because the loop exits right away.
4029  dashboardDescriptorMap_.erase(viewDescIt);
4030  break;
4031  }
4032  }
4033  if (!found) {
4034  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4035  << " does not exist in map";
4036  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4037  " does not exist in map");
4038  }
4039 
4040  // now reload the object
4041  sqliteConnector_.query_with_text_params(
4042  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4043  "mapd_dashboards "
4044  "WHERE id = ?",
4045  std::vector<std::string>{std::to_string(vd.dashboardId)});
4046  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4048  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4049  addFrontendViewToMapNoLock(vd);
  // Both catalog locks are released explicitly before the system-role call below.
4050  sqlite_lock.unlock();
4051  write_lock.unlock();
4052  // NOTE(wamsi): Transactionally unsafe
4053  createOrUpdateDashboardSystemRole(
4055 }
4056 
4057 std::string Catalog::calculateSHA1(const std::string& data) {
4058  boost::uuids::detail::sha1 sha1;
4059  unsigned int digest[5];
4060  sha1.process_bytes(data.c_str(), data.length());
4061  sha1.get_digest(digest);
4062  std::stringstream ss;
4063  for (size_t i = 0; i < 5; i++) {
4064  ss << std::hex << digest[i];
4065  }
4066  return ss.str();
4067 }
4068 
4069 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4070  cat_write_lock write_lock(this);
4071  cat_sqlite_lock sqlite_lock(getObjForLock());
4072  sqliteConnector_.query("BEGIN TRANSACTION");
4073  try {
4074  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4075  .substr(0, 8);
4076  sqliteConnector_.query_with_text_params(
4077  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4078  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4079  if (sqliteConnector_.getNumRows() > 0) {
4080  sqliteConnector_.query_with_text_params(
4081  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4082  "link = ?",
4083  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4084  } else {
4085  sqliteConnector_.query_with_text_params(
4086  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4087  "update_time) VALUES (?,?,?,?, datetime('now'))",
4088  std::vector<std::string>{
4089  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4090  }
4091  // now get the auto generated dashid
4092  sqliteConnector_.query_with_text_param(
4093  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4094  "WHERE link = ?",
4095  ld.link);
4096  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4097  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4098  } catch (std::exception& e) {
4099  sqliteConnector_.query("ROLLBACK TRANSACTION");
4100  throw;
4101  }
4102  sqliteConnector_.query("END TRANSACTION");
4103  addLinkToMap(ld);
4104  return ld.link;
4105 }
4106 
4107 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4108  const TableDescriptor* td) const {
4109  cat_read_lock read_lock(this);
4110 
4111  const auto column_descriptors =
4112  getAllColumnMetadataForTable(td->tableId, false, true, true);
4113 
4114  const ColumnDescriptor* shard_cd{nullptr};
4115  int i = 1;
4116  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4117  ++cd_itr, ++i) {
4118  if (i == td->shardedColumnId) {
4119  shard_cd = *cd_itr;
4120  }
4121  }
4122  return shard_cd;
4123 }
4124 
4125 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4126  const TableDescriptor* logical_table_desc,
4127  bool populate_fragmenter) const {
4128  cat_read_lock read_lock(this);
4129  const auto physicalTableIt =
4130  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4131  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4132  return {logical_table_desc};
4133  }
4134  const auto physicalTablesIds = physicalTableIt->second;
4135  CHECK(!physicalTablesIds.empty());
4136  read_lock.unlock();
4137  std::vector<const TableDescriptor*> physicalTables;
4138  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4139  physicalTables.push_back(
4140  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4141  }
4142  return physicalTables;
4143 }
4144 
4145 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4146  const {
4147  cat_read_lock read_lock(this);
4148  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4149  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4150  for (const auto [table_id, td] : tableDescriptorMapById_) {
4151  // Only include ids for physical persisted tables
4152  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4153  logicalToPhysicalTableMapById_.find(table_id) ==
4154  logicalToPhysicalTableMapById_.end()) {
4155  table_and_shard_ids.emplace_back(table_id, td->shard);
4156  }
4157  }
4158  return table_and_shard_ids;
4159 }
4160 
4161 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4162  cat_read_lock read_lock(this);
4163 
4164  std::map<int, const ColumnDescriptor*> mapping;
4165 
4166  const auto tables = getAllTableMetadata();
4167  for (const auto td : tables) {
4168  if (td->shard >= 0) {
4169  // skip shards, they're not standalone tables
4170  continue;
4171  }
4172 
4173  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4174  const auto& ti = cd->columnType;
4175  if (ti.is_string()) {
4176  if (ti.get_compression() == kENCODING_DICT) {
4177  // if foreign reference, get referenced tab.col
4178  const auto dict_id = ti.get_comp_param();
4179 
4180  // ignore temp (negative) dictionaries
4181  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4182  mapping[dict_id] = cd;
4183  }
4184  }
4185  }
4186  }
4187  }
4188 
4189  return mapping;
4190 }
4191 
// Decides whether a table should be listed for a user. Returns false for shards, for
// views when only physical tables are requested, for non-views when only views are
// requested, and for tables on which the user has no privileges at all; true otherwise.
// NOTE(review): original line 4215, which declares 'dbObject', is absent from this
// listing.
4192 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4193  const UserMetadata& user_metadata,
4194  const GetTablesType get_tables_type) const {
4195  if (td->shard >= 0) {
4196  // skip shards, they're not standalone tables
4197  return false;
4198  }
4199  switch (get_tables_type) {
4200  case GET_PHYSICAL_TABLES: {
4201  if (td->isView) {
4202  return false;
4203  }
4204  break;
4205  }
4206  case GET_VIEWS: {
4207  if (!td->isView) {
4208  return false;
4209  }
4210  break;
4211  }
  // Any other request type applies no view/table restriction.
4212  default:
4213  break;
4214  }
4216  dbObject.loadKey(*this);
4217  std::vector<DBObject> privObjects = {dbObject};
4218  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4219  // skip table, as there are no privileges to access it
4220  return false;
4221  }
4222  return true;
4223 }
4224 
4225 std::vector<std::string> Catalog::getTableNamesForUser(
4226  const UserMetadata& user_metadata,
4227  const GetTablesType get_tables_type) const {
4228  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4229  cat_read_lock read_lock(this);
4230  std::vector<std::string> table_names;
4231  const auto tables = getAllTableMetadata();
4232  for (const auto td : tables) {
4233  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4234  table_names.push_back(td->tableName);
4235  }
4236  }
4237  return table_names;
4238 }
4239 
4240 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4241  const UserMetadata& user_metadata,
4242  const GetTablesType get_tables_type,
4243  const std::string& filter_table_name) const {
4244  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4245  cat_read_lock read_lock(this);
4246 
4247  std::vector<TableMetadata> tables_metadata;
4248  const auto tables = getAllTableMetadata();
4249  for (const auto td : tables) {
4250  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4251  if (!filter_table_name.empty()) {
4252  if (td->tableName != filter_table_name) {
4253  continue;
4254  }
4255  }
4256  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4257  // descriptor outside catalog lock
4258  tables_metadata.emplace_back(table_metadata);
4259  }
4260  }
4261  return tables_metadata;
4262 }
4263 
4264 int Catalog::getLogicalTableId(const int physicalTableId) const {
4265  cat_read_lock read_lock(this);
4266  for (const auto& l : logicalToPhysicalTableMapById_) {
4267  if (l.second.end() != std::find_if(l.second.begin(),
4268  l.second.end(),
4269  [&](decltype(*l.second.begin()) tid) -> bool {
4270  return physicalTableId == tid;
4271  })) {
4272  return l.first;
4273  }
4274  }
4275  return physicalTableId;
4276 }
4277 
4278 void Catalog::checkpoint(const int logicalTableId) const {
4279  const auto td = getMetadataForTable(logicalTableId);
4280  const auto shards = getPhysicalTablesDescriptors(td);
4281  for (const auto shard : shards) {
4282  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4283  }
4284 }
4285 
4286 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4287  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4288  try {
4289  checkpoint(logical_table_id);
4290  } catch (...) {
4291  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4292  throw;
4293  }
4294 }
4295 
4296 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
4297  cat_read_lock read_lock(this);
4298  const auto td = getMetadataForTable(logicalTableId, false);
4299  const auto shards = getPhysicalTablesDescriptors(td, false);
4300  for (const auto shard : shards) {
4301  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
4302  }
4303 }
4304 
4305 void Catalog::eraseDbMetadata() {
4306  const auto tables = getAllTableMetadata();
4307  for (const auto table : tables) {
4308  eraseTableMetadata(table);
4309  }
4310  // Physically erase database metadata
4311  boost::filesystem::remove(basePath_ + "/mapd_catalogs/" + currentDB_.dbName);
4312  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4313 }
4314 
4315 void Catalog::eraseDbPhysicalData() {
4316  const auto tables = getAllTableMetadata();
4317  for (const auto table : tables) {
4318  eraseTablePhysicalData(table);
4319  }
4320 }
4321 
4322 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4323  const int tableId = td->tableId;
4324  // must destroy fragmenter before deleteChunks is called.
4325  removeFragmenterForTable(tableId);
4326 
4327  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4328  {
4329  INJECT_TIMER(deleteChunksWithPrefix);
4330  // assuming deleteChunksWithPrefix is atomic
4331  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4332  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4333  }
4334  if (!td->isView) {
4335  INJECT_TIMER(Remove_Table);
4336  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4337  }
4338 }
4339 
4340 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4341  const int32_t& shardNumber) {
4342  std::string physicalTableName =
4343  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4344  return (physicalTableName);
4345 }
4346 
4347 void Catalog::buildForeignServerMap() {
4348  sqliteConnector_.query(
4349  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4350  "omnisci_foreign_servers");
4351  auto num_rows = sqliteConnector_.getNumRows();
4352  for (size_t row = 0; row < num_rows; row++) {
4353  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4354  sqliteConnector_.getData<int>(row, 0),
4355  sqliteConnector_.getData<std::string>(row, 1),
4356  sqliteConnector_.getData<std::string>(row, 2),
4357  sqliteConnector_.getData<std::string>(row, 3),
4358  sqliteConnector_.getData<std::int32_t>(row, 4),
4359  sqliteConnector_.getData<std::int32_t>(row, 5));
4360  foreignServerMap_[foreign_server->name] = foreign_server;
4361  foreignServerMapById_[foreign_server->id] = foreign_server;
4362  }
4363 }
4364 
4365 void Catalog::addForeignTableDetails() {
4366  sqliteConnector_.query(
4367  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4368  "omnisci_foreign_tables");
4369  auto num_rows = sqliteConnector_.getNumRows();
4370  for (size_t r = 0; r < num_rows; r++) {
4371  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4372  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4373  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4374  const auto last_refresh_time = sqliteConnector_.getData<int>(r, 3);
4375  const auto next_refresh_time = sqliteConnector_.getData<int>(r, 4);
4376 
4377  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4378  auto foreign_table =
4379  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4380  CHECK(foreign_table);
4381  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4382  CHECK(foreign_table->foreign_server);
4383  foreign_table->populateOptionsMap(options);
4384  foreign_table->last_refresh_time = last_refresh_time;
4385  foreign_table->next_refresh_time = next_refresh_time;
4386  }
4387 }
4388 
4389 void Catalog::setForeignServerProperty(const std::string& server_name,
4390  const std::string& property,
4391  const std::string& value) {
4392  cat_sqlite_lock sqlite_lock(getObjForLock());
4393  sqliteConnector_.query_with_text_params(
4394  "SELECT id from omnisci_foreign_servers where name = ?",
4395  std::vector<std::string>{server_name});
4396  auto num_rows = sqliteConnector_.getNumRows();
4397  if (num_rows > 0) {
4398  CHECK_EQ(size_t(1), num_rows);
4399  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4400  sqliteConnector_.query_with_text_params(
4401  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4402  std::vector<std::string>{value, std::to_string(server_id)});
4403  } else {
4404  throw std::runtime_error{"Can not change property \"" + property +
4405  "\" for foreign server." + " Foreign server \"" +
4406  server_name + "\" is not found."};
4407  }
4408 }
4409 
// Creates the built-in local foreign servers "omnisci_local_csv",
// "omnisci_local_regex_parser" and, only when ENABLE_IMPORT_PARQUET is defined,
// "omnisci_local_parquet". Each server object is validated and then passed to
// createForeignServerNoLocks() with `true` as the second argument.
// NOTE(review): presumably that flag means "if not exists" (going by this
// function's name) -- confirm against createForeignServerNoLocks().
// NOTE(review): parts of this function (the definition of `options` and the
// remaining ForeignServer constructor arguments) are not visible in this
// listing.
4410 void Catalog::createDefaultServersIfNotExists() {
4414 
4415  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4416  "omnisci_local_csv",
4418  options,
4420  local_csv_server->validate();
4421  createForeignServerNoLocks(std::move(local_csv_server), true);
4422 
  // The parquet server is only registered in builds with parquet import enabled.
4423 #ifdef ENABLE_IMPORT_PARQUET
4424  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4425  "omnisci_local_parquet",
4427  options,
4429  local_parquet_server->validate();
4430  createForeignServerNoLocks(std::move(local_parquet_server), true);
4431 #endif
4432 
4433  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
4434  "omnisci_local_regex_parser",
4436  options,
4438  local_regex_parser_server->validate();
4439  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
4440 }
4441 
4442 // prepare a fresh file reload on next table access
4443 void Catalog::setForReload(const int32_t tableId) {
4444  const auto td = getMetadataForTable(tableId);
4445  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4446  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4447  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4448  }
4449 }
4450 
4451 // get a table's data dirs
4452 std::vector<std::string> Catalog::getTableDataDirectories(
4453  const TableDescriptor* td) const {
4454  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4455  std::vector<std::string> file_paths;
4456  for (auto shard : getPhysicalTablesDescriptors(td)) {
4457  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4458  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4459  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4460  file_paths.push_back(file_path.filename().string());
4461  }
4462  return file_paths;
4463 }
4464 
4465 // get a column's dict dir basename
// Returns the dictionary folder of a string / string-array column whose
// comp_param is positive: only the final path component when `file_name_only`
// is true, otherwise dictFolderPath as stored. Returns an empty string when
// the column does not satisfy the condition below.
// NOTE(review): one line of the condition is not visible in this listing, so
// the full set of requirements cannot be confirmed from here.
4466 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
4467  bool file_name_only) const {
4468  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4470  cd->columnType.get_comp_param() > 0) {
  // comp_param is used as the dictionary id within the current database.
4471  const auto dictId = cd->columnType.get_comp_param();
4472  const DictRef dictRef(currentDB_.dbId, dictId);
4473  const auto dit = dictDescriptorMapByRef_.find(dictRef);
  // A qualifying column must have a registered, non-null dictionary descriptor.
4474  CHECK(dit != dictDescriptorMapByRef_.end());
4475  CHECK(dit->second);
4476  if (file_name_only) {
4477  boost::filesystem::path file_path(dit->second->dictFolderPath);
4478  return file_path.filename().string();
4479  } else {
4480  return dit->second->dictFolderPath;
4481  }
4482  }
4483  return std::string();
4484 }
4485 
4486 // get a table's dict dirs
4487 std::vector<std::string> Catalog::getTableDictDirectories(
4488  const TableDescriptor* td) const {
4489  std::vector<std::string> file_paths;
4490  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4491  auto file_base = getColumnDictDirectory(cd);
4492  if (!file_base.empty() &&
4493  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4494  file_paths.push_back(file_base);
4495  }
4496  }
4497  return file_paths;
4498 }
4499 
4500 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id) const {
4501  cat_read_lock read_lock(this);
4502  std::set<std::string> directory_paths;
4503  for (auto cd : getAllColumnMetadataForTable(table_id, false, false, true)) {
4504  auto directory_path = getColumnDictDirectory(cd, false);
4505  if (!directory_path.empty()) {
4506  directory_paths.emplace(directory_path);
4507  }
4508  }
4509  return directory_paths;
4510 }
4511 
4512 // returns table schema in a string
4513 // NOTE(sy): Might be able to replace dumpSchema() later with
4514 // dumpCreateTable() after a deeper review of the TableArchiver code.
// Builds a single-line "CREATE TABLE @T (...) WITH (...);" statement for a
// non-system table. The literal "@T" is written where the table name would go.
// NOTE(review): presumably "@T" is a placeholder substituted by the caller --
// confirm against the TableArchiver code mentioned above.
// Column names are written unquoted; system and virtual columns are skipped.
4515 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
4516  CHECK(!td->is_system_table);
4517  cat_read_lock read_lock(this);
4518 
4519  std::ostringstream os;
4520  os << "CREATE TABLE @T (";
4521  // gather column defines
4522  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
  // `comma` is empty before the first emitted column and ", " afterwards.
4523  std::string comma;
4524  std::vector<std::string> shared_dicts;
  // Maps a dictionary name to the first column seen that uses it.
4525  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4526  for (const auto cd : cds) {
4527  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4528  const auto& ti = cd->columnType;
4529  os << comma << cd->columnName;
4530  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4531  if (ti.get_type() == SQLTypes::kCHAR) {
4532  os << " "
4533  << "TEXT";
4534  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4535  os << " "
4536  << "TEXT[]";
4537  } else {
4538  os << " " << ti.get_type_name();
4539  }
4540  os << (ti.get_notnull() ? " NOT NULL" : "");
4541  if (cd->default_value.has_value()) {
4542  os << " DEFAULT " << cd->getDefaultValueLiteral();
4543  }
4544  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4545  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4546  if (ti.get_compression() == kENCODING_DICT) {
4547  // if foreign reference, get referenced tab.col
4548  const auto dict_id = ti.get_comp_param();
4549  const DictRef dict_ref(currentDB_.dbId, dict_id);
4550  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4551  CHECK(dict_it != dictDescriptorMapByRef_.end());
4552  const auto dict_name = dict_it->second->dictName;
4553  // when migrating a table, any foreign dict ref will be dropped
4554  // and the first cd of a dict will become root of the dict
4555  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
4556  dict_root_cds[dict_name] = cd;
  // The encoding width is written in bits (size * 8).
4557  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4558  } else {
4559  const auto dict_root_cd = dict_root_cds[dict_name];
4560  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
4561  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
4562  // "... shouldn't specify an encoding, it borrows from the referenced
4563  // column"
4564  }
4565  } else {
4566  os << " ENCODING NONE";
4567  }
4568  } else if (ti.is_date_in_days() ||
4569  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
  // A zero comp_param is written as 32.
4570  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4571  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4572  } else if (ti.is_geometry()) {
4573  if (ti.get_compression() == kENCODING_GEOINT) {
4574  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4575  << ")";
4576  } else {
4577  os << " ENCODING NONE";
4578  }
4579  }
4580  comma = ", ";
4581  }
4582  }
4583  // gather SHARED DICTIONARYs
4584  if (shared_dicts.size()) {
4585  os << ", " << boost::algorithm::join(shared_dicts, ", ");
4586  }
4587  // gather WITH options ...
4588  std::vector<std::string> with_options;
4589  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4590  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4591  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4592  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4593  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4594  : "VACUUM='IMMEDIATE'");
4595  if (!td->partitions.empty()) {
4596  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4597  }
4598  if (td->nShards > 0) {
4599  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4600  CHECK(shard_cd);
  // SHARD KEY is written inside the column list, which is still open here.
4601  os << ", SHARD KEY(" << shard_cd->columnName << ")";
4602  with_options.push_back(
4603  "SHARD_COUNT=" +
4604  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4605  }
4606  if (td->sortedColumnId > 0) {
4607  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4608  CHECK(sort_cd);
4609  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4610  }
  // NOTE(review): the opening of the following `if` and the value appended to
  // "MAX_ROLLBACK_EPOCHS=" are not visible in this listing.
4612  td->maxRollbackEpochs != -1) {
4613  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4615  }
4616  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
4617  return os.str();
4618 }
4619 
4620 #include "Parser/ReservedKeywords.h"
4621 
// Returns true when `str` holds at least one character that std::isspace
// classifies as whitespace. Characters are passed to std::isspace as
// unsigned char.
inline bool contains_spaces(std::string_view str) {
  return std::any_of(str.begin(), str.end(), [](const unsigned char ch) {
    return std::isspace(ch) != 0;
  });
}
4628 
4631  std::string_view str,
4632  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
  // True when `str` contains at least one of the characters listed in `chars`;
  // find_first_of returns npos when none is present.
4633  return str.find_first_of(chars) != std::string_view::npos;
4634 }
4635 
4637 inline bool is_reserved_sql_keyword(std::string_view str) {
4638  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
4639 }
4640 
4641 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
// Produces "CREATE FOREIGN TABLE", "CREATE [TEMPORARY] TABLE" or "CREATE VIEW"
// text for `td`, holding the catalog read lock while doing so.
//   multiline_formatting - when true, each column / extra clause starts on its
//                          own line; otherwise everything is on one line.
//   dump_defaults        - when true, WITH options are written even if they
//                          equal their DEFAULT_* values.
// Views return early with "CREATE VIEW <name> AS <viewSQL>" (no trailing ';').
4642 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
4643  bool multiline_formatting,
4644  bool dump_defaults) const {
4645  cat_read_lock read_lock(this);
4646 
  // Non-null only when td is actually a ForeignTable.
4647  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
4648  std::ostringstream os;
4649 
4650  if (foreign_table && !td->is_system_table) {
4651  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
4652  } else if (!td->isView) {
4653  os << "CREATE ";
  // NOTE(review): the condition guarding "TEMPORARY " is not visible in this
  // listing.
4655  os << "TEMPORARY ";
4656  }
4657  os << "TABLE " + td->tableName + " (";
4658  } else {
4659  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
4660  return os.str();
4661  }
4662  // scan column defines
4663  std::vector<std::string> additional_info;
4664  std::set<std::string> shared_dict_column_names;
4665 
4666  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
4667 
4668  // gather column defines
4669  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4670  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4671  bool first = true;
4672  for (const auto cd : cds) {
4673  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4674  const auto& ti = cd->columnType;
  // Separator goes before every emitted column except the first one.
4675  if (!first) {
4676  os << ",";
4677  if (!multiline_formatting) {
4678  os << " ";
4679  }
4680  } else {
4681  first = false;
4682  }
4683  if (multiline_formatting) {
4684  os << "\n ";
4685  }
4686  // column name
4687  os << quoteIfRequired(cd->columnName);
4688  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4689  if (ti.get_type() == SQLTypes::kCHAR) {
4690  os << " "
4691  << "TEXT";
4692  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4693  os << " "
4694  << "TEXT[]";
4695  } else {
4696  os << " " << ti.get_type_name();
4697  }
4698  os << (ti.get_notnull() ? " NOT NULL" : "");
4699  if (cd->default_value.has_value()) {
4700  os << " DEFAULT " << cd->getDefaultValueLiteral();
4701  }
  // Encoding is only written for columns that did not get a SHARED DICTIONARY
  // clause from gatherAdditionalInfo().
4702  if (shared_dict_column_names.find(cd->columnName) ==
4703  shared_dict_column_names.end()) {
4704  // avoids "Column ... shouldn't specify an encoding, it borrows it
4705  // from the referenced column"
4706  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4707  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4708  if (ti.get_compression() == kENCODING_DICT) {
4709  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4710  } else {
4711  os << " ENCODING NONE";
4712  }
4713  } else if (ti.is_date_in_days() ||
4714  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
  // A zero comp_param is written as 32.
4715  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4716  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4717  } else if (ti.is_geometry()) {
4718  if (ti.get_compression() == kENCODING_GEOINT) {
4719  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4720  << ")";
4721  } else {
4722  os << " ENCODING NONE";
4723  }
4724  }
4725  }
4726  }
4727  }
4728  // gather SHARED DICTIONARYs
  // additional_info may also hold the SHARD KEY clause added by
  // gatherAdditionalInfo().
4729  if (additional_info.size()) {
4730  std::string comma;
4731  if (!multiline_formatting) {
4732  comma = ", ";
4733  } else {
4734  comma = ",\n ";
4735  }
4736  os << comma;
4737  os << boost::algorithm::join(additional_info, comma);
4738  }
4739  os << ")";
4740 
4741  std::vector<std::string> with_options;
4742  if (foreign_table && !td->is_system_table) {
4743  if (multiline_formatting) {
4744  os << "\n";
4745  } else {
4746  os << " ";
4747  }
4748  os << "SERVER " << foreign_table->foreign_server->name;
4749 
4750  // gather WITH options ...
4751  for (const auto& [option, value] : foreign_table->options) {
4752  with_options.emplace_back(option + "='" + value + "'");
4753  }
4754  }
4755 
  // Table-level options: each is written when dump_defaults is set or when it
  // differs from its default; several are skipped for any ForeignTable.
4756  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
4757  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4758  }
4759  if (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE) {
4760  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4761  }
4762  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
4763  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4764  }
4765  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
4766  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4767  }
4768  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
4769  td->maxRollbackEpochs != -1) {
  // NOTE(review): the value appended to "MAX_ROLLBACK_EPOCHS=" is not visible
  // in this listing.
4770  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4772  }
4773  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
4774  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4775  : "VACUUM='IMMEDIATE'");
4776  }
4777  if (!foreign_table && !td->partitions.empty()) {
4778  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4779  }
4780  if (!foreign_table && td->nShards > 0) {
4781  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4782  CHECK(shard_cd);
4783  with_options.push_back(
4784  "SHARD_COUNT=" +
4785  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4786  }
4787  if (!foreign_table && td->sortedColumnId > 0) {
4788  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4789  CHECK(sort_cd);
4790  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4791  }
4792 
  // The WITH clause is omitted entirely when no option was collected.
4793  if (!with_options.empty()) {
4794  if (!multiline_formatting) {
4795  os << " ";
4796  } else {
4797  os << "\n";
4798  }
4799  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
4800  }
4801  os << ";";
4802  return os.str();
4803 }
4804 
4805 bool Catalog::validateNonExistentTableOrView(const std::string& name,
4806  const bool if_not_exists) {
4807  if (getMetadataForTable(name, false)) {
4808  if (if_not_exists) {
4809  return false;
4810  }
4811  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
4812  }
4813  return true;
4814 }
4815 
// Scans every table descriptor under the catalog read lock and returns the
// foreign tables (storageType == StorageType::FOREIGN_TABLE) that pass the
// refresh condition below, which includes next_refresh_time <= current_time.
// NOTE(review): the option key being looked up, the definition of
// `current_time` and the value the timing type is compared against are not
// visible in this listing.
4816 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
4817  cat_read_lock read_lock(this);
4818  std::vector<const TableDescriptor*> tables;
4819  for (auto entry : tableDescriptorMapById_) {
4820  auto table_descriptor = entry.second;
4821  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
4822  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4823  CHECK(foreign_table);
4824  auto timing_type_entry = foreign_table->options.find(
  // The looked-up option is required to be present on every foreign table.
4826  CHECK(timing_type_entry != foreign_table->options.end());
4828  if (timing_type_entry->second ==
4830  foreign_table->next_refresh_time <= current_time) {
4831  tables.emplace_back(foreign_table);
4832  }
4833  }
4834  }
4835  return tables;
4836 }
4837 
4838 void Catalog::updateForeignTableRefreshTimes(const int32_t table_id) {
4839  cat_write_lock write_lock(this);
4840  cat_sqlite_lock sqlite_lock(getObjForLock());
4841  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4842  auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
4843  CHECK(table_descriptor);
4844  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4845  CHECK(foreign_table);
4846  auto last_refresh_time = foreign_storage::RefreshTimeCalculator::getCurrentTime();
4847  auto next_refresh_time = get_next_refresh_time(*foreign_table);
4848  sqliteConnector_.query_with_text_params(
4849  "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
4850  "WHERE table_id = ?",
4851  std::vector<std::string>{std::to_string(last_refresh_time),
4852  std::to_string(next_refresh_time),
4853  std::to_string(foreign_table->tableId)});
4854  foreign_table->last_refresh_time = last_refresh_time;
4855  foreign_table->next_refresh_time = next_refresh_time;
4856 }
4857 
4858 // TODO(Misiu): This function should be merged with setForeignServerOptions via
4859 // inheritance rather than replication similar functions.
4860 void Catalog::setForeignTableOptions(const std::string& table_name,
4861  foreign_storage::OptionsMap& options_map,
4862  bool clear_existing_options) {
4863  cat_write_lock write_lock(this);
4864  // update in-memory table
4865  auto foreign_table = getForeignTableUnlocked(table_name);
4866  auto saved_options = foreign_table->options;
4867  foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
4868  try {
4869  foreign_table->validateOptionValues();
4870  } catch (const std::exception& e) {
4871  // validation did not succeed:
4872  // revert to saved options & throw exception
4873  foreign_table->options = saved_options;
4874  throw;
4875  }
4876  setForeignTableProperty(
4877  foreign_table, "options", foreign_table->getOptionsAsJsonString());
4878 }
4879 
4880 void Catalog::setForeignTableProperty(const foreign_storage::ForeignTable* table,
4881  const std::string& property,
4882  const std::string& value) {
4883  cat_sqlite_lock sqlite_lock(getObjForLock());
4884  sqliteConnector_.query_with_text_params(
4885  "SELECT table_id from omnisci_foreign_tables where table_id = ?",
4886  std::vector<std::string>{std::to_string(table->tableId)});
4887  auto num_rows = sqliteConnector_.getNumRows();
4888  if (num_rows > 0) {
4889  CHECK_EQ(size_t(1), num_rows);
4890  sqliteConnector_.query_with_text_params(
4891  "UPDATE omnisci_foreign_tables SET " + property + " = ? WHERE table_id = ?",
4892  std::vector<std::string>{value, std::to_string(table->tableId)});
4893  } else {
4894  throw std::runtime_error{"Can not change property \"" + property +
4895  "\" for foreign table." + " Foreign table \"" +
4896  table->tableName + "\" is not found."};
4897  }
4898 }
4899 
4900 std::string Catalog::quoteIfRequired(const std::string& column_name) const {
4901  if (is_reserved_sql_keyword(column_name) || contains_spaces(column_name) ||
4902  contains_sql_reserved_chars(column_name)) {
4903  return get_quoted_string(column_name, '"', '"');
4904  } else {
4905  return column_name;
4906  }
4907 }
4908 
4909 // this will gather information that represents the shared dictionary columns
4910 // as they are on the table NOW not at original creation
4911 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
4912  std::set<std::string>& shared_dict_column_names,
4913  const TableDescriptor* td) const {
4914  if (td->nShards > 0) {
4915  ColumnIdKey columnIdKey(td->tableId, td->shardedColumnId);
4916  auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
4917  CHECK(scd);
4918  std::string txt = "SHARD KEY (" + quoteIfRequired(scd->columnName) + ")";
4919  additional_info.emplace_back(txt);
4920  }
4921  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4922  for (const auto cd : cds) {
4923  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4924  const SQLTypeInfo& ti = cd->columnType;
4925  if (ti.get_compression() != kENCODING_DICT) {
4926  continue;
4927  }
4928  auto dictId = ti.get_comp_param();
4929 
4930  // now we need to check how many other users have this dictionary
4931 
4932  DictRef dict_ref(currentDB_.dbId, dictId);
4933  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4934  if (dictIt == dictDescriptorMapByRef_.end()) {
4935  LOG(ERROR) << "missing dictionary " << dictId << " for table " << td->tableName;
4936  continue;
4937  }
4938 
4939  const auto& dd = dictIt->second;
4940  if (dd->refcount > 1) {
4941  auto lowest_table = td->tableId;
4942  auto lowest_column = cd->columnId;
4943  std::string lowest_column_name;
4944  // we have multiple tables using this dictionary
4945  // find the other occurances and keep the "lowest"
4946  for (auto const& [key, val] : columnDescriptorMap_) {
4947  if (val->columnType.get_compression() == kENCODING_DICT &&
4948  val->columnType.get_comp_param() == dictId &&
4949  !(val->tableId == td->tableId && val->columnId == cd->columnId)) {
4950  if (val->tableId < lowest_table) {
4951  lowest_table = val->tableId;
4952  lowest_column = val->columnId;
4953  lowest_column_name = val->columnName;
4954  }
4955  if (val->columnId < lowest_column) {
4956  lowest_column = val->columnId;
4957  lowest_column_name = val->columnName;
4958  }
4959  }
4960  }
4961  if (lowest_table != td->tableId || lowest_column != cd->columnId) {
4962  // we are referencing a different tables dictionary
4963  auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
4964  CHECK(lowest_td);
4965  std::string txt = "SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
4966  ") REFERENCES " + lowest_td->tableName + "(" +
4967  quoteIfRequired(lowest_column_name) + ")";
4968 
4969  additional_info.emplace_back(txt);
4970  shared_dict_column_names.insert(cd->columnName);
4971  }
4972  }
4973  }
4974  }
4975 }
4976 
4977 int32_t Catalog::createCustomExpression(
4978  std::unique_ptr<CustomExpression> custom_expression) {
4979  cat_write_lock write_lock(this);
4980  cat_sqlite_lock sqlite_lock(getObjForLock());
4981  sqliteConnector_.query("BEGIN TRANSACTION");
4982  int32_t custom_expression_id{-1};
4983  try {
4984  auto data_source_type_str =
4985  CustomExpression::dataSourceTypeToString(custom_expression->data_source_type);
4986  auto data_source_id_str = std::to_string(custom_expression->data_source_id);
4987  std::string custom_expr_select_query{
4988  "SELECT id FROM omnisci_custom_expressions WHERE name = ? and data_source_type = "
4989  "? and data_source_id = ? and is_deleted = ?"};
4990  std::vector<std::string> custom_expr_select_params{custom_expression->name,
4991  data_source_type_str,
4992  data_source_id_str,
4993  std::to_string(false)};
4994  sqliteConnector_.query_with_text_params(custom_expr_select_query,
4995  custom_expr_select_params);
4996  if (sqliteConnector_.getNumRows() > 0) {
4997  throw std::runtime_error{
4998  "A custom expression with the given "
4999  "name and data source already exists."};
5000  }
5001  sqliteConnector_.query_with_text_params(
5002  "INSERT INTO omnisci_custom_expressions(name, expression_json, "
5003  "data_source_type, data_source_id, is_deleted) VALUES (?,?,?,?,?)",
5004  std::vector<std::string>{custom_expression->name,
5005  custom_expression->expression_json,
5006  data_source_type_str,
5007  data_source_id_str,
5008  std::to_string(false)});
5009  sqliteConnector_.query_with_text_params(custom_expr_select_query,
5010  custom_expr_select_params);
5011  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
5012  custom_expression->id = sqliteConnector_.getData<int32_t>(0, 0);
5013  custom_expression_id = custom_expression->id;
5014  CHECK(custom_expr_map_by_id_.find(custom_expression->id) ==
5015  custom_expr_map_by_id_.end());
5016  custom_expr_map_by_id_[custom_expression->id] = std::move(custom_expression);
5017  } catch (std::exception& e) {
5018  sqliteConnector_.query("ROLLBACK TRANSACTION");
5019  throw;
5020  }
5021  sqliteConnector_.query("END TRANSACTION");
5022  CHECK_GT(custom_expression_id, 0);
5023  return custom_expression_id;
5024 }
5025 
5026 const CustomExpression* Catalog::getCustomExpression(int32_t custom_expression_id) const {
5027  cat_read_lock read_lock(this);
5028  auto it = custom_expr_map_by_id_.find(custom_expression_id);
5029  if (it != custom_expr_map_by_id_.end()) {
5030  return it->second.get();
5031  }
5032  return nullptr;
5033 }
5034 
5035 const std::unique_ptr<const CustomExpression> Catalog::getCustomExpressionFromStorage(
5036  int32_t custom_expression_id) {
5037  cat_sqlite_lock sqlite_lock(getObjForLock());
5038  sqliteConnector_.query_with_text_params(
5039  "SELECT id, name, expression_json, data_source_type, data_source_id, "
5040  "is_deleted FROM omnisci_custom_expressions WHERE id = ?",
5041  std::vector<std::string>{to_string(custom_expression_id)});
5042  if (sqliteConnector_.getNumRows() > 0) {
5043  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
5044  return getCustomExpressionFromConnector(0);
5045  }
5046  return nullptr;
5047 }
5048 
5049 std::vector<const CustomExpression*> Catalog::getCustomExpressionsForUser(
5050  const UserMetadata& user) const {
5051  std::vector<const CustomExpression*> all_custom_expressions;
5052  {
5053  // Get custom expression pointers separately in order to avoid holding the catalog
5054  // read lock while checking privileges (which may cause a deadlock).
5055  cat_read_lock read_lock(this);
5056  for (const auto& [id, custom_expression] : custom_expr_map_by_id_) {
5057  all_custom_expressions.emplace_back(custom_expression.get());
5058  }
5059  }
5060 
5061  std::vector<const CustomExpression*> filtered_custom_expressions;
5062  for (const auto custom_expression : all_custom_expressions) {
5063  CHECK(custom_expression->data_source_type == DataSourceType::TABLE);
5064  DBObject db_object{custom_expression->data_source_id, TableDBObjectType};
5065  db_object.loadKey(*this);
5066  db_object.setPrivileges(AccessPrivileges::SELECT_FROM_TABLE);
5067  if (SysCatalog::instance().checkPrivileges(user, {db_object})) {
5068  filtered_custom_expressions.emplace_back(custom_expression);
5069  }
5070  }
5071  return filtered_custom_expressions;
5072 }
5073 
5074 void Catalog::updateCustomExpression(int32_t custom_expression_id,
5075  const std::string& expression_json) {
5076  cat_write_lock write_lock(this);
5077  cat_sqlite_lock sqlite_lock(getObjForLock());
5078  auto it = custom_expr_map_by_id_.find(custom_expression_id);
5079  if (it == custom_expr_ma