OmniSciDB  21ac014ffc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Catalog/Catalog.h"
26 
27 #include <algorithm>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/range/adaptor/map.hpp>
31 #include <boost/version.hpp>
32 #include <cassert>
33 #include <cerrno>
34 #include <cstdio>
35 #include <cstring>
36 #include <exception>
37 #include <fstream>
38 #include <list>
39 #include <memory>
40 #include <random>
41 #include <regex>
42 #include <sstream>
43 
44 #if BOOST_VERSION >= 106600
45 #include <boost/uuid/detail/sha1.hpp>
46 #else
47 #include <boost/uuid/sha1.hpp>
48 #endif
49 #include <rapidjson/document.h>
50 #include <rapidjson/istreamwrapper.h>
51 #include <rapidjson/ostreamwrapper.h>
52 #include <rapidjson/writer.h>
53 
54 #include "Catalog/SysCatalog.h"
55 
56 #include "QueryEngine/Execute.h"
58 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
67 #include "Parser/ParserNode.h"
68 #include "QueryEngine/Execute.h"
70 #include "RefreshTimeCalculator.h"
71 #include "Shared/DateTimeParser.h"
72 #include "Shared/File.h"
73 #include "Shared/StringTransform.h"
74 #include "Shared/measure.h"
76 
77 #include "MapDRelease.h"
78 #include "RWLocks.h"
80 
81 using Chunk_NS::Chunk;
84 using std::list;
85 using std::map;
86 using std::pair;
87 using std::runtime_error;
88 using std::string;
89 using std::vector;
90 
92 bool g_enable_fsi{false};
93 bool g_enable_s3_fsi{false};
94 extern bool g_cache_string_hash;
95 
96 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
97 // under unit testing.
99 
100 namespace Catalog_Namespace {
101 
102 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
104  1073741824; // 2^30, give room for over a billion non-temp tables
106  1073741824; // 2^30, give room for over a billion non-temp dictionaries
107 
108 const std::string Catalog::physicalTableNameTag_("_shard_#");
109 
110 thread_local bool Catalog::thread_holds_read_lock = false;
111 
116 
117 // Migration is done as a two-step process: this release
118 // creates and uses the new table; the
119 // next release will remove the old table. This is done to keep a fallback path
120 // in case of migration failure.
123  sqliteConnector_.query("BEGIN TRANSACTION");
124  try {
126  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
127  if (sqliteConnector_.getNumRows() != 0) {
128  // already done
129  sqliteConnector_.query("END TRANSACTION");
130  return;
131  }
133  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
134  "userid integer references mapd_users, state text, image_hash text, update_time "
135  "timestamp, "
136  "metadata text, UNIQUE(userid, name) )");
137  // now copy content from old table to new table
139  "insert into mapd_dashboards (id, name , "
140  "userid, state, image_hash, update_time , "
141  "metadata) "
142  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
143  "view_metadata "
144  "from mapd_frontend_views");
145  } catch (const std::exception& e) {
146  sqliteConnector_.query("ROLLBACK TRANSACTION");
147  throw;
148  }
149  sqliteConnector_.query("END TRANSACTION");
150 }
151 
152 namespace {
153 
154 inline auto table_json_filepath(const std::string& base_path,
155  const std::string& db_name) {
156  return boost::filesystem::path(base_path + "/mapd_catalogs/" + db_name +
157  "_temp_tables.json");
158 }
159 
160 } // namespace
161 
/**
 * Constructs a Catalog for the database described by curDB.
 *
 * The member-initializer list stores the arguments and opens sqliteConnector_
 * with curDB.dbName and the directory basePath + "/mapd_catalogs/". The body then:
 *   1. calls CheckAndExecuteMigrations() when is_new_db is false,
 *   2. calls buildMaps(),
 *   3. calls CheckAndExecuteMigrationsPostBuildMaps() when is_new_db is false,
 *   4. removes the file named by table_json_filepath(basePath_, currentDB_.dbName),
 *   5. sets initialized_ to true.
 *
 * @param basePath          base directory; "/mapd_catalogs/" is appended for SQLite
 * @param curDB             metadata of the database this catalog serves
 * @param dataMgr           shared data manager, stored in dataMgr_
 * @param string_dict_hosts stored in string_dict_hosts_
 * @param calcite           stored in calciteMgr_
 * @param is_new_db         when true, both migration steps are skipped
 */
162 Catalog::Catalog(const string& basePath,
163  const DBMetadata& curDB,
164  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
165  const std::vector<LeafHostInfo>& string_dict_hosts,
166  std::shared_ptr<Calcite> calcite,
167  bool is_new_db)
168  : basePath_(basePath)
169  , sqliteConnector_(curDB.dbName, basePath + "/mapd_catalogs/")
170  , currentDB_(curDB)
171  , dataMgr_(dataMgr)
172  , string_dict_hosts_(string_dict_hosts)
173  , calciteMgr_(calcite)
174  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
175  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
176  , sqliteMutex_()
177  , sharedMutex_()
178  , thread_holding_sqlite_lock()
179  , thread_holding_write_lock() {
// Migrations run only for pre-existing databases, and run before buildMaps().
180  if (!is_new_db) {
181  CheckAndExecuteMigrations();
182  }
183  buildMaps();
// Second migration step, again only for pre-existing databases, after buildMaps().
184  if (!is_new_db) {
185  CheckAndExecuteMigrationsPostBuildMaps();
186  }
// NOTE(review): the statement that opens this block (source line 187) is missing from
// this extract, so the condition guarding the removal below cannot be seen here; the
// closing brace that follows belongs to it.
188  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
189  }
190  // once all initialized use real object
191  initialized_ = true;
192 }
193 
195 
198  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
199  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
200  tableDescIt != tableDescriptorMap_.end();
201  ++tableDescIt) {
202  tableDescIt->second->fragmenter = nullptr;
203  delete tableDescIt->second;
204  }
205 
206  // TableDescriptorMapById points to the same descriptors. No need to delete
207 
208  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
209  columnDescIt != columnDescriptorMap_.end();
210  ++columnDescIt) {
211  delete columnDescIt->second;
212  }
213 
214  // ColumnDescriptorMapById points to the same descriptors. No need to delete
215 
217  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
218  }
219 }
220 
222  if (initialized_) {
223  return this;
224  } else {
225  return SysCatalog::instance().getDummyCatalog().get();
226  }
227 }
228 
231  sqliteConnector_.query("BEGIN TRANSACTION");
232  try {
233  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
234  std::vector<std::string> cols;
235  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
236  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
237  }
238  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
239  cols.end()) {
240  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
242  sqliteConnector_.query(queryString);
243  }
244  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
245  cols.end()) {
246  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
247  std::to_string(0));
248  sqliteConnector_.query(queryString);
249  }
250  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
251  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
252  std::to_string(-1));
253  sqliteConnector_.query(queryString);
254  }
255  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
256  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
257  std::to_string(0));
258  sqliteConnector_.query(queryString);
259  }
260  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
261  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
262  sqliteConnector_.query(queryString);
263  }
264  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
265  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
267  sqliteConnector_.query(queryString);
268  }
269  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
270  cols.end()) {
272  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
273  }
274  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
275  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
276  sqliteConnector_.query(queryString);
277  }
278  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
279  cols.end()) {
280  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
281  std::to_string(-1));
282  sqliteConnector_.query(queryString);
283  }
284  } catch (std::exception& e) {
285  sqliteConnector_.query("ROLLBACK TRANSACTION");
286  throw;
287  }
288  sqliteConnector_.query("END TRANSACTION");
289 }
290 
293  sqliteConnector_.query("BEGIN TRANSACTION");
294  try {
296  "select name from sqlite_master WHERE type='table' AND "
297  "name='mapd_version_history'");
298  if (sqliteConnector_.getNumRows() == 0) {
300  "CREATE TABLE mapd_version_history(version integer, migration_history text "
301  "unique)");
302  } else {
304  "select * from mapd_version_history where migration_history = "
305  "'notnull_fixlen_arrays'");
306  if (sqliteConnector_.getNumRows() != 0) {
307  // legacy fixlen arrays had migrated
308  // no need for further execution
309  sqliteConnector_.query("END TRANSACTION");
310  return;
311  }
312  }
313  // Insert check for migration
315  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
316  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
317  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
318  // Updating all fixlen array columns
319  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
320  std::to_string(kARRAY) + " AND size>0;");
321  sqliteConnector_.query(queryString);
322  } catch (std::exception& e) {
323  sqliteConnector_.query("ROLLBACK TRANSACTION");
324  throw;
325  }
326  sqliteConnector_.query("END TRANSACTION");
327 }
328 
331  sqliteConnector_.query("BEGIN TRANSACTION");
332  try {
334  "select name from sqlite_master WHERE type='table' AND "
335  "name='mapd_version_history'");
336  if (sqliteConnector_.getNumRows() == 0) {
338  "CREATE TABLE mapd_version_history(version integer, migration_history text "
339  "unique)");
340  } else {
342  "select * from mapd_version_history where migration_history = "
343  "'notnull_geo_columns'");
344  if (sqliteConnector_.getNumRows() != 0) {
345  // legacy geo columns had migrated
346  // no need for further execution
347  sqliteConnector_.query("END TRANSACTION");
348  return;
349  }
350  }
351  // Insert check for migration
353  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
354  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
355  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
356  // Updating all geo columns
357  string queryString(
358  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
359  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
360  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
361  sqliteConnector_.query(queryString);
362  } catch (std::exception& e) {
363  sqliteConnector_.query("ROLLBACK TRANSACTION");
364  throw;
365  }
366  sqliteConnector_.query("END TRANSACTION");
367 }
368 
371  sqliteConnector_.query("BEGIN TRANSACTION");
372  try {
373  // check table still exists
375  "SELECT name FROM sqlite_master WHERE type='table' AND "
376  "name='mapd_frontend_views'");
377  if (sqliteConnector_.getNumRows() == 0) {
378  // table does not exist
379  // no need to migrate
380  sqliteConnector_.query("END TRANSACTION");
381  return;
382  }
383  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
384  std::vector<std::string> cols;
385  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
386  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
387  }
388  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
389  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
390  }
391  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
392  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
393  }
394  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
395  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
396  }
397  } catch (std::exception& e) {
398  sqliteConnector_.query("ROLLBACK TRANSACTION");
399  throw;
400  }
401  sqliteConnector_.query("END TRANSACTION");
402 }
403 
406  sqliteConnector_.query("BEGIN TRANSACTION");
407  try {
409  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
410  "integer references mapd_users, "
411  "link text unique, view_state text, update_time timestamp, view_metadata text)");
412  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
413  std::vector<std::string> cols;
414  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
415  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
416  }
417  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
418  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
419  }
420  } catch (const std::exception& e) {
421  sqliteConnector_.query("ROLLBACK TRANSACTION");
422  throw;
423  }
424  sqliteConnector_.query("END TRANSACTION");
425 }
426 
429  sqliteConnector_.query("BEGIN TRANSACTION");
430  try {
431  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
432  // check table still exists
434  "SELECT name FROM sqlite_master WHERE type='table' AND "
435  "name='mapd_frontend_views'");
436  if (sqliteConnector_.getNumRows() == 0) {
437  // table does not exist
438  // no need to migrate
439  sqliteConnector_.query("END TRANSACTION");
440  return;
441  }
443  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
444  } catch (const std::exception& e) {
445  sqliteConnector_.query("ROLLBACK TRANSACTION");
446  throw;
447  }
448  sqliteConnector_.query("END TRANSACTION");
449 }
450 
451 // introduce DB version into the tables table
452 // if the DB does not have a version reset all pagesizes to 2097152 to be compatible with
453 // old value
454 
457  if (currentDB_.dbName.length() == 0) {
458  // updateDictionaryNames dbName length is zero nothing to do here
459  return;
460  }
461  sqliteConnector_.query("BEGIN TRANSACTION");
462  try {
463  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
464  std::vector<std::string> cols;
465  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
466  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
467  }
468  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
469  LOG(INFO) << "Updating mapd_tables updatePageSize";
470  // No version number
471  // need to update the default page size to the old correct value
472  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
473  // need to add new version info
474  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
476  sqliteConnector_.query(queryString);
477  }
478  } catch (std::exception& e) {
479  sqliteConnector_.query("ROLLBACK TRANSACTION");
480  throw;
481  }
482  sqliteConnector_.query("END TRANSACTION");
483 }
484 
487  sqliteConnector_.query("BEGIN TRANSACTION");
488  try {
489  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
490  std::vector<std::string> cols;
491  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
492  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
493  }
494  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
495  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
496  // need to add new version info
497  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
499  sqliteConnector_.query(queryString);
500  // need to add new column to table definition to indicate deleted column, column used
501  // as bitmap for deleted rows.
503  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
504  }
505  } catch (std::exception& e) {
506  sqliteConnector_.query("ROLLBACK TRANSACTION");
507  throw;
508  }
509  sqliteConnector_.query("END TRANSACTION");
510 }
511 
514  sqliteConnector_.query("BEGIN TRANSACTION");
515  try {
516  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
517  std::vector<std::string> cols;
518  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
519  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
520  }
521  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
522  LOG(INFO) << "Adding support for default values to mapd_columns";
523  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
524  }
525  } catch (std::exception& e) {
526  LOG(ERROR) << "Failed to make metadata update for default values` support";
527  sqliteConnector_.query("ROLLBACK TRANSACTION");
528  throw;
529  }
530  sqliteConnector_.query("END TRANSACTION");
531 }
532 
533 // introduce DB version into the dictionary tables
534 // if the DB does not have a version rename all dictionary tables
535 
538  if (currentDB_.dbName.length() == 0) {
539  // updateDictionaryNames dbName length is zero nothing to do here
540  return;
541  }
542  sqliteConnector_.query("BEGIN TRANSACTION");
543  try {
544  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
545  std::vector<std::string> cols;
546  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
547  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
548  }
549  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
550  // No version number
551  // need to rename dictionaries
552  string dictQuery("SELECT dictid, name from mapd_dictionaries");
553  sqliteConnector_.query(dictQuery);
554  size_t numRows = sqliteConnector_.getNumRows();
555  for (size_t r = 0; r < numRows; ++r) {
556  int dictId = sqliteConnector_.getData<int>(r, 0);
557  std::string dictName = sqliteConnector_.getData<string>(r, 1);
558 
559  std::string oldName =
560  g_base_path + "/mapd_data/" + currentDB_.dbName + "_" + dictName;
561  std::string newName = g_base_path + "/mapd_data/DB_" +
562  std::to_string(currentDB_.dbId) + "_DICT_" +
563  std::to_string(dictId);
564 
565  int result = rename(oldName.c_str(), newName.c_str());
566 
567  if (result == 0) {
568  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
569  << newName;
570  } else {
571  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
572  << newName + " dbname '" << currentDB_.dbName << "' error code "
573  << std::to_string(result);
574  }
575  }
576  // need to add new version info
577  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
579  sqliteConnector_.query(queryString);
580  }
581  } catch (std::exception& e) {
582  sqliteConnector_.query("ROLLBACK TRANSACTION");
583  throw;
584  }
585  sqliteConnector_.query("END TRANSACTION");
586 }
587 
590  sqliteConnector_.query("BEGIN TRANSACTION");
591  try {
593  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
594  "logical_table_id integer, physical_table_id integer)");
595  } catch (const std::exception& e) {
596  sqliteConnector_.query("ROLLBACK TRANSACTION");
597  throw;
598  }
599  sqliteConnector_.query("END TRANSACTION");
600 }
601 
/**
 * Writes the (logical_tb_id, physical table id) pairs held in
 * logicalToPhysicalTableMapById_ for the given logical table into the SQLite table
 * mapd_logical_to_physical, inside a single transaction.
 *
 * If logical_tb_id has no entry in logicalToPhysicalTableMapById_, the transaction is
 * opened and closed without writing anything. On any std::exception the transaction is
 * rolled back and the exception is rethrown.
 *
 * @param logical_tb_id id of the logical table whose physical tables are persisted
 */
602 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
603  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
604  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
605  */
606 
// NOTE(review): source line 607 is missing from this extract.
608  sqliteConnector_.query("BEGIN TRANSACTION");
609  try {
610  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
611  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// `const auto` (no reference) takes a copy of the mapped container.
612  const auto physicalTables = physicalTableIt->second;
613  CHECK(!physicalTables.empty());
614  for (size_t i = 0; i < physicalTables.size(); i++) {
615  int32_t physical_tb_id = physicalTables[i];
// NOTE(review): source line 616, which opens the call that receives the SQL text and
// the parameter vector below, is missing from this extract.
// The two ids are passed as bound text parameters (?1, ?2), not spliced into the SQL.
617  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
618  "physical_table_id) VALUES (?1, ?2)",
619  std::vector<std::string>{std::to_string(logical_tb_id),
620  std::to_string(physical_tb_id)});
621  }
622  }
623  } catch (std::exception& e) {
// Undo any partial inserts, then propagate the original exception unchanged.
624  sqliteConnector_.query("ROLLBACK TRANSACTION");
625  throw;
626  }
627  sqliteConnector_.query("END TRANSACTION");
628 }
629 
632  sqliteConnector_.query("BEGIN TRANSACTION");
633  try {
634  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
635  std::vector<std::string> cols;
636  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
637  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
638  }
639  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
640  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
641  }
642  } catch (std::exception& e) {
643  sqliteConnector_.query("ROLLBACK TRANSACTION");
644  throw;
645  }
646  sqliteConnector_.query("END TRANSACTION");
647 }
648 
651  sqliteConnector_.query("BEGIN TRANSACTION");
652  try {
655  } catch (std::exception& e) {
656  sqliteConnector_.query("ROLLBACK TRANSACTION");
657  throw;
658  }
659  sqliteConnector_.query("END TRANSACTION");
660 }
661 
664  sqliteConnector_.query("BEGIN TRANSACTION");
665  try {
667  } catch (const std::exception& e) {
668  sqliteConnector_.query("ROLLBACK TRANSACTION");
669  throw;
670  }
671  sqliteConnector_.query("END TRANSACTION");
672 }
673 
674 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
675  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
676  "omnisci_foreign_servers(id integer primary key, name text unique, " +
677  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
678  "options text)";
679 }
680 
681 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
682  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
683  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
684  "options text, last_refresh_time integer, next_refresh_time integer, " +
685  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
686  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
687 }
688 
689 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
690  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
691  "omnisci_custom_expressions(id integer primary key, name text, " +
692  "expression_json text, data_source_type text, " +
693  "data_source_id integer, is_deleted boolean)";
694 }
695 
698  sqliteConnector_.query("BEGIN TRANSACTION");
699  std::vector<DBObject> objects;
700  try {
702  "SELECT name FROM sqlite_master WHERE type='table' AND "
703  "name='mapd_record_ownership_marker'");
704  // check if mapd catalog - marker exists
705  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
706  // already done
707  sqliteConnector_.query("END TRANSACTION");
708  return;
709  }
710  // check if different catalog - marker exists
711  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
712  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
713  // Check if migration is being performed on existing non mapd catalogs
714  // Older non mapd dbs will have table but no record in them
715  if (sqliteConnector_.getNumRows() != 0) {
716  // already done
717  sqliteConnector_.query("END TRANSACTION");
718  return;
719  }
720  }
721  // marker does not exist - create one
722  else {
723  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
724  }
725 
726  DBMetadata db;
727  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
728  // place dbId as a reference for migration being performed
730  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
731  std::vector<std::string>{std::to_string(db.dbOwner)});
732 
733  static const std::map<const DBObjectType, const AccessPrivileges>
734  object_level_all_privs_lookup{
740 
741  // grant owner all permissions on DB
742  DBObjectKey key;
743  key.dbId = currentDB_.dbId;
744  auto _key_place = [&key](auto type) {
745  key.permissionType = type;
746  return key;
747  };
748  for (auto& it : object_level_all_privs_lookup) {
749  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
750  objects.back().setName(currentDB_.dbName);
751  }
752 
753  {
754  // other users tables and views
755  string tableQuery(
756  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
757  sqliteConnector_.query(tableQuery);
758  size_t numRows = sqliteConnector_.getNumRows();
759  for (size_t r = 0; r < numRows; ++r) {
760  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
761  std::string tableName = sqliteConnector_.getData<string>(r, 1);
762  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
763  bool isview = sqliteConnector_.getData<bool>(r, 3);
764 
767  DBObjectKey key;
768  key.dbId = currentDB_.dbId;
769  key.objectId = tableid;
770  key.permissionType = type;
771 
772  DBObject obj(tableName, type);
773  obj.setObjectKey(key);
774  obj.setOwner(ownerid);
777 
778  objects.push_back(obj);
779  }
780  }
781 
782  {
783  // other users dashboards
784  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
785  sqliteConnector_.query(tableQuery);
786  size_t numRows = sqliteConnector_.getNumRows();
787  for (size_t r = 0; r < numRows; ++r) {
788  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
789  std::string dashName = sqliteConnector_.getData<string>(r, 1);
790  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
791 
793  DBObjectKey key;
794  key.dbId = currentDB_.dbId;
795  key.objectId = dashId;
796  key.permissionType = type;
797 
798  DBObject obj(dashName, type);
799  obj.setObjectKey(key);
800  obj.setOwner(ownerid);
802 
803  objects.push_back(obj);
804  }
805  }
806  } catch (const std::exception& e) {
807  sqliteConnector_.query("ROLLBACK TRANSACTION");
808  throw;
809  }
810  sqliteConnector_.query("END TRANSACTION");
811 
812  // now apply the objects to the syscat to track the permissions
813  // moved outside transaction to avoid lock in sqlite
814  try {
816  } catch (const std::exception& e) {
817  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
818  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
819  e.what());
820  // will need to remove the mapd_record_ownership_marker table and retry
821  }
822 }
823 
828 }
829 
831  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
832  std::vector<std::string> dashboard_ids;
833  static const std::string migration_name{"dashboard_roles_migration"};
834  {
836  sqliteConnector_.query("BEGIN TRANSACTION");
837  try {
838  // migration_history should be present in all catalogs by now
839  // if not then would be created before this migration
841  "select * from mapd_version_history where migration_history = '" +
842  migration_name + "'");
843  if (sqliteConnector_.getNumRows() != 0) {
844  // no need for further execution
845  sqliteConnector_.query("END TRANSACTION");
846  return;
847  }
848  LOG(INFO) << "Performing dashboard internal roles Migration.";
849  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
850  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
853  sqliteConnector_.getData<string>(i, 0)))) {
854  // Successfully created roles during previous migration/crash
855  // No need to include them
856  continue;
857  }
858  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
859  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
860  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
861  }
862  } catch (const std::exception& e) {
863  sqliteConnector_.query("ROLLBACK TRANSACTION");
864  throw;
865  }
866  sqliteConnector_.query("END TRANSACTION");
867  }
868  // All current grantees with shared dashboards.
869  const auto active_grantees =
871 
872  try {
873  // NOTE(wamsi): Transactionally unsafe
874  for (auto dash : dashboards) {
875  createOrUpdateDashboardSystemRole(dash.second.second,
876  dash.second.first,
878  std::to_string(currentDB_.dbId), dash.first));
879  auto result = active_grantees.find(dash.first);
880  if (result != active_grantees.end()) {
883  dash.first)},
884  result->second);
885  }
886  }
888  // check if this has already been completed
890  "select * from mapd_version_history where migration_history = '" +
891  migration_name + "'");
892  if (sqliteConnector_.getNumRows() != 0) {
893  return;
894  }
896  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
897  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
898  } catch (const std::exception& e) {
899  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
900  << e.what();
901  throw;
902  }
903  LOG(INFO) << "Successfully created dashboard system roles during migration.";
904 }
905 
916  updatePageSize();
920  if (g_enable_fsi) {
922  }
925 }
926 
930 }
931 
932 namespace {
933 std::string getUserFromId(const int32_t id) {
934  UserMetadata user;
935  if (SysCatalog::instance().getMetadataForUserById(id, user)) {
936  return user.userName;
937  }
938  // a user could be deleted and a dashboard still exist?
939  return "Unknown";
940 }
941 } // namespace
942 
946 
947  string dictQuery(
948  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
949  sqliteConnector_.query(dictQuery);
950  size_t numRows = sqliteConnector_.getNumRows();
951  for (size_t r = 0; r < numRows; ++r) {
952  int dictId = sqliteConnector_.getData<int>(r, 0);
953  std::string dictName = sqliteConnector_.getData<string>(r, 1);
954  int dictNBits = sqliteConnector_.getData<int>(r, 2);
955  bool is_shared = sqliteConnector_.getData<bool>(r, 3);
956  int refcount = sqliteConnector_.getData<int>(r, 4);
957  std::string fname = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
958  "_DICT_" + std::to_string(dictId);
959  DictRef dict_ref(currentDB_.dbId, dictId);
960  DictDescriptor* dd = new DictDescriptor(
961  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
962  dictDescriptorMapByRef_[dict_ref].reset(dd);
963  }
964 
965  string tableQuery(
966  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
967  "max_chunk_size, frag_page_size, "
968  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
969  "sort_column_id, storage_type, max_rollback_epochs "
970  "from mapd_tables");
971  sqliteConnector_.query(tableQuery);
972  numRows = sqliteConnector_.getNumRows();
973  for (size_t r = 0; r < numRows; ++r) {
974  TableDescriptor* td;
975  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
976  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
977  const auto table_id = sqliteConnector_.getData<int>(r, 0);
978  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
979  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
980  "supported table option (table "
981  << table_name << " [" << table_id << "] in database "
982  << currentDB_.dbName << ").";
983  }
984 
985  if (storage_type == StorageType::FOREIGN_TABLE) {
987  } else {
988  td = new TableDescriptor();
989  }
990 
991  td->storageType = storage_type;
992  td->tableId = sqliteConnector_.getData<int>(r, 0);
993  td->tableName = sqliteConnector_.getData<string>(r, 1);
994  td->nColumns = sqliteConnector_.getData<int>(r, 2);
995  td->isView = sqliteConnector_.getData<bool>(r, 3);
996  td->fragments = sqliteConnector_.getData<string>(r, 4);
997  td->fragType =
999  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1000  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1001  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1002  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1003  td->partitions = sqliteConnector_.getData<string>(r, 10);
1004  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1005  td->shard = sqliteConnector_.getData<int>(r, 12);
1006  td->nShards = sqliteConnector_.getData<int>(r, 13);
1007  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1008  td->userId = sqliteConnector_.getData<int>(r, 15);
1009  td->sortedColumnId =
1010  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1011  if (!td->isView) {
1012  td->fragmenter = nullptr;
1013  }
1014  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1015  td->hasDeletedCol = false;
1016 
1018  tableDescriptorMapById_[td->tableId] = td;
1019  }
1020 
1021  if (g_enable_fsi) {
1025  }
1026 
1027  string columnQuery(
1028  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1029  "is_notnull, compression, comp_param, "
1030  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1031  "default_value from "
1032  "mapd_columns ORDER BY tableid, "
1033  "columnid");
1034  sqliteConnector_.query(columnQuery);
1035  numRows = sqliteConnector_.getNumRows();
1036  int32_t skip_physical_cols = 0;
1037  for (size_t r = 0; r < numRows; ++r) {
1038  ColumnDescriptor* cd = new ColumnDescriptor();
1039  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1040  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1041  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1045  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1049  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1050  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1051  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1052  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1053  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1054  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1055  if (sqliteConnector_.isNull(r, 16)) {
1056  cd->default_value = std::nullopt;
1057  } else {
1058  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1059  }
1060  cd->isGeoPhyCol = skip_physical_cols > 0;
1061  ColumnKey columnKey(cd->tableId, to_upper(cd->columnName));
1062  columnDescriptorMap_[columnKey] = cd;
1063  ColumnIdKey columnIdKey(cd->tableId, cd->columnId);
1064  columnDescriptorMapById_[columnIdKey] = cd;
1065 
1066  if (skip_physical_cols <= 0) {
1067  skip_physical_cols = cd->columnType.get_physical_cols();
1068  }
1069 
1070  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1071  CHECK(td_itr != tableDescriptorMapById_.end());
1072 
1073  if (cd->isDeletedCol) {
1074  td_itr->second->hasDeletedCol = true;
1075  setDeletedColumnUnlocked(td_itr->second, cd);
1076  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1077  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1078  }
1079  }
1080  // sort columnIdBySpi_ based on columnId
1081  for (auto& tit : tableDescriptorMapById_) {
1082  std::sort(tit.second->columnIdBySpi_.begin(),
1083  tit.second->columnIdBySpi_.end(),
1084  [](const size_t a, const size_t b) -> bool { return a < b; });
1085  }
1086 
1087  string viewQuery("SELECT tableid, sql FROM mapd_views");
1088  sqliteConnector_.query(viewQuery);
1089  numRows = sqliteConnector_.getNumRows();
1090  for (size_t r = 0; r < numRows; ++r) {
1091  int32_t tableId = sqliteConnector_.getData<int>(r, 0);
1092  TableDescriptor* td = tableDescriptorMapById_[tableId];
1093  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1094  td->fragmenter = nullptr;
1095  }
1096 
1097  string frontendViewQuery(
1098  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1099  "userid, "
1100  "metadata "
1101  "FROM mapd_dashboards");
1102  sqliteConnector_.query(frontendViewQuery);
1103  numRows = sqliteConnector_.getNumRows();
1104  for (size_t r = 0; r < numRows; ++r) {
1105  std::shared_ptr<DashboardDescriptor> vd = std::make_shared<DashboardDescriptor>();
1106  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1107  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1108  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1109  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1110  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1111  vd->userId = sqliteConnector_.getData<int>(r, 5);
1112  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1113  vd->user = getUserFromId(vd->userId);
1114  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1116  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1117  }
1118 
1119  string linkQuery(
1120  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1121  "update_time), view_metadata "
1122  "FROM mapd_links");
1123  sqliteConnector_.query(linkQuery);
1124  numRows = sqliteConnector_.getNumRows();
1125  for (size_t r = 0; r < numRows; ++r) {
1126  LinkDescriptor* ld = new LinkDescriptor();
1127  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1128  ld->userId = sqliteConnector_.getData<int>(r, 1);
1129  ld->link = sqliteConnector_.getData<string>(r, 2);
1130  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1131  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1132  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1134  linkDescriptorMapById_[ld->linkId] = ld;
1135  }
1136 
1137  /* rebuild map linking logical tables to corresponding physical ones */
1138  string logicalToPhysicalTableMapQuery(
1139  "SELECT logical_table_id, physical_table_id "
1140  "FROM mapd_logical_to_physical");
1141  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1142  numRows = sqliteConnector_.getNumRows();
1143  for (size_t r = 0; r < numRows; ++r) {
1144  int32_t logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1145  int32_t physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1146  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1147  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1148  /* add new entity to the map logicalToPhysicalTableMapById_ */
1149  std::vector<int32_t> physicalTables;
1150  physicalTables.push_back(physical_tb_id);
1151  const auto it_ok =
1152  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1153  CHECK(it_ok.second);
1154  } else {
1155  /* update map logicalToPhysicalTableMapById_ */
1156  physicalTableIt->second.push_back(physical_tb_id);
1157  }
1158  }
1159 
1161 }
1162 
1165  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1166  "is_deleted "
1167  "FROM omnisci_custom_expressions");
  // NOTE(review): the signature and the start of the statement that receives the SQL
  // text above are not present in this listing.
  // Walk the rows currently held by sqliteConnector_ and index each custom expression
  // by its id. The column order selected above is the order that
  // getCustomExpressionFromConnector() reads positionally.
1168  auto num_rows = sqliteConnector_.getNumRows();
1169  for (size_t row = 0; row < num_rows; row++) {
1170  auto custom_expr = getCustomExpressionFromConnector(row);
  // std::move is only a cast here; the id is read through the still-valid pointer
  // before ownership is transferred inside the unique_ptr assignment.
1171  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1172  }
1173 }
1174 
1175 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1176  auto id = sqliteConnector_.getData<int>(row, 0);
1177  auto name = sqliteConnector_.getData<string>(row, 1);
1178  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1179  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1180  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1181  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1182  return std::make_unique<CustomExpression>(
1183  id,
1184  name,
1185  expression_json,
1186  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1187  data_source_id,
1188  is_deleted);
1189 }
1190 
1192  const list<ColumnDescriptor>& columns,
1193  const list<DictDescriptor>& dicts) {
  // NOTE(review): the first line of this signature (which declares `td`) is not present
  // in this listing.
  // Registers heap-allocated copies of the table descriptor, its column descriptors and
  // its dictionary descriptors in the catalog's in-memory maps, under the write lock.
1194  cat_write_lock write_lock(this);
1195  TableDescriptor* new_td;
1196 
  // Copy into the matching dynamic type so a ForeignTable is not sliced down to a plain
  // TableDescriptor.
1197  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1198  if (foreign_table) {
1199  auto new_foreign_table = new foreign_storage::ForeignTable();
1200  *new_foreign_table = *foreign_table;
1201  new_td = new_foreign_table;
1202  } else {
1203  new_td = new TableDescriptor();
1204  *new_td = *td;
1205  }
1206 
  // Give the copy its own mutex instead of the shared_ptr value copied from `td`.
1207  new_td->mutex_ = std::make_shared<std::mutex>();
  // The name map is keyed by the upper-cased table name; the id map by table id.
1208  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1209  tableDescriptorMapById_[td->tableId] = new_td;
  // Each column is copied twice: once into the loop variable, once into new_cd. The
  // same new_cd pointer is stored in both column maps.
1210  for (auto cd : columns) {
1211  ColumnDescriptor* new_cd = new ColumnDescriptor();
1212  *new_cd = cd;
1213  ColumnKey columnKey(new_cd->tableId, to_upper(new_cd->columnName));
1214  columnDescriptorMap_[columnKey] = new_cd;
1215  ColumnIdKey columnIdKey(new_cd->tableId, new_cd->columnId);
1216  columnDescriptorMapById_[columnIdKey] = new_cd;
1217 
1218  // Add deleted column to the map
1219  if (cd.isDeletedCol) {
1220  CHECK(new_td->hasDeletedCol);
1221  setDeletedColumnUnlocked(new_td, new_cd);
1222  }
1223  }
1224 
  // Keep columnIdBySpi_ in ascending order.
1225  std::sort(new_td->columnIdBySpi_.begin(),
1226  new_td->columnIdBySpi_.end(),
1227  [](const size_t a, const size_t b) -> bool { return a < b; });
1228 
  // A StringDictionaryClient is only created when string dictionary hosts are
  // configured; otherwise `client` stays null and the create() call below is skipped.
1229  std::unique_ptr<StringDictionaryClient> client;
1230  DictRef dict_ref(currentDB_.dbId, -1);
1231  if (!string_dict_hosts_.empty()) {
1232  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1233  }
1234  for (auto dd : dicts) {
1235  if (!dd.dictRef.dictId) {
1236  // Dummy entry created for a shard of a logical table, nothing to do.
1237  continue;
1238  }
  // dict_ref keeps the current database id; only the dictionary id changes per entry.
1239  dict_ref.dictId = dd.dictRef.dictId;
1240  if (client) {
1241  client->create(dict_ref, dd.dictIsTemp);
1242  }
1243  DictDescriptor* new_dd = new DictDescriptor(dd);
1244  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
  // Only non-temporary dictionaries get a directory created for them.
1245  if (!dd.dictIsTemp) {
1246  boost::filesystem::create_directory(new_dd->dictFolderPath);
1247  }
1248  }
1249 }
1250 
// Removes a table from the in-memory catalog maps and frees its descriptors.
//
// Under the catalog write lock this erases the table from both table maps, drops its
// deleted-column entry (if any), deletes every ColumnDescriptor that belongs to the
// table, and decrements the refcount of each dictionary referenced by a dict-encoded
// column, dropping dictionaries whose refcount reaches zero.
//
// tableName   - name used for the name-keyed map (upper-cased) and the error message
// tableId     - id used for the id-keyed map and for matching column descriptors
// is_on_error - when true, a missing dictionary entry is tolerated instead of CHECKed
//
// Throws runtime_error if no table with tableId is registered.
1251 void Catalog::removeTableFromMap(const string& tableName,
1252  const int tableId,
1253  const bool is_on_error) {
1254  cat_write_lock write_lock(this);
1255  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1256  if (tableDescIt == tableDescriptorMapById_.end()) {
1257  throw runtime_error("Table " + tableName + " does not exist.");
1258  }
1259 
1260  TableDescriptor* td = tableDescIt->second;
1261 
  // A table flagged with hasDeletedCol must have exactly one entry in
  // deletedColumnPerTable_.
1262  if (td->hasDeletedCol) {
1263  const auto ret = deletedColumnPerTable_.erase(td);
1264  CHECK_EQ(ret, size_t(1));
1265  }
1266 
1267  tableDescriptorMapById_.erase(tableDescIt);
1268  tableDescriptorMap_.erase(to_upper(tableName));
1269  td->fragmenter = nullptr;
1270 
  // NOTE(review): listing line 1271 is not present here; `isTemp`, used further down,
  // is presumably declared on it (it must be computed before `td` is deleted).
1272  delete td;
1273 
  // A dictionary client is only created when this instance is an aggregator.
1274  std::unique_ptr<StringDictionaryClient> client;
1275  if (SysCatalog::instance().isAggregator()) {
1276  CHECK(!string_dict_hosts_.empty());
1277  DictRef dict_ref(currentDB_.dbId, -1);
1278  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1279  }
1280 
1281  // delete all column descriptors for the table
1282  // no more link columnIds to sequential indexes!
1283  for (auto cit = columnDescriptorMapById_.begin();
1284  cit != columnDescriptorMapById_.end();) {
1285  if (tableId != std::get<0>(cit->first)) {
1286  ++cit;
1287  } else {
  // Advance the loop iterator before erasing so it never refers to the removed
  // element; the entry is then looked up again by key.
1288  int i = std::get<1>(cit++->first);
1289  ColumnIdKey cidKey(tableId, i);
1290  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1291  ColumnDescriptor* cd = colDescIt->second;
1292  columnDescriptorMapById_.erase(colDescIt);
1293  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1294  columnDescriptorMap_.erase(cnameKey);
1295  const int dictId = cd->columnType.get_comp_param();
1296  // Dummy dictionaries created for a shard of a logical table have the id set to
1297  // zero.
1298  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1299  INJECT_TIMER(removingDicts);
1300  DictRef dict_ref(currentDB_.dbId, dictId);
1301  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1302  // If we're removing this table due to an error, it is possible that the string
1303  // dictionary reference was never populated. Don't crash, just continue cleaning
1304  // up the TableDescriptor and ColumnDescriptors
1305  if (!is_on_error) {
1306  CHECK(dictIt != dictDescriptorMapByRef_.end());
1307  } else {
1308  if (dictIt == dictDescriptorMapByRef_.end()) {
  // NOTE(review): this `continue` skips the `delete cd` at the end of the branch, so
  // the ColumnDescriptor (already removed from both maps) appears to be leaked here.
1309  continue;
1310  }
1311  }
1312  const auto& dd = dictIt->second;
1313  CHECK_GE(dd->refcount, 1);
1314  --dd->refcount;
  // Last reference gone: release the in-memory dictionary, rename its folder for
  // deletion unless the table was temporary, notify the client, and drop the map entry.
1315  if (!dd->refcount) {
1316  dd->stringDict.reset();
1317  if (!isTemp) {
1318  File_Namespace::renameForDelete(dd->dictFolderPath);
1319  }
1320  if (client) {
1321  client->drop(dict_ref);
1322  }
1323  dictDescriptorMapByRef_.erase(dictIt);
1324  }
1325  }
1326 
1327  delete cd;
1328  }
1329  }
1330 }
1331 
  // NOTE(review): the signature and the statement on listing line 1334 are not present
  // in this listing; only the lock acquisition is visible.
  // Holds the catalog write lock until the end of the function.
1333  cat_write_lock write_lock(this);
1335 }
1336 
  // NOTE(review): the signature and the left-hand side of the assignment (listing line
  // 1339) are not present in this listing.
1338  cat_write_lock write_lock(this);
  // Copy-constructs a new shared DashboardDescriptor from `vd`.
1340  std::make_shared<DashboardDescriptor>(vd);
1341 }
1342 
// Builds the list of DBObjects (tables and views in the current database) that a
// dashboard's metadata refers to.
//
// view_meta - dashboard metadata passed to parse_underlying_dashboard_objects()
// user_id   - id passed through to each DBObject that is constructed
//
// Returns one DBObject per referenced source that still exists in this catalog; sources
// that cannot be found are logged at INFO level and skipped.
1343 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1344  const int& user_id) {
1345  std::vector<DBObject> objects;
1346  DBObjectKey key;
1347  key.dbId = currentDB_.dbId;
  // Helper that reuses `key` (dbId already set), overwrites its type and object id, and
  // returns a copy of it.
1348  auto _key_place = [&key](auto type, auto id) {
1349  key.permissionType = type;
1350  key.objectId = id;
1351  return key;
1352  };
1353  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1354  auto td = getMetadataForTable(object_name);
1355  if (!td) {
1356  // Parsed object source is not present in current database
1357  // LOG the info and ignore
1358  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1359  << " no longer exists in current DB.";
1360  continue;
1361  }
1362  // Dashboard source can be Table or View
1363  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
  // NOTE(review): the second arm of this conditional (listing line 1365) is not present
  // in this listing.
1364  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1366  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1367  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1368  objects.back().setName(td->tableName);
1369  }
1370  return objects;
1371 }
1372 
// Creates the dashboard system role if it does not exist yet, or brings an existing
// role's table/view privileges in line with the sources the dashboard uses now.
//
// view_meta      - dashboard metadata from which the source objects are parsed
// user_id        - id forwarded to parseDashboardObjects()
// dash_role_name - name of the dashboard system role
1373 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1374  const int32_t& user_id,
1375  const std::string& dash_role_name) {
1376  auto objects = parseDashboardObjects(view_meta, user_id);
1377  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1378  if (!rl) {
1379  // Dashboard role does not exist
1380  // create role and grant privileges
1381  // NOTE(wamsi): Transactionally unsafe
1382  SysCatalog::instance().createRole(dash_role_name, false);
1383  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1384  } else {
1385  // Dashboard system role already exists
1386  // Add/remove privileges on objects
1387  std::set<DBObjectKey> revoke_keys;
1388  auto ex_objects = rl->getDbObjects(true);
  // Collect the keys of table/view objects the role currently holds that are no longer
  // among the dashboard's parsed sources; other object types are left alone.
1389  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1390  if (key.permissionType != TableDBObjectType &&
1391  key.permissionType != ViewDBObjectType) {
1392  continue;
1393  }
1394  bool found = false;
1395  for (auto obj : objects) {
1396  found = key == obj.getObjectKey() ? true : false;
1397  if (found) {
1398  break;
1399  }
1400  }
1401  if (!found) {
1402  revoke_keys.insert(key);
1403  }
1404  }
1405  for (auto& key : revoke_keys) {
1406  // revoke privs on object since the object is no
1407  // longer used by the dashboard as source
1408  // NOTE(wamsi): Transactionally unsafe
  // NOTE(review): the start of the call that receives these arguments (listing line
  // 1409) is not present in this listing.
1410  dash_role_name, *rl->findDbObject(key, true), *this);
1411  }
1412  // Update privileges on remaining objects
1413  // NOTE(wamsi): Transactionally unsafe
1414  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1415  }
1416 }
1417 
  // NOTE(review): the signature and the statement on listing line 1422 are not present
  // in this listing.
  // Under the write lock, stores a heap-allocated copy of `ld` in the id-keyed link map.
1419  cat_write_lock write_lock(this);
1420  LinkDescriptor* new_ld = new LinkDescriptor();
1421  *new_ld = ld;
  // The map holds the raw pointer, keyed by the link id.
1423  linkDescriptorMapById_[ld.linkId] = new_ld;
1424 }
1425 
  // NOTE(review): the signature and listing line 1430 are not present in this listing.
  // Builds the fragmenter for `td` inside a timed lambda and logs how long it took.
1427  auto time_ms = measure<>::execution([&]() {
1428  // instantiate table fragmenter upon first use
1429  // assume only insert order fragmenter is supported
1431  vector<Chunk> chunkVec;
1432  list<const ColumnDescriptor*> columnDescs;
  // Collect the table's column descriptors (flags passed: system = true,
  // virtual = false, physical = true) and translate them into chunks.
1433  getAllColumnMetadataForTableImpl(td, columnDescs, true, false, true);
1434  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1435  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
  // A positive sortedColumnId selects SortedOrderFragmenter; anything else uses
  // InsertOrderFragmenter.
1436  if (td->sortedColumnId > 0) {
1437  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1438  chunkVec,
1439  dataMgr_.get(),
1440  const_cast<Catalog*>(this),
1441  td->tableId,
1442  td->shard,
1443  td->maxFragRows,
1444  td->maxChunkSize,
1445  td->fragPageSize,
1446  td->maxRows,
1447  td->persistenceLevel);
1448  } else {
1449  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1450  chunkVec,
1451  dataMgr_.get(),
1452  const_cast<Catalog*>(this),
1453  td->tableId,
1454  td->shard,
1455  td->maxFragRows,
1456  td->maxChunkSize,
1457  td->fragPageSize,
1458  td->maxRows,
1459  td->persistenceLevel,
  // The final argument is true exactly when the table has a non-empty storageType.
1460  !td->storageType.empty());
1461  }
1462  });
1463  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1464  << time_ms << "ms";
1465 }
1466 
1468  const std::string& tableName) const {
  // Looks the table up by upper-cased name without taking a catalog lock in this
  // function. Returns nullptr when the name is unknown or when the stored descriptor is
  // not a foreign_storage::ForeignTable (the dynamic_cast fails).
1469  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1470  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1471  return nullptr;
1472  }
1473  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1474 };
1475 
1477  const std::string& tableName) const {
  // Locked wrapper: takes the catalog read lock, then delegates to
  // getForeignTableUnlocked().
1478  cat_read_lock read_lock(this);
1479  return getForeignTableUnlocked(tableName);
1480 };
1481 
// Looks up a table descriptor by name (compared upper-cased) under the catalog read
// lock.
//
// tableName          - table name
// populateFragmenter - when true, a missing fragmenter on a non-view table is handled
//                      inside the guarded branch below
//
// Returns the descriptor, or nullptr if no table with that name is registered.
1482 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1483  const bool populateFragmenter) const {
1484  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1485  // pure metadata calls
1486  cat_read_lock read_lock(this);
1487  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1488  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1489  return nullptr;
1490  }
1491  TableDescriptor* td = tableDescIt->second;
  // The per-table mutex is taken unconditionally here, even when populateFragmenter is
  // false.
1492  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1493  if (populateFragmenter && td->fragmenter == nullptr && !td->isView) {
  // NOTE(review): the body of this branch (listing line 1494) is not present in this
  // listing.
1495  }
1496  return td; // returns pointer to table descriptor
1497 }
1498 
1500  int tableId,
1501  const bool populateFragmenter) const {
  // Id-based lookup that takes no catalog lock in this function. Returns nullptr if the
  // id is unknown.
1502  auto tableDescIt = tableDescriptorMapById_.find(tableId);
1503  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1504  return nullptr;
1505  }
1506  TableDescriptor* td = tableDescIt->second;
  // The per-table mutex is only taken when fragmenter population was requested.
1507  if (populateFragmenter) {
1508  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1509  if (td->fragmenter == nullptr && !td->isView) {
  // NOTE(review): the body of this branch (listing line 1510) is not present in this
  // listing.
1511  }
1512  }
1513  return td; // returns pointer to table descriptor
1514 }
1515 
1517  bool populateFragmenter) const {
  // Locked wrapper: takes the catalog read lock, then delegates to
  // getMetadataForTableImpl().
1518  cat_read_lock read_lock(this);
1519  return getMetadataForTableImpl(tableId, populateFragmenter);
1520 }
1521 
1523  const bool load_dict) const {
  // Locked wrapper: takes the catalog read lock, then delegates to
  // getMetadataForDictUnlocked().
1524  cat_read_lock read_lock(this);
1525  return getMetadataForDictUnlocked(dict_id, load_dict);
1526 }
1527 
  // NOTE(review): the signature is not present in this listing.
  // Id-based lookup in tableDescriptorMapById_ with no lock taken in this function;
  // returns the stored descriptor pointer, or nullptr if the id is unknown.
1529  auto tableDescIt = tableDescriptorMapById_.find(tableId);
1530  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1531  return nullptr;
1532  }
1533  return tableDescIt->second;
1534 }
1535 
1537  const bool loadDict) const {
  // Finds the dictionary descriptor for (current database id, dictId) without taking a
  // catalog lock in this function. Returns nullptr if it is not registered. When
  // loadDict is true and no StringDictionary is attached yet, one is created first.
1538  const DictRef dictRef(currentDB_.dbId, dictId);
1539  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1540  if (dictDescIt ==
1541  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1542  return nullptr;
1543  }
1544  auto& dd = dictDescIt->second;
1545 
1546  if (loadDict) {
  // The per-dictionary mutex guards both the null check and the creation below.
1547  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1548  if (!dd->stringDict) {
1549  auto time_ms = measure<>::execution([&]() {
  // With no string dictionary hosts configured the dictionary is built from
  // dictFolderPath; the two branches differ only in the second constructor argument,
  // which mirrors dictIsTemp. Otherwise it is built from the first host and the dictRef.
1550  if (string_dict_hosts_.empty()) {
1551  if (dd->dictIsTemp) {
1552  dd->stringDict = std::make_shared<StringDictionary>(
1553  dd->dictFolderPath, true, true, g_cache_string_hash);
1554  } else {
1555  dd->stringDict = std::make_shared<StringDictionary>(
1556  dd->dictFolderPath, false, true, g_cache_string_hash);
1557  }
1558  } else {
1559  dd->stringDict =
1560  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1561  }
1562  });
1563  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1564  << dd->dictRef.dictId << " was " << time_ms << "ms";
1565  }
1566  }
1567 
  // The map keeps ownership; callers receive a non-owning pointer.
1568  return dd.get();
1569 }
1570 
1571 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1572  return string_dict_hosts_;
1573 }
1574 
1576  const string& columnName) const {
  // Name-based column lookup under the catalog read lock. The key is (tableId,
  // upper-cased column name). Returns nullptr if no such column is registered.
1577  cat_read_lock read_lock(this);
1578 
1579  ColumnKey columnKey(tableId, to_upper(columnName));
1580  auto colDescIt = columnDescriptorMap_.find(columnKey);
1581  if (colDescIt ==
1582  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1583  return nullptr;
1584  }
1585  return colDescIt->second;
1586 }
1587 
1588 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1589  cat_read_lock read_lock(this);
1590  return getMetadataForColumnUnlocked(table_id, column_id);
1591 }
1592 
1594  int columnId) const {
  // Id-based column lookup keyed by (tableId, columnId); no lock is taken in this
  // function. Returns nullptr if no such column is registered.
1595  ColumnIdKey columnIdKey(tableId, columnId);
1596  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1597  if (colDescIt == columnDescriptorMapById_
1598  .end()) { // need to check to make sure column exists for table
1599  return nullptr;
1600  }
1601  return colDescIt->second;
1602 }
1603 
1604 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1605  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1606  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1607  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1608 
1609  auto spx = spi;
1610  int phi = 0;
1611  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1612  {
1613  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1614  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1615  }
1616 
1617  CHECK(0 < spx && spx <= columnIdBySpi.size());
1618  return columnIdBySpi[spx - 1] + phi;
1619 }
1620 
1621 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
1622  cat_read_lock read_lock(this);
1623  return getColumnIdBySpiUnlocked(table_id, spi);
1624 }
1625 
1627  const size_t spi) const {
  // Under the catalog read lock, translates `spi` into a column id with
  // getColumnIdBySpiUnlocked() and then looks that column up by (tableId, columnId).
  // Returns nullptr if the id-keyed map has no such column.
1628  cat_read_lock read_lock(this);
1629 
1630  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
1631  ColumnIdKey columnIdKey(tableId, columnId);
1632  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1633  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
1634 }
1635 
// Deletes a batch of dashboards from the in-memory map and from the mapd_dashboards
// table.
//
// The whole batch is validated first: every id must exist and `user` must hold
// DELETE_DASHBOARD on it. If any id fails either check, a std::runtime_error listing
// all offending ids is thrown and nothing is deleted.
//
// dashboard_ids - ids of the dashboards to delete (taken by value)
// user          - user on whose behalf the privilege check is made
1636 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
1637  const UserMetadata& user) {
1638  std::stringstream invalid_ids, restricted_ids;
1639 
  // Validation pass: collect unknown ids and ids the user may not delete as
  // comma-separated lists.
1640  for (int32_t dashboard_id : dashboard_ids) {
1641  if (!getMetadataForDashboard(dashboard_id)) {
1642  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
1643  continue;
1644  }
1645  DBObject object(dashboard_id, DashboardDBObjectType);
1646  object.loadKey(*this);
1647  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
1648  std::vector<DBObject> privs = {object};
1649  if (!SysCatalog::instance().checkPrivileges(user, privs))
1650  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
1651  }
1652 
1653  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
1654  std::stringstream error_message;
1655  error_message << "Delete dashboard(s) failed with error(s):";
1656  if (invalid_ids.str().size() > 0)
1657  error_message << "\nDashboard id: " << invalid_ids.str()
1658  << " - Dashboard id does not exist";
1659  if (restricted_ids.str().size() > 0)
1660  error_message
1661  << "\nDashboard id: " << restricted_ids.str()
1662  << " - User should be either owner of dashboard or super user to delete it";
1663  throw std::runtime_error(error_message.str());
1664  }
1665  std::vector<DBObject> dash_objs;
1666 
1667  for (int32_t dashboard_id : dashboard_ids) {
1668  dash_objs.push_back(DBObject(dashboard_id, DashboardDBObjectType));
1669  }
1670  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
  // NOTE(review): listing lines 1671 and 1674 are not present in this listing; the
  // comment inside the loop below suggests 1671 revokes privileges on dash_objs.
1672  {
1673  cat_write_lock write_lock(this);
1675 
1676  sqliteConnector_.query("BEGIN TRANSACTION");
1677  try {
1678  for (int32_t dashboard_id : dashboard_ids) {
1679  auto dash = getMetadataForDashboard(dashboard_id);
1680  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
1681  // rollback if already deleted
1682  if (!dash) {
1683  throw std::runtime_error(
1684  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
1685  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
1686  }
  // Copy the key parts out of the descriptor before its map entry is erased.
1687  std::string user_id = std::to_string(dash->userId);
1688  std::string dash_name = dash->dashboardName;
1689  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
  // NOTE(review): the in-memory entry is erased before the SQL delete runs, and the
  // ROLLBACK in the catch block does not put erased entries back into the map.
1690  dashboardDescriptorMap_.erase(viewDescIt);
  // NOTE(review): the start of the call that receives these arguments (listing line
  // 1691) is not present in this listing.
1692  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
1693  std::vector<std::string>{dash_name, user_id});
1694  }
1695  } catch (std::exception& e) {
1696  sqliteConnector_.query("ROLLBACK TRANSACTION");
1697  throw;
1698  }
1699  sqliteConnector_.query("END TRANSACTION");
1700  }
1701 }
1702 
1704  const string& userId,
1705  const string& dashName) const {
  // Looks a dashboard up under the catalog read lock using the composite key
  // "<userId>:<dashName>". Returns a non-owning pointer, or nullptr if not found.
1706  cat_read_lock read_lock(this);
1707 
1708  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
1709  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
1710  return nullptr;
1711  }
1712  return viewDescIt->second.get(); // returns pointer to view descriptor
1713 }
1714 
  // NOTE(review): the signature is not present in this listing; `id` is the dashboard
  // id being searched for.
  // Scans every entry of dashboardDescriptorMap_ for a matching dashboardId, then
  // repeats the lookup through the (userId, name) overload. Returns nullptr if no
  // dashboard has that id.
1716  cat_read_lock read_lock(this);
1717  std::string userId;
1718  std::string name;
1719  bool found{false};
1720  {
  // `descp` is a by-value copy of each map entry.
1721  for (auto descp : dashboardDescriptorMap_) {
1722  auto dash = descp.second.get();
1723  if (dash->dashboardId == id) {
1724  userId = std::to_string(dash->userId);
1725  name = dash->dashboardName;
1726  found = true;
1727  break;
1728  }
1729  }
1730  }
1731  if (found) {
1732  return getMetadataForDashboard(userId, name);
1733  }
1734  return nullptr;
1735 }
1736 
1737 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
1738  cat_read_lock read_lock(this);
1739  auto linkDescIt = linkDescriptorMap_.find(link);
1740  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
1741  return nullptr;
1742  }
1743  return linkDescIt->second; // returns pointer to view descriptor
1744 }
1745 
  // NOTE(review): the signature is not present in this listing.
  // Looks a link descriptor up by numeric link id under the catalog read lock; returns
  // nullptr if the id is not present in linkDescriptorMapById_.
1747  cat_read_lock read_lock(this);
1748  auto linkDescIt = linkDescriptorMapById_.find(linkId);
1749  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
1750  return nullptr;
1751  }
1752  return linkDescIt->second;
1753 }
1754 
1756  int table_id) const {
  // Id-based foreign table lookup that takes no catalog lock in this function and does
  // not populate the fragmenter (second argument is false). Unlike the name-based
  // variant it CHECK-fails, rather than returning nullptr, when the table is missing or
  // is not a ForeignTable.
1757  auto table = getMetadataForTableImpl(table_id, false);
1758  CHECK(table);
1759  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
1760  CHECK(foreign_table);
1761  return foreign_table;
1762 }
1763 
  // NOTE(review): the signature is not present in this listing.
  // Locked wrapper: takes the catalog read lock, then delegates to
  // getForeignTableUnlocked().
1765  cat_read_lock read_lock(this);
1766  return getForeignTableUnlocked(table_id);
1767 }
1768 
1770  const TableDescriptor* td,
1771  list<const ColumnDescriptor*>& columnDescriptors,
1772  const bool fetchSystemColumns,
1773  const bool fetchVirtualColumns,
1774  const bool fetchPhysicalColumns) const {
  // Appends to `columnDescriptors` the descriptors of td's columns, in the iteration
  // order of columnDescriptorMapById_, filtered by the three fetch flags.
  // skip_physical_cols counts how many upcoming map entries to skip after a column that
  // reports physical columns; it is only used when fetchPhysicalColumns is false.
1775  int32_t skip_physical_cols = 0;
1776  for (const auto& columnDescriptor : columnDescriptorMapById_) {
  // This skip is evaluated before the table-id filter, so the countdown consumes map
  // entries regardless of which table they belong to.
  // NOTE(review): presumably relies on a column's physical columns directly following
  // it in map order — confirm.
1777  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
1778  --skip_physical_cols;
1779  continue;
1780  }
1781  auto cd = columnDescriptor.second;
1782  if (cd->tableId != td->tableId) {
1783  continue;
1784  }
1785  if (!fetchSystemColumns && cd->isSystemCol) {
1786  continue;
1787  }
1788  if (!fetchVirtualColumns && cd->isVirtualCol) {
1789  continue;
1790  }
1791  if (!fetchPhysicalColumns) {
1792  const auto& col_ti = cd->columnType;
1793  skip_physical_cols = col_ti.get_physical_cols();
1794  }
1795  columnDescriptors.push_back(cd);
1796  }
1797 }
1798 
// Returns the column descriptors of a table, filtered by the three fetch flags, while
// holding the catalog read lock.
1799 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
1800  const int tableId,
1801  const bool fetchSystemColumns,
1802  const bool fetchVirtualColumns,
1803  const bool fetchPhysicalColumns) const {
1804  cat_read_lock read_lock(this);
  // NOTE(review): the start of the call that receives these arguments (listing line
  // 1805) is not present in this listing.
1806  tableId, fetchSystemColumns, fetchVirtualColumns, fetchPhysicalColumns);
1807 }
1808 
// Variant of getAllColumnMetadataForTable that takes no catalog lock in this function.
// Resolves the table by id without populating its fragmenter and returns the collected
// column descriptors.
1809 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTableUnlocked(
1810  const int tableId,
1811  const bool fetchSystemColumns,
1812  const bool fetchVirtualColumns,
1813  const bool fetchPhysicalColumns) const {
1814  std::list<const ColumnDescriptor*> columnDescriptors;
  // NOTE(review): getMetadataForTableImpl() returns nullptr for an unknown id, and no
  // null check on `td` is visible before it is used below.
1815  const TableDescriptor* td =
1816  getMetadataForTableImpl(tableId, false); // dont instantiate fragmenter
  // NOTE(review): the start of the call that receives these arguments (listing line
  // 1817) is not present in this listing.
1818  columnDescriptors,
1819  fetchSystemColumns,
1820  fetchVirtualColumns,
1821  fetchPhysicalColumns);
1822  return columnDescriptors;
1823 }
1824 
1825 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
1826  cat_read_lock read_lock(this);
1827  list<const TableDescriptor*> table_list;
1828  for (auto p : tableDescriptorMapById_) {
1829  table_list.push_back(p.second);
1830  }
1831  return table_list;
1832 }
1833 
1834 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
1835  list<const DashboardDescriptor*> view_list;
1836  for (auto p : dashboardDescriptorMap_) {
1837  view_list.push_back(p.second.get());
1838  }
1839  return view_list;
1840 }
1841 
  // NOTE(review): the signature is not present in this listing; `cd` is the column the
  // dictionary is being created for.
  // Creates a dictionary for `cd` via setColumnDictionary(), registers a heap copy of
  // its descriptor in dictDescriptorMapByRef_, and returns the new dictionary's DictRef.
  // No catalog lock is taken in the visible body.
  // operator[] would insert a null entry for an unknown tableId; the table is assumed
  // to be registered already.
1843  const auto& td = *tableDescriptorMapById_[cd.tableId];
1844  list<DictDescriptor> dds;
1845  setColumnDictionary(cd, dds, td, true);
  // The descriptor produced by the call above is taken from the back of the list and
  // must carry a non-zero dictionary id.
1846  auto& dd = dds.back();
1847  CHECK(dd.dictRef.dictId);
1848 
  // The client is only created, and create() only called, when string dictionary hosts
  // are configured.
1849  std::unique_ptr<StringDictionaryClient> client;
1850  if (!string_dict_hosts_.empty()) {
1851  client.reset(new StringDictionaryClient(
1852  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
1853  }
1854  if (client) {
1855  client->create(dd.dictRef, dd.dictIsTemp);
1856  }
1857 
1858  DictDescriptor* new_dd = new DictDescriptor(dd);
1859  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
  // Only non-temporary dictionaries get a directory created for them.
1860  if (!dd.dictIsTemp) {
1861  boost::filesystem::create_directory(new_dd->dictFolderPath);
1862  }
1863  return dd.dictRef;
1864 }
1865 
  // NOTE(review): the signature and listing lines 1868, 1880 and 1883 are not present
  // in this listing.
  // Under the write lock, releases one reference to the dictionary of a dict-encoded
  // string (or string array) column. When the stored refcount is no longer positive,
  // the dictionary's row, folder and in-memory entry are removed.
1867  cat_write_lock write_lock(this);
  // Nothing to do for columns that are not dict-encoded strings or string arrays.
1869  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
1870  return;
1871  }
1872  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
1873  return;
1874  }
1875  const auto dictId = cd.columnType.get_comp_param();
1876  CHECK_GT(dictId, 0);
1877  // decrement and zero check dict ref count
1878  const auto td = getMetadataForTable(cd.tableId);
1879  CHECK(td);
  // NOTE(review): the call that receives the UPDATE statement below (listing line 1880)
  // is not present in this listing.
1881  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
1882  std::to_string(dictId));
  // NOTE(review): likewise for the SELECT below (listing line 1883).
1884  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
1885  const auto refcount = sqliteConnector_.getData<int>(0, 0);
  // NOTE(review): the log text below has no space between the id and "from".
1886  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
1887  << refcount;
  // Still referenced elsewhere: keep the dictionary.
  // NOTE(review): this path returns without touching the in-memory descriptor held in
  // dictDescriptorMapByRef_ — confirm that is intended.
1888  if (refcount > 0) {
1889  return;
1890  }
1891  const DictRef dictRef(currentDB_.dbId, dictId);
1892  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
1893  std::to_string(dictId));
  // The folder path here is built the same way as when the dictionary descriptors are
  // loaded from SQLite.
1894  File_Namespace::renameForDelete(g_base_path + "/mapd_data/DB_" +
1895  std::to_string(currentDB_.dbId) + "_DICT_" +
1896  std::to_string(dictId));
1897 
1898  std::unique_ptr<StringDictionaryClient> client;
1899  if (!string_dict_hosts_.empty()) {
1900  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
1901  }
1902  if (client) {
1903  client->drop(dictRef);
1904  }
1905 
1906  dictDescriptorMapByRef_.erase(dictRef);
1907 }
1908 
// NOTE(review): the first line of the signature (listing line 1909) is missing from this
// excerpt; only the trailing `stringDicts` parameter is visible. The body also uses a
// column descriptor named `cd`. Function name presumably Catalog::getDictionary --
// confirm against the real source.
//
// Looks up the committed descriptor of the column identified by (cd.tableId,
// upper-cased cd.columnName) and, if that column is a dictionary-encoded string or
// string array with a positive comp_param, stores a raw pointer to its StringDictionary
// in `stringDicts`, keyed by the committed column id. Other columns leave `stringDicts`
// untouched. The pointer is non-owning; it comes from get() on the descriptor's
// stringDict member.
1910  std::map<int, StringDictionary*>& stringDicts) {
1911  // learn 'committed' ColumnDescriptor of this column
1912  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
1913  CHECK(cit != columnDescriptorMap_.end());
1914  auto& ccd = *cit->second;
1915 
1916  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
1917  return;
1918  }
1919  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
1920  return;
1921  }
1922  if (!(ccd.columnType.get_comp_param() > 0)) {
1923  return;
1924  }
1925 
1926  auto dictId = ccd.columnType.get_comp_param();
// Return value is discarded; presumably called so the dictionary descriptor is present
// in dictDescriptorMapByRef_ before the lookup below -- TODO confirm.
1927  getMetadataForDict(dictId);
1928 
1929  const DictRef dictRef(currentDB_.dbId, dictId);
1930  auto dit = dictDescriptorMapByRef_.find(dictRef);
1931  CHECK(dit != dictDescriptorMapByRef_.end());
1932  CHECK(dit->second);
1933  CHECK(dit->second.get()->stringDict);
1934  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
1935 }
1936 
// NOTE(review): this excerpt is missing several listing lines: the signature (1937), the
// condition guarding addDictionary() (1947), the heads of the three SQLite calls (1956,
// 1985, 1989), most of the bound INSERT values (1969-1976, 1978-1979, 1981) and the
// statements on 1996-1997. The recursive call below shows the function is
// addColumn(table descriptor, column descriptor); the comments here describe only what
// is visible.
//
// Adds column `cd` to table `td`:
//  - for a logical sharded table (nShards > 0 and shard < 0) it first recurses into
//    every physical shard table with a copy of the descriptor;
//  - inserts a row into mapd_columns, letting SQLite compute the column id as
//    max(columnid) + 1 for the table, and increments ncolumns in mapd_tables;
//  - reads the assigned column id back into cd.columnId;
//  - bumps the in-memory column count and queues a heap copy of the descriptor in
//    columnDescriptorsForRoll as a (nullptr, new) pair.
1938  cat_write_lock write_lock(this);
1939  // caller must handle sqlite/chunk transaction TOGETHER
1940  cd.tableId = td.tableId;
1941  if (td.nShards > 0 && td.shard < 0) {
1942  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
1943  auto shard_cd = cd;
1944  addColumn(*shard, shard_cd);
1945  }
1946  }
1948  addDictionary(cd);
1949  }
1950 
1951  using BindType = SqliteConnector::BindType;
1952  std::vector<BindType> types(17, BindType::TEXT);
// The last bind slot (default_value) is bound as NULL_TYPE instead of TEXT when the
// column has no default value.
1953  if (!cd.default_value.has_value()) {
1954  types[16] = BindType::NULL_TYPE;
1955  }
1957  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
1958  "colscale, is_notnull, "
1959  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
1960  "is_deletedcol, default_value) "
1961  "VALUES (?, "
1962  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
1963  "?, ?, ?, "
1964  "?, "
1965  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1966  std::vector<std::string>{std::to_string(td.tableId),
1967  std::to_string(td.tableId),
1968  cd.columnName,
1977  "",
1980  cd.virtualExpr,
1982  cd.default_value.value_or("NULL")},
1983  types);
1984 
1986  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
1987  std::vector<std::string>{std::to_string(td.tableId)});
1988 
1990  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
1991  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
1992  cd.columnId = sqliteConnector_.getData<int>(0, 0);
1993 
1994  ++tableDescriptorMapById_[td.tableId]->nColumns;
// The copy is handed to columnDescriptorsForRoll as a raw pointer; roll() deletes it
// when rolling back.
1995  auto ncd = new ColumnDescriptor(cd);
1998  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
1999 }
2000 
// NOTE(review): this excerpt is missing listing lines 2001 (signature), 2003, 2005 and
// 2009 (heads of the two SQLite calls), 2014 (the lookup that initializes columnDescIt)
// and 2020. The recursive call below shows the function is
// dropColumn(table descriptor, column descriptor); comments describe only what is
// visible.
//
// Drops column `cd` from table `td`:
//  - deletes the column's row from mapd_columns and decrements ncolumns in mapd_tables;
//  - queues the existing descriptor in columnDescriptorsForRoll as an (old, nullptr)
//    pair and removes it from columnDescriptorMap_;
//  - decrements the in-memory column count;
//  - for a logical sharded table (nShards > 0 and shard < 0), repeats the drop on every
//    physical shard table using the shard's descriptor for the same column id.
2002  cat_write_lock write_lock(this);
2004  // caller must handle sqlite/chunk transaction TOGETHER
2006  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2007  std::vector<std::string>{std::to_string(td.tableId), std::to_string(cd.columnId)});
2008 
2010  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2011  std::vector<std::string>{std::to_string(td.tableId)});
2012 
2013  ColumnDescriptorMap::iterator columnDescIt =
2015  CHECK(columnDescIt != columnDescriptorMap_.end());
2016 
// The descriptor itself is not freed here; roll() either deletes it (forward) or puts
// it back into the maps (rollback).
2017  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2018 
2019  columnDescriptorMap_.erase(columnDescIt);
2021  --tableDescriptorMapById_[td.tableId]->nColumns;
2022  // for each shard
2023  if (td.nShards > 0 && td.shard < 0) {
2024  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2025  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2026  CHECK(shard_cd);
2027  dropColumn(*shard, *shard_cd);
2028  }
2029  }
2030 }
2031 
2032 void Catalog::roll(const bool forward) {
2033  cat_write_lock write_lock(this);
2034  std::set<const TableDescriptor*> tds;
2035 
2036  for (const auto& cdr : columnDescriptorsForRoll) {
2037  auto ocd = cdr.first;
2038  auto ncd = cdr.second;
2039  CHECK(ocd || ncd);
2040  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2041  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2042  auto td = tabDescIt->second;
2043  auto& vc = td->columnIdBySpi_;
2044  if (forward) {
2045  if (ocd) {
2046  if (nullptr == ncd ||
2047  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2048  delDictionary(*ocd);
2049  }
2050 
2051  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2052 
2053  delete ocd;
2054  }
2055  if (ncd) {
2056  // append columnId if its new and not phy geo
2057  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2058  if (!ncd->isGeoPhyCol) {
2059  vc.push_back(ncd->columnId);
2060  }
2061  }
2062  }
2063  tds.insert(td);
2064  } else {
2065  if (ocd) {
2066  columnDescriptorMap_[ColumnKey(ocd->tableId, to_upper(ocd->columnName))] = ocd;
2067  columnDescriptorMapById_[ColumnIdKey(ocd->tableId, ocd->columnId)] = ocd;
2068  }
2069  // roll back the dict of new column
2070  if (ncd) {
2071  columnDescriptorMap_.erase(ColumnKey(ncd->tableId, to_upper(ncd->columnName)));
2072  columnDescriptorMapById_.erase(ColumnIdKey(ncd->tableId, ncd->columnId));
2073  if (nullptr == ocd ||
2074  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2075  delDictionary(*ncd);
2076  }
2077  delete ncd;
2078  }
2079  }
2080  }
2081  columnDescriptorsForRoll.clear();
2082 
2083  if (forward) {
2084  for (const auto td : tds) {
2085  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2086  }
2087  }
2088 }
2089 
// NOTE(review): the first line of the signature (listing line 2090) is missing from this
// excerpt; only the trailing `columns` parameter is visible. The body reads a column
// descriptor named `cd`, and createTable calls this as expandGeoColumn(cd, columns).
//
// Appends the physical columns that back a geo column to `columns`. Each physical
// column is a ColumnDescriptor constructed with `true` and named cd.columnName plus a
// suffix; it inherits the not-null flag of the geo column:
//   kPOINT        -> _coords
//   kLINESTRING   -> _coords, _bounds
//   kPOLYGON      -> _coords, _ring_sizes, _bounds, _render_group
//   kMULTIPOLYGON -> _coords, _ring_sizes, _poly_rings, _bounds, _render_group
// Non-geo column types leave `columns` untouched; a geo type without a case below throws
// runtime_error("Unrecognized geometry type.").
2091  list<ColumnDescriptor>& columns) {
2092  const auto& col_ti = cd.columnType;
2093  if (IS_GEO(col_ti.get_type())) {
2094  switch (col_ti.get_type()) {
2095  case kPOINT: {
2096  ColumnDescriptor physical_cd_coords(true);
2097  physical_cd_coords.columnName = cd.columnName + "_coords";
2098  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2099  // Raw data: compressed/uncompressed coords
2100  coords_ti.set_subtype(kTINYINT);
// Point coords get an explicit array size of 2 * unit_size, where unit_size is 4 for
// kENCODING_GEOINT with comp_param 32 and 8 for kENCODING_NONE.
2101  size_t unit_size;
2102  if (col_ti.get_compression() == kENCODING_GEOINT &&
2103  col_ti.get_comp_param() == 32) {
2104  unit_size = 4 * sizeof(int8_t);
2105  } else {
2106  CHECK(col_ti.get_compression() == kENCODING_NONE);
2107  unit_size = 8 * sizeof(int8_t);
2108  }
2109  coords_ti.set_size(2 * unit_size);
2110  physical_cd_coords.columnType = coords_ti;
2111  columns.push_back(physical_cd_coords);
2112 
2113  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2114 
2115  break;
2116  }
2117  case kLINESTRING: {
2118  ColumnDescriptor physical_cd_coords(true);
2119  physical_cd_coords.columnName = cd.columnName + "_coords";
2120  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2121  // Raw data: compressed/uncompressed coords
2122  coords_ti.set_subtype(kTINYINT);
2123  physical_cd_coords.columnType = coords_ti;
2124  columns.push_back(physical_cd_coords);
2125 
// Bounds are stored as an array of four doubles.
2126  ColumnDescriptor physical_cd_bounds(true);
2127  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2128  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2129  bounds_ti.set_subtype(kDOUBLE);
2130  bounds_ti.set_size(4 * sizeof(double));
2131  physical_cd_bounds.columnType = bounds_ti;
2132  columns.push_back(physical_cd_bounds);
2133 
2134  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2135 
2136  break;
2137  }
2138  case kPOLYGON: {
2139  ColumnDescriptor physical_cd_coords(true);
2140  physical_cd_coords.columnName = cd.columnName + "_coords";
2141  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2142  // Raw data: compressed/uncompressed coords
2143  coords_ti.set_subtype(kTINYINT);
2144  physical_cd_coords.columnType = coords_ti;
2145  columns.push_back(physical_cd_coords);
2146 
2147  ColumnDescriptor physical_cd_ring_sizes(true);
2148  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2149  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2150  ring_sizes_ti.set_subtype(kINT);
2151  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2152  columns.push_back(physical_cd_ring_sizes);
2153 
2154  ColumnDescriptor physical_cd_bounds(true);
2155  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2156  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2157  bounds_ti.set_subtype(kDOUBLE);
2158  bounds_ti.set_size(4 * sizeof(double));
2159  physical_cd_bounds.columnType = bounds_ti;
2160  columns.push_back(physical_cd_bounds);
2161 
// Unlike the array-typed physical columns above, render_group is a plain kINT column.
2162  ColumnDescriptor physical_cd_render_group(true);
2163  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2164  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2165  physical_cd_render_group.columnType = render_group_ti;
2166  columns.push_back(physical_cd_render_group);
2167 
2168  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2169 
2170  break;
2171  }
2172  case kMULTIPOLYGON: {
2173  ColumnDescriptor physical_cd_coords(true);
2174  physical_cd_coords.columnName = cd.columnName + "_coords";
2175  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2176  // Raw data: compressed/uncompressed coords
2177  coords_ti.set_subtype(kTINYINT);
2178  physical_cd_coords.columnType = coords_ti;
2179  columns.push_back(physical_cd_coords);
2180 
2181  ColumnDescriptor physical_cd_ring_sizes(true);
2182  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2183  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2184  ring_sizes_ti.set_subtype(kINT);
2185  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2186  columns.push_back(physical_cd_ring_sizes);
2187 
2188  ColumnDescriptor physical_cd_poly_rings(true);
2189  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2190  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2191  poly_rings_ti.set_subtype(kINT);
2192  physical_cd_poly_rings.columnType = poly_rings_ti;
2193  columns.push_back(physical_cd_poly_rings);
2194 
2195  ColumnDescriptor physical_cd_bounds(true);
2196  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2197  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2198  bounds_ti.set_subtype(kDOUBLE);
2199  bounds_ti.set_size(4 * sizeof(double));
2200  physical_cd_bounds.columnType = bounds_ti;
2201  columns.push_back(physical_cd_bounds);
2202 
2203  ColumnDescriptor physical_cd_render_group(true);
2204  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2205  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2206  physical_cd_render_group.columnType = render_group_ti;
2207  columns.push_back(physical_cd_render_group);
2208 
2209  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2210 
2211  break;
2212  }
2213  default:
2214  throw runtime_error("Unrecognized geometry type.");
2215  break;
2216  }
2217  }
2218 }
2219 
2220 namespace {
// NOTE(review): listing lines 2221, 2223, 2226 and 2229 are missing from this excerpt
// (the helper's signature, the options lookup that initializes timing_type_entry, the
// value it is compared against, and the fallback return). createTable calls the helper
// as get_next_refresh_time(foreign_table).
//
// Visible behavior: looks up an entry in foreign_table.options, CHECKs that it exists,
// and -- when its value matches the (not visible) comparison operand -- returns
// foreign_storage::get_next_refresh_time(foreign_table.options).
2222  auto timing_type_entry =
2224  CHECK(timing_type_entry != foreign_table.options.end());
2225  if (timing_type_entry->second ==
2227  return foreign_storage::get_next_refresh_time(foreign_table.options);
2228  }
2230 }
2231 } // namespace
2232 
// NOTE(review): many listing lines of this function are missing from this excerpt,
// including the first line of the signature (2233), several conditions (e.g. 2245-2246,
// 2291/2293, 2322, 2380, 2400, 2438, 2464), the heads of the SQLite calls and a number
// of the bound values. The function is presumably Catalog::createTable -- confirm. The
// comments below describe only what the visible lines establish.
//
// Overall flow visible here:
//  1. Expand the requested columns: reject a user column named "rowid", add the physical
//     columns of geo columns via expandGeoColumn(), then append the system "rowid"
//     column and, if td.hasDeletedCol, the "$deleted$" column.
//  2. Persistent branch: inside a SQLite transaction insert the mapd_tables row, read
//     back the generated table id, insert one mapd_columns row per column, plus the
//     mapd_views / omnisci_foreign_tables rows where applicable. Any exception rolls the
//     transaction back and is rethrown.
//  3. Temporary-table branch: take the id from nextTempTableId_, create temporary
//     dictionaries in memory and serialize the table to JSON.
//  4. Register the table in the in-memory maps and with Calcite; on failure roll back
//     SQLite and remove the table from the maps again.
2234  TableDescriptor& td,
2235  const list<ColumnDescriptor>& cols,
2236  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2237  bool isLogicalTable) {
2238  cat_write_lock write_lock(this);
2239  list<ColumnDescriptor> cds = cols;
2240  list<DictDescriptor> dds;
2241  std::set<std::string> toplevel_column_names;
2242  list<ColumnDescriptor> columns;
2243 
2244  if (!td.storageType.empty() &&
2247  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2248  }
2249  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2250  }
2251 
2252  for (auto cd : cds) {
2253  if (cd.columnName == "rowid") {
2254  throw std::runtime_error(
2255  "Cannot create column with name rowid. rowid is a system defined column.");
2256  }
2257  columns.push_back(cd);
2258  toplevel_column_names.insert(cd.columnName);
2259  if (cd.columnType.is_geometry()) {
2260  expandGeoColumn(cd, columns);
2261  }
2262  }
// cds is emptied here and refilled below with descriptors that carry their final
// tableId / columnId.
2263  cds.clear();
2264 
2265  ColumnDescriptor cd;
2266  // add row_id column -- Must be last column in the table
2267  cd.columnName = "rowid";
2268  cd.isSystemCol = true;
2269  cd.columnType = SQLTypeInfo(kBIGINT, true);
2270 #ifdef MATERIALIZED_ROWID
2271  cd.isVirtualCol = false;
2272 #else
2273  cd.isVirtualCol = true;
2274  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2275 #endif
2276  columns.push_back(cd);
2277  toplevel_column_names.insert(cd.columnName);
2278 
2279  if (td.hasDeletedCol) {
2280  ColumnDescriptor cd_del;
2281  cd_del.columnName = "$deleted$";
2282  cd_del.isSystemCol = true;
2283  cd_del.isVirtualCol = false;
2284  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2285  cd_del.isDeletedCol = true;
2286 
2287  columns.push_back(cd_del);
2288  }
2289 
2290  td.nColumns = columns.size();
2292  sqliteConnector_.query("BEGIN TRANSACTION");
2294  try {
2296  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2297  std::vector<std::string>{td.tableName,
2298  std::to_string(td.userId),
2300  std::to_string(td.isView),
2301  "",
2306  std::to_string(td.maxRows),
2307  td.partitions,
2309  std::to_string(td.shard),
2310  std::to_string(td.nShards),
2312  td.storageType,
2314  td.keyMetainfo});
2315 
2316  // now get the auto generated tableid
2318  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2319  td.tableId = sqliteConnector_.getData<int>(0, 0);
2320  int colId = 1;
2321  for (auto cd : columns) {
// A column that is not covered by a shared dictionary definition gets its own
// dictionary through setColumnDictionary().
2323  const bool is_foreign_col =
2324  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2325  if (!is_foreign_col) {
2326  setColumnDictionary(cd, dds, td, isLogicalTable);
2327  }
2328  }
2329 
2330  if (toplevel_column_names.count(cd.columnName)) {
2331  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2332  if (colId > 1) {
2333  colId += g_test_against_columnId_gap;
2334  }
2335  if (!cd.isGeoPhyCol) {
2336  td.columnIdBySpi_.push_back(colId);
2337  }
2338  }
2339 
2340  using BindType = SqliteConnector::BindType;
2341  std::vector<BindType> types(17, BindType::TEXT);
// The last bind slot (default_value) is bound as NULL_TYPE when no default is set.
2342  if (!cd.default_value.has_value()) {
2343  types[16] = BindType::NULL_TYPE;
2344  }
2346  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2347  "coldim, colscale, is_notnull, "
2348  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2349  "virtual_expr, is_deletedcol, default_value) "
2350  "VALUES (?, ?, ?, ?, ?, "
2351  "?, "
2352  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2353  std::vector<std::string>{std::to_string(td.tableId),
2354  std::to_string(colId),
2355  cd.columnName,
2364  "",
2367  cd.virtualExpr,
2369  cd.default_value.value_or("NULL")},
2370  types);
2371  cd.tableId = td.tableId;
2372  cd.columnId = colId++;
2373  cds.push_back(cd);
2374  }
2375  if (td.isView) {
2377  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2378  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2379  }
// dynamic_cast to a reference throws std::bad_cast if td is not a ForeignTable; the
// guarding condition (listing line 2380) is not visible here.
2381  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2382  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2384  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2385  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2386  std::vector<std::string>{std::to_string(foreign_table.tableId),
2387  std::to_string(foreign_table.foreign_server->id),
2388  foreign_table.getOptionsAsJsonString(),
2389  std::to_string(foreign_table.last_refresh_time),
2390  std::to_string(foreign_table.next_refresh_time)});
2391  }
2392  } catch (std::exception& e) {
2393  sqliteConnector_.query("ROLLBACK TRANSACTION");
2394  throw;
2395  }
2396  } else { // Temporary table
2397  td.tableId = nextTempTableId_++;
2398  int colId = 1;
2399  for (auto cd : columns) {
2401  const bool is_foreign_col =
2402  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2403 
2404  if (!is_foreign_col) {
2405  // Create a new temporary dictionary
2406  std::string fileName("");
2407  std::string folderPath("");
2409  nextTempDictId_++;
2410  DictDescriptor dd(dict_ref,
2411  fileName,
2413  false,
2414  1,
2415  folderPath,
2416  true); // Is dictName (2nd argument) used?
2417  dds.push_back(dd);
2418  if (!cd.columnType.is_array()) {
2420  }
2421  cd.columnType.set_comp_param(dict_ref.dictId);
2422  }
2423  }
2424  if (toplevel_column_names.count(cd.columnName)) {
2425  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2426  if (colId > 1) {
2427  colId += g_test_against_columnId_gap;
2428  }
2429  if (!cd.isGeoPhyCol) {
2430  td.columnIdBySpi_.push_back(colId);
2431  }
2432  }
2433  cd.tableId = td.tableId;
2434  cd.columnId = colId++;
2435  cds.push_back(cd);
2436  }
2437 
2439  serializeTableJsonUnlocked(&td, cds);
2440  }
2441  }
2442 
2443  try {
2444  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2445  if (cache) {
2446  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2447  << "Disk cache at " + cache->getCacheDirectory()
2448  << " contains preexisting data for new table. Please "
2449  "delete or clear cache before continuing";
2450  }
2451 
2452  addTableToMap(&td, cds, dds);
2453  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2454  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2455  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2456  }
2457  } catch (std::exception& e) {
// NOTE(review): this rollback/END pair is issued on every path that reaches here,
// including the temporary-table branch above where no BEGIN TRANSACTION is visible --
// confirm against the complete source.
2458  sqliteConnector_.query("ROLLBACK TRANSACTION");
2459  removeTableFromMap(td.tableName, td.tableId, true);
2460  throw;
2461  }
2462  sqliteConnector_.query("END TRANSACTION");
2463 
2465  sqlite_lock.unlock();
2466  getMetadataForTable(td.tableName,
2467  true); // cause instantiateFragmenter() to be called
2468  }
2469 }
2470 
2471 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2472  const std::list<ColumnDescriptor>& cds) const {
2473  // relies on the catalog write lock
2474  using namespace rapidjson;
2475 
2476  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2477 
2478  const auto db_name = currentDB_.dbName;
2479  const auto file_path = table_json_filepath(basePath_, db_name);
2480 
2481  Document d;
2482  if (boost::filesystem::exists(file_path)) {
2483  // look for an existing file for this database
2484  std::ifstream reader(file_path.string());
2485  CHECK(reader.is_open());
2486  IStreamWrapper json_read_wrapper(reader);
2487  d.ParseStream(json_read_wrapper);
2488  } else {
2489  d.SetObject();
2490  }
2491  CHECK(d.IsObject());
2492  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2493 
2494  Value table(kObjectType);
2495  table.AddMember(
2496  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2497  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2498  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2499 
2500  for (const auto& cd : cds) {
2501  Value column(kObjectType);
2502  column.AddMember(
2503  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2504  column.AddMember("coltype",
2505  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2506  d.GetAllocator());
2507  column.AddMember("colsubtype",
2508  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2509  d.GetAllocator());
2510  column.AddMember(
2511  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2512  column.AddMember(
2513  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2514  column.AddMember(
2515  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2516  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2517  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2518  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2519  table["columns"].PushBack(column, d.GetAllocator());
2520  }
2521  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2522 
2523  // Overwrite the existing file
2524  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2525  CHECK(writer.is_open());
2526  OStreamWrapper json_wrapper(writer);
2527 
2528  Writer<OStreamWrapper> json_writer(json_wrapper);
2529  d.Accept(json_writer);
2530  writer.close();
2531 }
2532 
2533 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2534  // relies on the catalog write lock
2535  using namespace rapidjson;
2536 
2537  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2538 
2539  const auto db_name = currentDB_.dbName;
2540  const auto file_path = table_json_filepath(basePath_, db_name);
2541 
2542  CHECK(boost::filesystem::exists(file_path));
2543  Document d;
2544 
2545  std::ifstream reader(file_path.string());
2546  CHECK(reader.is_open());
2547  IStreamWrapper json_read_wrapper(reader);
2548  d.ParseStream(json_read_wrapper);
2549 
2550  CHECK(d.IsObject());
2551  auto table_name_ref = StringRef(table_name.c_str());
2552  CHECK(d.HasMember(table_name_ref));
2553  CHECK(d.RemoveMember(table_name_ref));
2554 
2555  // Overwrite the existing file
2556  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2557  CHECK(writer.is_open());
2558  OStreamWrapper json_wrapper(writer);
2559 
2560  Writer<OStreamWrapper> json_writer(json_wrapper);
2561  d.Accept(json_writer);
2562  writer.close();
2563 }
2564 
2565 void Catalog::createForeignServer(
2566  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2567  bool if_not_exists) {
2568  cat_write_lock write_lock(this);
2569  cat_sqlite_lock sqlite_lock(getObjForLock());
2570  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
2571 }
2572 
2573 void Catalog::createForeignServerNoLocks(
2574  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2575  bool if_not_exists) {
2576  sqliteConnector_.query_with_text_params(
2577  "SELECT name from omnisci_foreign_servers where name = ?",
2578  std::vector<std::string>{foreign_server->name});
2579 
2580  if (sqliteConnector_.getNumRows() == 0) {
2581  foreign_server->creation_time = std::time(nullptr);
2582  sqliteConnector_.query_with_text_params(
2583  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
2584  "creation_time, "
2585  "options) "
2586  "VALUES (?, ?, ?, ?, ?)",
2587  std::vector<std::string>{foreign_server->name,
2588  foreign_server->data_wrapper_type,
2589  std::to_string(foreign_server->user_id),
2590  std::to_string(foreign_server->creation_time),
2591  foreign_server->getOptionsAsJsonString()});
2592  sqliteConnector_.query_with_text_params(
2593  "SELECT id from omnisci_foreign_servers where name = ?",
2594  std::vector<std::string>{foreign_server->name});
2595  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
2596  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
2597  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
2598  std::move(foreign_server);
2599  CHECK(foreignServerMap_.find(foreign_server_shared->name) == foreignServerMap_.end())
2600  << "Attempting to insert a foreign server into foreign server map that already "
2601  "exists.";
2602  foreignServerMap_[foreign_server_shared->name] = foreign_server_shared;
2603  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
2604  } else if (!if_not_exists) {
2605  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
2606  "\" already exists."};
2607  }
2608 }
2609 
2610 const foreign_storage::ForeignServer* Catalog::getForeignServer(
2611  const std::string& server_name) const {
2612  foreign_storage::ForeignServer* foreign_server = nullptr;
2613  cat_read_lock read_lock(this);
2614 
2615  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
2616  foreign_server = foreignServerMap_.find(server_name)->second.get();
2617  }
2618  return foreign_server;
2619 }
2620 
2621 const std::unique_ptr<const foreign_storage::ForeignServer>
2622 Catalog::getForeignServerFromStorage(const std::string& server_name) {
2623  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
2624  cat_sqlite_lock sqlite_lock(getObjForLock());
2625  sqliteConnector_.query_with_text_params(
2626  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
2627  "FROM omnisci_foreign_servers WHERE name = ?",
2628  std::vector<std::string>{server_name});
2629  if (sqliteConnector_.getNumRows() > 0) {
2630  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
2631  sqliteConnector_.getData<int>(0, 0),
2632  sqliteConnector_.getData<std::string>(0, 1),
2633  sqliteConnector_.getData<std::string>(0, 2),
2634  sqliteConnector_.getData<std::string>(0, 3),
2635  sqliteConnector_.getData<std::int32_t>(0, 4),
2636  sqliteConnector_.getData<std::int32_t>(0, 5));
2637  }
2638  return foreign_server;
2639 }
2640 
2641 const std::unique_ptr<const foreign_storage::ForeignTable>
2642 Catalog::getForeignTableFromStorage(int table_id) {
2643  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
2644  cat_sqlite_lock sqlite_lock(getObjForLock());
2645  sqliteConnector_.query_with_text_params(
2646  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
2647  "omnisci_foreign_tables WHERE table_id = ?",
2648  std::vector<std::string>{to_string(table_id)});
2649  auto num_rows = sqliteConnector_.getNumRows();
2650  if (num_rows > 0) {
2651  CHECK_EQ(size_t(1), num_rows);
2652  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
2653  sqliteConnector_.getData<int>(0, 0),
2654  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
2655  sqliteConnector_.getData<std::string>(0, 2),
2656  sqliteConnector_.getData<int>(0, 3),
2657  sqliteConnector_.getData<int>(0, 4));
2658  }
2659  return foreign_table;
2660 }
2661 
2662 void Catalog::changeForeignServerOwner(const std::string& server_name,
2663  const int new_owner_id) {
2664  cat_write_lock write_lock(this);
2665  foreign_storage::ForeignServer* foreign_server =
2666  foreignServerMap_.find(server_name)->second.get();
2667  CHECK(foreign_server);
2668  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
2669  // update in-memory server
2670  foreign_server->user_id = new_owner_id;
2671 }
2672 
2673 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
2674  const std::string& data_wrapper) {
2675  cat_write_lock write_lock(this);
2676  auto data_wrapper_type = to_upper(data_wrapper);
2677  // update in-memory server
2678  foreign_storage::ForeignServer* foreign_server =
2679  foreignServerMap_.find(server_name)->second.get();
2680  CHECK(foreign_server);
2681  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
2682  foreign_server->data_wrapper_type = data_wrapper_type;
2683  try {
2684  foreign_server->validate();
2685  } catch (const std::exception& e) {
2686  // validation did not succeed:
2687  // revert to saved data_wrapper_type & throw exception
2688  foreign_server->data_wrapper_type = saved_data_wrapper_type;
2689  throw;
2690  }
2691  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
2692 }
2693 
2694 void Catalog::setForeignServerOptions(const std::string& server_name,
2695  const std::string& options) {
2696  cat_write_lock write_lock(this);
2697  // update in-memory server
2698  foreign_storage::ForeignServer* foreign_server =
2699  foreignServerMap_.find(server_name)->second.get();
2700  CHECK(foreign_server);
2701  auto saved_options = foreign_server->options;
2702  foreign_server->populateOptionsMap(options, true);
2703  try {
2704  foreign_server->validate();
2705  } catch (const std::exception& e) {
2706  // validation did not succeed:
2707  // revert to saved options & throw exception
2708  foreign_server->options = saved_options;
2709  throw;
2710  }
2711  setForeignServerProperty(server_name, "options", options);
2712 }
2713 
2714 void Catalog::renameForeignServer(const std::string& server_name,
2715  const std::string& name) {
2716  cat_write_lock write_lock(this);
2717  auto foreign_server_it = foreignServerMap_.find(server_name);
2718  CHECK(foreign_server_it != foreignServerMap_.end());
2719  setForeignServerProperty(server_name, "name", name);
2720  auto foreign_server_shared = foreign_server_it->second;
2721  foreign_server_shared->name = name;
2722  foreignServerMap_[name] = foreign_server_shared;
2723  foreignServerMap_.erase(foreign_server_it);
2724 }
2725 
2726 void Catalog::dropForeignServer(const std::string& server_name) {
2727  cat_write_lock write_lock(this);
2728  cat_sqlite_lock sqlite_lock(getObjForLock());
2729 
2730  sqliteConnector_.query_with_text_params(
2731  "SELECT id from omnisci_foreign_servers where name = ?",
2732  std::vector<std::string>{server_name});
2733  auto num_rows = sqliteConnector_.getNumRows();
2734  if (num_rows > 0) {
2735  CHECK_EQ(size_t(1), num_rows);
2736  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
2737  sqliteConnector_.query_with_text_param(
2738  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
2739  std::to_string(server_id));
2740  if (sqliteConnector_.getNumRows() > 0) {
2741  throw std::runtime_error{"Foreign server \"" + server_name +
2742  "\" is referenced "
2743  "by existing foreign tables and cannot be dropped."};
2744  }
2745  sqliteConnector_.query("BEGIN TRANSACTION");
2746  try {
2747  sqliteConnector_.query_with_text_params(
2748  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
2749  std::vector<std::string>{server_name});
2750  } catch (const std::exception& e) {
2751  sqliteConnector_.query("ROLLBACK TRANSACTION");
2752  throw;
2753  }
2754  sqliteConnector_.query("END TRANSACTION");
2755  foreignServerMap_.erase(server_name);
2756  foreignServerMapById_.erase(server_id);
2757  }
2758 }
2759 
2760 void Catalog::getForeignServersForUser(
2761  const rapidjson::Value* filters,
2762  const UserMetadata& user,
2763  std::vector<const foreign_storage::ForeignServer*>& results) {
2764  sys_read_lock syscat_read_lock(&SysCatalog::instance());
2765  cat_read_lock read_lock(this);
2766  cat_sqlite_lock sqlite_lock(getObjForLock());
2767  // Customer facing and internal SQlite names
2768  std::map<std::string, std::string> col_names{{"server_name", "name"},
2769  {"data_wrapper", "data_wrapper_type"},
2770  {"created_at", "creation_time"},
2771  {"options", "options"}};
2772 
2773  // TODO add "owner" when FSI privilege is implemented
2774  std::stringstream filter_string;
2775  std::vector<std::string> arguments;
2776 
2777  if (filters != nullptr) {
2778  // Create SQL WHERE clause for SQLite query
2779  int num_filters = 0;
2780  filter_string << " WHERE";
2781  for (auto& filter_def : filters->GetArray()) {
2782  if (num_filters > 0) {
2783  filter_string << " " << std::string(filter_def["chain"].GetString());
2784  ;
2785  }
2786 
2787  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
2788  col_names.end()) {
2789  throw std::runtime_error{"Attribute with name \"" +
2790  std::string(filter_def["attribute"].GetString()) +
2791  "\" does not exist."};
2792  }
2793 
2794  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
2795 
2796  bool equals_operator = false;
2797  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
2798  filter_string << " = ? ";
2799  equals_operator = true;
2800  } else {
2801  filter_string << " LIKE ? ";
2802  }
2803 
2804  bool timestamp_column =
2805  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
2806 
2807  if (timestamp_column && !equals_operator) {
2808  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
2809  }
2810 
2811  if (timestamp_column && equals_operator) {
2812  arguments.push_back(std::to_string(
2813  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
2814  } else {
2815  arguments.push_back(filter_def["value"].GetString());
2816  }
2817 
2818  num_filters++;
2819  }
2820  }
2821  // Create select query for the omnisci_foreign_servers table
2822  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
2823  query += filter_string.str();
2824 
2825  sqliteConnector_.query_with_text_params(query, arguments);
2826  auto num_rows = sqliteConnector_.getNumRows();
2827 
2828  if (sqliteConnector_.getNumRows() == 0)
2829  return;
2830 
2831  CHECK(sqliteConnector_.getNumCols() == 1);
2832  // Return pointers to objects
2833  results.reserve(num_rows);
2834  for (size_t row = 0; row < num_rows; ++row) {
2835  const foreign_storage::ForeignServer* foreign_server =
2836  getForeignServer(sqliteConnector_.getData<std::string>(row, 0));
2837 
2838  CHECK(foreign_server != nullptr);
2839 
2840  DBObject dbObject(foreign_server->name, ServerDBObjectType);
2841  dbObject.loadKey(*this);
2842  std::vector<DBObject> privObjects = {dbObject};
2843  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
2844  // skip server, as there are no privileges to access it
2845  continue;
2846  }
2847  results.push_back(foreign_server);
2848  }
2849 }
2850 
2851 // returns the table epoch or -1 if there is something wrong with the shared epoch
2852 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
2853  cat_read_lock read_lock(this);
2854  const auto td = getMetadataForTable(table_id, false);
2855  if (!td) {
2856  std::stringstream table_not_found_error_message;
2857  table_not_found_error_message << "Table (" << db_id << "," << table_id
2858  << ") not found";
2859  throw std::runtime_error(table_not_found_error_message.str());
2860  }
2861  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2862  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2863  // check all shards have same checkpoint
2864  const auto physicalTables = physicalTableIt->second;
2865  CHECK(!physicalTables.empty());
2866  size_t curr_epoch{0}, first_epoch{0};
2867  int32_t first_table_id{0};
2868  bool are_epochs_inconsistent{false};
2869  for (size_t i = 0; i < physicalTables.size(); i++) {
2870  int32_t physical_tb_id = physicalTables[i];
2871  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2872  CHECK(phys_td);
2873 
2874  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
2875  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
2876  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
2877  if (i == 0) {
2878  first_epoch = curr_epoch;
2879  first_table_id = physical_tb_id;
2880  } else if (first_epoch != curr_epoch) {
2881  are_epochs_inconsistent = true;
2882  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
2883  << ", db id: " << db_id
2884  << ". First table (table id: " << first_table_id
2885  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
2886  << ", has inconsistent epoch: " << curr_epoch
2887  << ". See previous INFO logs for all epochs and their table ids.";
2888  }
2889  }
2890  if (are_epochs_inconsistent) {
2891  // oh dear the shards do not agree on the epoch for this table
2892  return -1;
2893  }
2894  return curr_epoch;
2895  } else {
2896  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
2897  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
2898  << ", epoch: " << epoch;
2899  return epoch;
2900  }
2901 }
2902 
// Sets the epoch of the given table to new_epoch through the global file manager.
// When the table id is a key of logicalToPhysicalTableMapById_ the new epoch is
// applied to each of its physical tables instead. removeChunksUnlocked() is called
// for every affected table before its file manager parameters are replaced.
// Throws std::runtime_error when the table is not found, and with the message
// "Cannot set epoch on temporary table" from the second guard below.
// NOTE(review): this listing jumps from source line 2913 to 2915, so the statement
// that opens the temporary-table guard is not visible here and the braces do not
// balance as shown -- confirm the guard condition against the repository.
2903 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
2904  cat_read_lock read_lock(this);
2905  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
2906  << " back to new epoch " << new_epoch;
2907  const auto td = getMetadataForTable(table_id, false);
2908  if (!td) {
2909  std::stringstream table_not_found_error_message;
2910  table_not_found_error_message << "Table (" << db_id << "," << table_id
2911  << ") not found";
2912  throw std::runtime_error(table_not_found_error_message.str());
2913  }
2915  std::stringstream is_temp_table_error_message;
2916  is_temp_table_error_message << "Cannot set epoch on temporary table";
2917  throw std::runtime_error(is_temp_table_error_message.str());
2918  }
 // The same parameters (requested epoch, the table's rollback limit) are used for
 // the table itself or for each of its physical tables.
2919  File_Namespace::FileMgrParams file_mgr_params;
2920  file_mgr_params.epoch = new_epoch;
2921  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
2922 
2923  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2924  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2925  const auto physicalTables = physicalTableIt->second;
2926  CHECK(!physicalTables.empty());
2927  for (size_t i = 0; i < physicalTables.size(); i++) {
2928  const int32_t physical_tb_id = physicalTables[i];
2929  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2930  CHECK(phys_td);
2931  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID "
2932  << physical_tb_id << " back to new epoch " << new_epoch;
2933  // Should have table lock from caller so safe to do this after, avoids
2934  // having to repopulate data on error
2935  removeChunksUnlocked(physical_tb_id);
2936  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
2937  db_id, physical_tb_id, file_mgr_params);
2938  }
2939  } else { // not shared
2940  // Should have table lock from caller so safe to do this after, avoids
2941  // having to repopulate data on error
2942  removeChunksUnlocked(table_id);
2943  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
2944  }
2945 }
2946 
2947 void Catalog::alterPhysicalTableMetadata(
2948  const TableDescriptor* td,
2949  const TableDescriptorUpdateParams& table_update_params) {
2950  // Only called from parent alterTableParamMetadata, expect already to have catalog and
2951  // sqlite write locks
2952 
2953  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
2954 
2955  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
2956  CHECK(mutable_td);
2957  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
2958  sqliteConnector_.query_with_text_params(
2959  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
2960  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
2961  std::to_string(td->tableId)});
2962  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
2963  }
2964 
2965  if (td->maxRows != table_update_params.max_rows) {
2966  sqliteConnector_.query_with_text_params(
2967  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
2968  std::vector<std::string>{std::to_string(table_update_params.max_rows),
2969  std::to_string(td->tableId)});
2970  mutable_td->maxRows = table_update_params.max_rows;
2971  }
2972 }
2973 
2974 void Catalog::alterTableMetadata(const TableDescriptor* td,
2975  const TableDescriptorUpdateParams& table_update_params) {
2976  cat_write_lock write_lock(this);
2977  cat_sqlite_lock sqlite_lock(getObjForLock());
2978  sqliteConnector_.query("BEGIN TRANSACTION");
2979  try {
2980  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
2981  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
2982  const auto physical_tables = physical_table_it->second;
2983  CHECK(!physical_tables.empty());
2984  for (size_t i = 0; i < physical_tables.size(); i++) {
2985  int32_t physical_tb_id = physical_tables[i];
2986  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2987  CHECK(phys_td);
2988  alterPhysicalTableMetadata(phys_td, table_update_params);
2989  }
2990  }
2991  alterPhysicalTableMetadata(td, table_update_params);
2992  } catch (std::exception& e) {
2993  sqliteConnector_.query("ROLLBACK TRANSACTION");
2994  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
2995  }
2996  sqliteConnector_.query("END TRANSACTION");
2997 }
2998 
// Changes max_rollback_epochs for a table: the new value is pushed to the table's
// file manager parameters via setTableFileMgrParams() (epoch -1) and then handed to
// alterTableMetadata(). Throws std::runtime_error for values <= -1. Returns without
// doing anything when the update parameters compare equal to the descriptor.
// NOTE(review): source line 3025 is absent from this listing. The comment right
// before the gap talks about unlocking ahead of alterTableMetadata(), so presumably
// the missing statement releases write_lock -- confirm against the repository.
2999 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3000  const int32_t max_rollback_epochs) {
3001  // Must be called from AlterTableParamStmt or other method that takes executor and
3002  // TableSchema locks
3003  cat_write_lock write_lock(this); // Consider only taking read lock for potentially
3004  // heavy table storage metadata rolloff operations
3005  if (max_rollback_epochs <= -1) {
3006  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3007  }
3008  const auto td = getMetadataForTable(
3009  table_id, false); // Deep copy as there will be gap between read and write locks
3010  CHECK(td); // Existence should have already been checked in
3011  // ParserNode::AlterTableParmStmt
3012  TableDescriptorUpdateParams table_update_params(td);
3013  table_update_params.max_rollback_epochs = max_rollback_epochs;
3014  if (table_update_params == td) { // Operator is overloaded to test for equality
3015  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3016  << " to existing value, skipping operation";
3017  return;
3018  }
 // Storage parameters are changed first; the catalog metadata follows below.
3019  File_Namespace::FileMgrParams file_mgr_params;
3020  file_mgr_params.epoch = -1; // Use existing epoch
3021  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3022  setTableFileMgrParams(table_id, file_mgr_params);
3023  // Unlock as alterTableCatalogMetadata will take write lock, and Catalog locks are not
3024  // upgradeable Should be safe as we have schema lock on this table
3026  alterTableMetadata(td, table_update_params);
3027 }
3028 
3029 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3030  if (max_rows < 0) {
3031  throw std::runtime_error("Max rows cannot be a negative number.");
3032  }
3033  const auto td = getMetadataForTable(table_id);
3034  CHECK(td);
3035  TableDescriptorUpdateParams table_update_params(td);
3036  table_update_params.max_rows = max_rows;
3037  if (table_update_params == td) {
3038  LOG(INFO) << "Max rows value of " << max_rows
3039  << " is the same as the existing value. Skipping update.";
3040  return;
3041  }
3042  alterTableMetadata(td, table_update_params);
3043  CHECK(td->fragmenter);
3044  td->fragmenter->dropFragmentsToSize(max_rows);
3045 }
3046 
3047 // For testing purposes only
3048 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3049  cat_write_lock write_lock(this);
3050  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3051  CHECK(td_entry != tableDescriptorMap_.end());
3052  auto td = td_entry->second;
3053  TableDescriptorUpdateParams table_update_params(td);
3054  table_update_params.max_rollback_epochs = -1;
3055  alterTableMetadata(td, table_update_params);
3056 
3057  File_Namespace::FileMgrParams file_mgr_params;
3058  file_mgr_params.max_rollback_epochs = -1;
3059  setTableFileMgrParams(td->tableId, file_mgr_params);
3060 }
3061 
// Applies file_mgr_params to the given table through the global file manager, or to
// each physical table when the id is a key of logicalToPhysicalTableMapById_.
// removeChunksUnlocked() runs for every affected table before its parameters are
// set. Throws std::runtime_error when the table is not found, and with the message
// "Cannot set storage params on temporary table" from the second guard below.
// NOTE(review): this listing jumps from source line 3073 to 3075, so the statement
// that opens the temporary-table guard is not visible here and the braces do not
// balance as shown -- confirm the guard condition against the repository.
3062 void Catalog::setTableFileMgrParams(
3063  const int table_id,
3064  const File_Namespace::FileMgrParams& file_mgr_params) {
3065  // Expects parent to have write lock
3066  const auto td = getMetadataForTable(table_id, false);
3067  const auto db_id = this->getDatabaseId();
3068  if (!td) {
3069  std::stringstream table_not_found_error_message;
3070  table_not_found_error_message << "Table (" << db_id << "," << table_id
3071  << ") not found";
3072  throw std::runtime_error(table_not_found_error_message.str());
3073  }
3075  std::stringstream is_temp_table_error_message;
3076  is_temp_table_error_message << "Cannot set storage params on temporary table";
3077  throw std::runtime_error(is_temp_table_error_message.str());
3078  }
3079  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3080  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
 // Logical table: the parameters are applied to every physical table.
3081  const auto physical_tables = physical_table_it->second;
3082  CHECK(!physical_tables.empty());
3083  for (size_t i = 0; i < physical_tables.size(); i++) {
3084  const int32_t physical_tb_id = physical_tables[i];
3085  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3086  CHECK(phys_td);
3087  removeChunksUnlocked(physical_tb_id);
3088  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3089  db_id, physical_tb_id, file_mgr_params);
3090  }
3091  } else { // not shared
3092  removeChunksUnlocked(table_id);
3093  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3094  }
3095 }
3096 
3097 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3098  const int32_t table_id) const {
3099  cat_read_lock read_lock(this);
3100  std::vector<TableEpochInfo> table_epochs;
3101  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3102  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3103  const auto physical_tables = physical_table_it->second;
3104  CHECK(!physical_tables.empty());
3105 
3106  for (const auto physical_tb_id : physical_tables) {
3107  const auto phys_td = getMetadataForTableImpl(physical_tb_id, false);
3108  CHECK(phys_td);
3109 
3110  auto table_id = phys_td->tableId;
3111  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3112  table_epochs.emplace_back(table_id, epoch);
3113  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3114  << ", table id: " << table_id << ", epoch: " << epoch;
3115  }
3116  } else {
3117  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3118  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3119  << ", epoch: " << epoch;
3120  table_epochs.emplace_back(table_id, epoch);
3121  }
3122  return table_epochs;
3123 }
3124 
3125 void Catalog::setTableEpochs(const int32_t db_id,
3126  const std::vector<TableEpochInfo>& table_epochs) const {
3127  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3128  CHECK(td);
3129  File_Namespace::FileMgrParams file_mgr_params;
3130  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3131 
3132  cat_read_lock read_lock(this);
3133  for (const auto& table_epoch_info : table_epochs) {
3134  removeChunksUnlocked(table_epoch_info.table_id);
3135  file_mgr_params.epoch = table_epoch_info.table_epoch;
3136  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3137  db_id, table_epoch_info.table_id, file_mgr_params);
3138  LOG(INFO) << "Set table epoch for db id: " << db_id
3139  << ", table id: " << table_epoch_info.table_id
3140  << ", back to epoch: " << table_epoch_info.table_epoch;
3141  }
3142 }
3143 
3144 namespace {
3145 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3146  std::string table_epochs_str{"["};
3147  bool first_entry{true};
3148  for (const auto& table_epoch : table_epochs) {
3149  if (first_entry) {
3150  first_entry = false;
3151  } else {
3152  table_epochs_str += ", ";
3153  }
3154  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3155  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3156  }
3157  table_epochs_str += "]";
3158  return table_epochs_str;
3159 }
3160 } // namespace
3161 
3162 void Catalog::setTableEpochsLogExceptions(
3163  const int32_t db_id,
3164  const std::vector<TableEpochInfo>& table_epochs) const {
3165  try {
3166  setTableEpochs(db_id, table_epochs);
3167  } catch (std::exception& e) {
3168  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3169  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3170  << ", Error: " << e.what();
3171  }
3172 }
3173 
3174 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3175  cat_read_lock read_lock(this);
3176  const auto it = deletedColumnPerTable_.find(td);
3177  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3178 }
3179 
3180 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3181  int delete_column_id) const {
3182  // check if there are rows deleted by examining the deletedColumn metadata
3183  CHECK(td);
3184  auto fragmenter = td->fragmenter;
3185  if (fragmenter) {
3186  return fragmenter->hasDeletedRows(delete_column_id);
3187  } else {
3188  return false;
3189  }
3190 }
3191 
3192 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3193  const TableDescriptor* td) const {
3194  std::vector<const TableDescriptor*> tds;
3195  const ColumnDescriptor* cd;
3196  {
3197  cat_read_lock read_lock(this);
3198 
3199  const auto it = deletedColumnPerTable_.find(td);
3200  // if not a table that supports delete return nullptr, nothing more to do
3201  if (it == deletedColumnPerTable_.end()) {
3202  return nullptr;
3203  }
3204  cd = it->second;
3205  tds = getPhysicalTablesDescriptors(td, false);
3206  }
3207  // individual tables are still protected by higher level locks
3208  for (auto tdd : tds) {
3209  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3210  return cd;
3211  }
3212  }
3213  // no deletes so far recorded in metadata
3214  return nullptr;
3215 }
3216 
3217 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3218  cat_write_lock write_lock(this);
3219  setDeletedColumnUnlocked(td, cd);
3220 }
3221 
3222 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3223  const ColumnDescriptor* cd) {
3224  cat_write_lock write_lock(this);
3225  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3226  CHECK(it_ok.second);
3227 }
3228 
3229 namespace {
3230 
// Looks up the column that a shared dictionary definition refers to: the foreign
// table is resolved by name (it must exist), then the foreign column is looked up in
// that table and the result of getMetadataForColumn() is returned.
// NOTE(review): source line 3231, which carries the return type and function name,
// is missing from this listing. addReferenceToForeignDict() calls a helper named
// get_foreign_col with these arguments -- confirm the signature against the
// repository.
3232  const Catalog& cat,
3233  const Parser::SharedDictionaryDef& shared_dict_def) {
3234  const auto& table_name = shared_dict_def.get_foreign_table();
3235  const auto td = cat.getMetadataForTable(table_name);
3236  CHECK(td);
3237  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3238  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3239 }
3240 
3241 } // namespace
3242 
3243 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3244  Parser::SharedDictionaryDef shared_dict_def,
3245  const bool persist_reference) {
3246  cat_write_lock write_lock(this);
3247  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3248  CHECK(foreign_ref_col);
3249  referencing_column.columnType = foreign_ref_col->columnType;
3250  const int dict_id = referencing_column.columnType.get_comp_param();
3251  const DictRef dict_ref(currentDB_.dbId, dict_id);
3252  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3253  CHECK(dictIt != dictDescriptorMapByRef_.end());
3254  const auto& dd = dictIt->second;
3255  CHECK_GE(dd->refcount, 1);
3256  ++dd->refcount;
3257  if (persist_reference) {
3258  cat_sqlite_lock sqlite_lock(getObjForLock());
3259  sqliteConnector_.query_with_text_params(
3260  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3261  {std::to_string(dict_id)});
3262  }
3263 }
3264 
3265 bool Catalog::setColumnSharedDictionary(
3266  ColumnDescriptor& cd,
3267  std::list<ColumnDescriptor>& cdd,
3268  std::list<DictDescriptor>& dds,
3269  const TableDescriptor td,
3270  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3271  cat_write_lock write_lock(this);
3272  cat_sqlite_lock sqlite_lock(getObjForLock());
3273 
3274  if (shared_dict_defs.empty()) {
3275  return false;
3276  }
3277  for (const auto& shared_dict_def : shared_dict_defs) {
3278  // check if the current column is a referencing column
3279  const auto& column = shared_dict_def.get_column();
3280  if (cd.columnName == column) {
3281  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3282  // Dictionaries are being shared in table to be created
3283  const auto& ref_column = shared_dict_def.get_foreign_column();
3284  auto colIt =
3285  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3286  return !ref_column.compare(it.columnName);
3287  });
3288  CHECK(colIt != cdd.end());
3289  cd.columnType = colIt->columnType;
3290 
3291  const int dict_id = colIt->columnType.get_comp_param();
3292  CHECK_GE(dict_id, 1);
3293  auto dictIt = std::find_if(
3294  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3295  return it.dictRef.dbId == this->currentDB_.dbId &&
3296  it.dictRef.dictId == dict_id;
3297  });
3298  if (dictIt != dds.end()) {
3299  // There exists dictionary definition of a dictionary column
3300  CHECK_GE(dictIt->refcount, 1);
3301  ++dictIt->refcount;
3302  if (!table_is_temporary(&td)) {
3303  // Persist reference count
3304  sqliteConnector_.query_with_text_params(
3305  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3306  {std::to_string(dict_id)});
3307  }
3308  } else {
3309  // The dictionary is referencing a column which is referencing a column in
3310  // diffrent table
3311  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3312  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3313  }
3314  } else {
3315  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3316  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3317  if (table_is_temporary(foreign_td)) {
3318  if (!table_is_temporary(&td)) {
3319  throw std::runtime_error(
3320  "Only temporary tables can share dictionaries with other temporary "
3321  "tables.");
3322  }
3323  addReferenceToForeignDict(cd, shared_dict_def, false);
3324  } else {
3325  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3326  }
3327  }
3328  return true;
3329  }
3330  }
3331  return false;
3332 }
3333 
// Creates the dictionary descriptor for a dictionary-encoded column and appends it
// to dds; the column's comp_param is set to the dictionary id at the end.
// For a logical table a row is inserted into mapd_dictionaries under the placeholder
// name 'Initial_key', its dictid is read back, the row is renamed to
// "<table>_<column>_dict<id>" and a folder path below g_base_path/mapd_data is
// derived. Otherwise the id stays 0, the name stays "Initial_key" and the folder
// path stays empty.
// NOTE(review): source lines 3363 (one DictDescriptor constructor argument) and 3370
// (the body of the is_array() check) are missing from this listing -- confirm both
// against the repository.
3334 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3335  std::list<DictDescriptor>& dds,
3336  const TableDescriptor& td,
3337  const bool isLogicalTable) {
3338  cat_write_lock write_lock(this);
3339 
3340  std::string dictName{"Initial_key"};
3341  int dictId{0};
3342  std::string folderPath;
3343  if (isLogicalTable) {
3344  cat_sqlite_lock sqlite_lock(getObjForLock());
3345 
 // Insert under the placeholder name, then look the new row up by that name to
 // learn its dictid.
3346  sqliteConnector_.query_with_text_params(
3347  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3348  "?, 1)",
3349  std::vector<std::string>{
3350  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3351  sqliteConnector_.query_with_text_param(
3352  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3353  dictId = sqliteConnector_.getData<int>(0, 0);
 // The final name embeds the id that was just assigned.
3354  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3355  sqliteConnector_.query_with_text_param(
3356  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3357  folderPath = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
3358  "_DICT_" + std::to_string(dictId);
3359  }
3360  DictDescriptor dd(currentDB_.dbId,
3361  dictId,
3362  dictName,
3364  false,
3365  1,
3366  folderPath,
3367  false);
3368  dds.push_back(dd);
3369  if (!cd.columnType.is_array()) {
3371  }
3372  cd.columnType.set_comp_param(dictId);
3373 }
3374 
3375 void Catalog::createShardedTable(
3376  TableDescriptor& td,
3377  const list<ColumnDescriptor>& cols,
3378  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3379  cat_write_lock write_lock(this);
3380 
3381  /* create logical table */
3382  TableDescriptor* tdl = &td;
3383  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3384  int32_t logical_tb_id = tdl->tableId;
3385  std::string logical_table_name = tdl->tableName;
3386 
3387  /* create physical tables and link them to the logical table */
3388  std::vector<int32_t> physicalTables;
3389  for (int32_t i = 1; i <= td.nShards; i++) {
3390  TableDescriptor* tdp = &td;
3391  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3392  tdp->shard = i - 1;
3393  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3394  int32_t physical_tb_id = tdp->tableId;
3395 
3396  /* add physical table to the vector of physical tables */
3397  physicalTables.push_back(physical_tb_id);
3398  }
3399 
3400  if (!physicalTables.empty()) {
3401  /* add logical to physical tables correspondence to the map */
3402  const auto it_ok =
3403  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3404  CHECK(it_ok.second);
3405  /* update sqlite mapd_logical_to_physical in sqlite database */
3406  if (!table_is_temporary(&td)) {
3407  updateLogicalToPhysicalTableMap(logical_tb_id);
3408  }
3409  }
3410 }
3411 
3412 void Catalog::truncateTable(const TableDescriptor* td) {
3413  cat_write_lock write_lock(this);
3414 
3415  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3416  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3417  // truncate all corresponding physical tables if this is a logical table
3418  const auto physicalTables = physicalTableIt->second;
3419  CHECK(!physicalTables.empty());
3420  for (size_t i = 0; i < physicalTables.size(); i++) {
3421  int32_t physical_tb_id = physicalTables[i];
3422  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3423  CHECK(phys_td);
3424  doTruncateTable(phys_td);
3425  }
3426  }
3427  doTruncateTable(td);
3428 }
3429 
3430 void Catalog::doTruncateTable(const TableDescriptor* td) {
3431  cat_write_lock write_lock(this);
3432 
3433  const int tableId = td->tableId;
3434  // must destroy fragmenter before deleteChunks is called.
3435  if (td->fragmenter != nullptr) {
3436  auto tableDescIt = tableDescriptorMapById_.find(tableId);
3437  CHECK(tableDescIt != tableDescriptorMapById_.end());
3438  tableDescIt->second->fragmenter = nullptr;
3439  CHECK(td->fragmenter == nullptr);
3440  }
3441  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3442  // assuming deleteChunksWithPrefix is atomic
3443  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3444  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3445 
3446  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3447 
3448  std::unique_ptr<StringDictionaryClient> client;
3449  if (SysCatalog::instance().isAggregator()) {
3450  CHECK(!string_dict_hosts_.empty());
3451  DictRef dict_ref(currentDB_.dbId, -1);
3452  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3453  }
3454  // clean up any dictionaries
3455  // delete all column descriptors for the table
3456  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3457  auto cd = columnDescriptor.second;
3458  if (cd->tableId != td->tableId) {
3459  continue;
3460  }
3461  const int dict_id = cd->columnType.get_comp_param();
3462  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3463  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3464  const DictRef dict_ref(currentDB_.dbId, dict_id);
3465  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3466  CHECK(dictIt != dictDescriptorMapByRef_.end());
3467  const auto& dd = dictIt->second;
3468  CHECK_GE(dd->refcount, 1);
3469  // if this is the only table using this dict reset the dict
3470  if (dd->refcount == 1) {
3471  // close the dictionary
3472  dd->stringDict.reset();
3473  File_Namespace::renameForDelete(dd->dictFolderPath);
3474  if (client) {
3475  client->drop(dd->dictRef);
3476  }
3477  if (!dd->dictIsTemp) {
3478  boost::filesystem::create_directory(dd->dictFolderPath);
3479  }
3480  }
3481 
3482  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3483  dd->dictName,
3484  dd->dictNBits,
3485  dd->dictIsShared,
3486  dd->refcount,
3487  dd->dictFolderPath,
3488  dd->dictIsTemp);
3489  dictDescriptorMapByRef_.erase(dictIt);
3490  // now create new Dict -- need to figure out what to do here for temp tables
3491  if (client) {
3492  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3493  }
3494  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3495  getMetadataForDict(new_dd->dictRef.dictId);
3496  }
3497  }
3498 }
3499 
3500 void Catalog::removeFragmenterForTable(const int table_id) const {
3501  cat_write_lock write_lock(this);
3502  auto td = getMetadataForTable(table_id, false);
3503  if (td->fragmenter != nullptr) {
3504  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3505  CHECK(tableDescIt != tableDescriptorMapById_.end());
3506  tableDescIt->second->fragmenter = nullptr;
3507  CHECK(td->fragmenter == nullptr);
3508  }
3509 }
3510 
3511 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3512 void Catalog::removeChunksUnlocked(const int table_id) const {
3513  auto td = getMetadataForTable(table_id);
3514  CHECK(td);
3515 
3516  if (td->fragmenter != nullptr) {
3517  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3518  CHECK(tableDescIt != tableDescriptorMapById_.end());
3519  tableDescIt->second->fragmenter = nullptr;
3520  CHECK(td->fragmenter == nullptr);
3521  }
3522 
3523  // remove the chunks from in memory structures
3524  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3525 
3526  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3527  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3528 }
3529 
// Drops a table from the catalog. Privileges on the table are revoked through the
// system catalog before the catalog locks are taken. All sqlite changes run inside
// one transaction that is rolled back if a std::exception is thrown, after which the
// exception is re-thrown. For a logical table every physical table is dropped first
// and the logical-to-physical mapping is removed from sqlite and from memory.
// NOTE(review): source line 3532 (the arguments of revokeDBObjectPrivilegesFromAll)
// is missing from this listing, so the call below is incomplete as shown -- confirm
// against the repository.
3530 void Catalog::dropTable(const TableDescriptor* td) {
3531  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
3533  cat_write_lock write_lock(this);
3534  cat_sqlite_lock sqlite_lock(getObjForLock());
3535  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3536  sqliteConnector_.query("BEGIN TRANSACTION");
3537  try {
3538  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3539  // remove all corresponding physical tables if this is a logical table
3540  const auto physicalTables = physicalTableIt->second;
3541  CHECK(!physicalTables.empty());
3542  for (size_t i = 0; i < physicalTables.size(); i++) {
3543  int32_t physical_tb_id = physicalTables[i];
3544  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3545  CHECK(phys_td);
3546  doDropTable(phys_td);
3547  }
3548 
3549  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
3550  sqliteConnector_.query_with_text_param(
3551  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
3552  std::to_string(td->tableId));
3553  logicalToPhysicalTableMapById_.erase(td->tableId);
3554  }
3555  doDropTable(td);
3556  } catch (std::exception& e) {
3557  sqliteConnector_.query("ROLLBACK TRANSACTION");
3558  throw;
3559  }
3560  sqliteConnector_.query("END TRANSACTION");
3561 }
3562 
// Drops a single table: first removes its rows from the sqlite catalog tables
// (executeDropTableSqliteQueries), then erases its physical data and in-memory
// entries (eraseTablePhysicalData).
// It opens no transaction itself; dropTable calls it inside one.
//
// NOTE(review): this listing skips original line 3565, which opens the
// conditional block closed on line 3567; the condition guarding
// dropTableFromJsonUnlocked is therefore not visible here.
3563 void Catalog::doDropTable(const TableDescriptor* td) {
3564  executeDropTableSqliteQueries(td);
3566  dropTableFromJsonUnlocked(td->tableName);
3567  }
3568  eraseTablePhysicalData(td);
3569 }
3570 
// Issues the sqlite statements that remove table `td` from the catalog tables.
//
// Order of operations:
//  - delete the table's row from mapd_tables;
//  - collect comp_param for the table's columns whose compression equals
//    kENCODING_DICT and decrement refcount of each matching mapd_dictionaries
//    row;
//  - delete those dictionaries whose refcount reached 0;
//  - delete the table's rows from mapd_columns (this must come after the
//    dictionary statements, which still select from mapd_columns);
//  - delete the mapd_views row when td->isView is set;
//  - delete the omnisci_foreign_tables row inside a conditional block.
//
// No transaction is opened or closed in this function.
//
// NOTE(review): this listing skips original line 3599, which opens the block
// closed on line 3602; the condition guarding the omnisci_foreign_tables
// delete is not visible here.
3571 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
3572  const int tableId = td->tableId;
3573  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
3574  std::to_string(tableId));
3575  sqliteConnector_.query_with_text_params(
3576  "select comp_param from mapd_columns where compression = ? and tableid = ?",
3577  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3578  int numRows = sqliteConnector_.getNumRows();
// Copy the ids out of the result set first: the UPDATE statements below go
// through the same connector.
3579  std::vector<int> dict_id_list;
3580  for (int r = 0; r < numRows; ++r) {
3581  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
3582  }
3583  for (auto dict_id : dict_id_list) {
3584  sqliteConnector_.query_with_text_params(
3585  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
3586  std::vector<std::string>{std::to_string(dict_id)});
3587  }
3588  sqliteConnector_.query_with_text_params(
3589  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
3590  "mapd_columns where compression = ? "
3591  "and tableid = ?) and refcount = 0",
3592  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3593  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
3594  std::to_string(tableId));
3595  if (td->isView) {
3596  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
3597  std::to_string(tableId));
3598  }
3600  sqliteConnector_.query_with_text_param(
3601  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
3602  }
3603 }
3604 
3605 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
3606  cat_write_lock write_lock(this);
3607  cat_sqlite_lock sqlite_lock(getObjForLock());
3608 
3609  sqliteConnector_.query("BEGIN TRANSACTION");
3610  try {
3611  sqliteConnector_.query_with_text_params(
3612  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3613  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
3614  } catch (std::exception& e) {
3615  sqliteConnector_.query("ROLLBACK TRANSACTION");
3616  throw;
3617  }
3618  sqliteConnector_.query("END TRANSACTION");
3619  TableDescriptorMap::iterator tableDescIt =
3620  tableDescriptorMap_.find(to_upper(td->tableName));
3621  CHECK(tableDescIt != tableDescriptorMap_.end());
3622  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3623  // Get table descriptor to change it
3624  TableDescriptor* changeTd = tableDescIt->second;
3625  changeTd->tableName = newTableName;
3626  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3627  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3628  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3629 }
3630 
3631 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
3632  {
3633  cat_write_lock write_lock(this);
3634  cat_sqlite_lock sqlite_lock(getObjForLock());
3635  // rename all corresponding physical tables if this is a logical table
3636  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3637  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3638  const auto physicalTables = physicalTableIt->second;
3639  CHECK(!physicalTables.empty());
3640  for (size_t i = 0; i < physicalTables.size(); i++) {
3641  int32_t physical_tb_id = physicalTables[i];
3642  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3643  CHECK(phys_td);
3644  std::string newPhysTableName =
3645  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
3646  renamePhysicalTable(phys_td, newPhysTableName);
3647  }
3648  }
3649  renamePhysicalTable(td, newTableName);
3650  }
3651  {
3652  DBObject object(newTableName, TableDBObjectType);
3653  // update table name in direct and effective priv map
3654  DBObjectKey key;
3655  key.dbId = currentDB_.dbId;
3656  key.objectId = td->tableId;
3657  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3658  object.setObjectKey(key);
3659  auto objdescs = SysCatalog::instance().getMetadataForObject(
3660  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
3661  for (auto obj : objdescs) {
3662  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3663  if (grnt) {
3664  grnt->renameDbObject(object);
3665  }
3666  }
3667  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3668  }
3669 }
3670 
3671 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
3672  std::vector<int>& tableIds) {
3673  cat_write_lock write_lock(this);
3674  cat_sqlite_lock sqlite_lock(getObjForLock());
3675 
3676  // execute the SQL query
3677  try {
3678  for (size_t i = 0; i < names.size(); i++) {
3679  int tableId = tableIds[i];
3680  std::string& newTableName = names[i].second;
3681 
3682  sqliteConnector_.query_with_text_params(
3683  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3684  std::vector<std::string>{newTableName, std::to_string(tableId)});
3685  }
3686  } catch (std::exception& e) {
3687  sqliteConnector_.query("ROLLBACK TRANSACTION");
3688  throw;
3689  }
3690 
3691  // reset the table descriptors, give Calcite a kick
3692  for (size_t i = 0; i < names.size(); i++) {
3693  std::string& curTableName = names[i].first;
3694  std::string& newTableName = names[i].second;
3695 
3696  TableDescriptorMap::iterator tableDescIt =
3697  tableDescriptorMap_.find(to_upper(curTableName));
3698  CHECK(tableDescIt != tableDescriptorMap_.end());
3699  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3700 
3701  // Get table descriptor to change it
3702  TableDescriptor* changeTd = tableDescIt->second;
3703  changeTd->tableName = newTableName;
3704  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3705  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3706  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3707  }
3708 }
3709 
3710 // Collect an 'overlay' mapping of the tableNames->tableId
3711 // to account for possible chained renames
3712 // (for swap: a->b, b->c, c->d, d->a)
// Resolves `curTableName` to a table descriptor, consulting the overlay map
// `cachedTableMap` before the catalog:
//  - name present in the overlay with id -1: the name has been renamed away,
//    so the lookup returns NULL;
//  - name present with any other id: the descriptor is fetched by that id;
//  - name absent from the overlay: the descriptor is fetched from the catalog
//    by name.
// NOTE(review): this listing skips original line 3714, which holds the return
// type, the function name and the first parameter; the call site in
// renameTable invokes it as lookupTableDescriptor(this, cachedTableMap,
// curTableName).
3713 
3715  std::map<std::string, int>& cachedTableMap,
3716  std::string& curTableName) {
3717  auto iter = cachedTableMap.find(curTableName);
3718  if ((iter != cachedTableMap.end())) {
3719  // get the cached tableId
3720  // and use that to lookup the TableDescriptor
3721  int tableId = (*iter).second;
3722  if (tableId == -1)
3723  return NULL;
3724  else
3725  return cat->getMetadataForTable(tableId);
3726  }
3727 
3728  // else ... lookup in standard location
3729  return cat->getMetadataForTable(curTableName);
3730 }
3731 
// Records the rename curTableName -> newTableName in the overlay map:
// the old name is tombstoned with id -1 and the new name is bound to tableId.
// The tombstone is written first, so when both names are equal the entry ends
// up holding tableId.
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kRenamedAwayId = -1;

  // The old name no longer refers to a table.
  cachedTableMap[curTableName] = kRenamedAwayId;

  // The new name now refers to the table.
  cachedTableMap[newTableName] = tableId;
}
3742 
// Renames several tables in one call. `names` holds (current name, new name)
// pairs that are applied in order, so chains and swaps are allowed.
//
// Phases:
//  Setup   - resolve every current name to a table id through the overlay map
//            (lookupTableDescriptor / replaceTableName), CHECK-failing on a
//            name that does not resolve; record each distinct table id once,
//            together with the index of the first pair that mentions it.
//  Locking - one entry is appended to tableLocks per distinct table id, in
//            ascending id order (iteration order of the std::map).
//  Rename  - under the catalog write lock and sqlite lock, inside one sqlite
//            transaction: build the flat list of (old, new) names, with each
//            logical table's physical tables listed before the table itself,
//            and hand it to renamePhysicalTable(names, ids).
//  Privs   - after the catalog locks are released, update the object name on
//            each grantee and in SysCatalog's descriptor map.
//
// NOTE(review): there is no try/catch around the transaction in this function;
// the only ROLLBACK on this path is the one issued inside
// renamePhysicalTable(names, ids).
// NOTE(review): this listing skips original lines 3791, 3795 and 3796, so the
// declaration of tableLocks and the lock-acquisition expression are only
// partially visible here.
3743 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
3744  // tableId of all tables being renamed
3745  // ... in matching order to 'names'
3746  std::vector<int> tableIds;
3747 
3748  // (sorted & unique) list of tables ids for locking
3749  // (with names index of src in case of error)
3750  // <tableId, strIndex>
3751  // std::map is by definition/implementation sorted
3752  // std::map current usage below tests to avoid over-write
3753  std::map<int, size_t> uniqueOrderedTableIds;
3754 
3755  // mapping of modified tables names -> tableId
3756  std::map<std::string, int> cachedTableMap;
3757 
3758  // -------- Setup --------
3759 
3760  // gather tableIds pre-execute; build maps
3761  for (size_t i = 0; i < names.size(); i++) {
3762  std::string& curTableName = names[i].first;
3763  std::string& newTableName = names[i].second;
3764 
3765  // make sure the table being renamed exists,
3766  // or will exist when executed in 'name' order
3767  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
3768  CHECK(td);
3769 
3770  tableIds.push_back(td->tableId);
3771  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
3772  // don't overwrite as it should map to the first names index 'i'
3773  uniqueOrderedTableIds[td->tableId] = i;
3774  }
3775  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
3776  }
3777 
3778  CHECK_EQ(tableIds.size(), names.size());
3779 
3780  // The outer Stmt created a write lock before calling the catalog rename table
3781  // -> TODO: might want to sort out which really should set the lock :
3782  // the comment in the outer scope indicates it should be in here
3783  // but it's not clear if the access done there *requires* it out there
3784  //
3785  // Lock tables pre-execute (may/will be in different order than rename occurs)
3786  // const auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
3787  // *legacylockmgr::LockMgr<mapd_shared_mutex, bool>::getMutex(
3788  // legacylockmgr::ExecutorOuterLock, true));
3789 
3790  // acquire the locks for all tables being renamed
3792  for (auto& idPair : uniqueOrderedTableIds) {
// idPair.second is the index of the first pair in `names` that mentions this
// table id.
3793  std::string& tableName = names[idPair.second].first;
3794  tableLocks.emplace_back(
3797  *this, tableName, false)));
3798  }
3799 
3800  // -------- Rename --------
3801 
3802  {
3803  cat_write_lock write_lock(this);
3804  cat_sqlite_lock sqlite_lock(getObjForLock());
3805 
3806  sqliteConnector_.query("BEGIN TRANSACTION");
3807 
3808  // collect all (tables + physical tables) into a single list
3809  std::vector<std::pair<std::string, std::string>> allNames;
3810  std::vector<int> allTableIds;
3811 
3812  for (size_t i = 0; i < names.size(); i++) {
3813  int tableId = tableIds[i];
3814  std::string& curTableName = names[i].first;
3815  std::string& newTableName = names[i].second;
3816 
3817  // rename all corresponding physical tables if this is a logical table
3818  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
3819  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3820  const auto physicalTables = physicalTableIt->second;
3821  CHECK(!physicalTables.empty());
3822  for (size_t k = 0; k < physicalTables.size(); k++) {
3823  int32_t physical_tb_id = physicalTables[k];
3824  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3825  CHECK(phys_td);
// Physical table names are numbered from 1, hence k + 1.
3826  std::string newPhysTableName =
3827  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
3828  allNames.push_back(
3829  std::pair<std::string, std::string>(phys_td->tableName, newPhysTableName));
3830  allTableIds.push_back(phys_td->tableId);
3831  }
3832  }
3833  allNames.push_back(std::pair<std::string, std::string>(curTableName, newTableName));
3834  allTableIds.push_back(tableId);
3835  }
3836  // rename all tables in one shot
3837  renamePhysicalTable(allNames, allTableIds);
3838 
3839  sqliteConnector_.query("END TRANSACTION");
3840  // cat write/sqlite locks are released when they go out scope
3841  }
3842  {
3843  // now update the SysCatalog
3844  for (size_t i = 0; i < names.size(); i++) {
3845  int tableId = tableIds[i];
3846  std::string& newTableName = names[i].second;
3847  {
3848  // update table name in direct and effective priv map
3849  DBObjectKey key;
3850  key.dbId = currentDB_.dbId;
3851  key.objectId = tableId;
3852  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3853 
3854  DBObject object(newTableName, TableDBObjectType);
3855  object.setObjectKey(key);
3856 
3857  auto objdescs = SysCatalog::instance().getMetadataForObject(
3858  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
3859  for (auto obj : objdescs) {
3860  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3861  if (grnt) {
3862  grnt->renameDbObject(object);
3863  }
3864  }
3865  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
3866  }
3867  }
3868  }
3869 
3870  // -------- Cleanup --------
3871 
3872  // table locks are released when 'tableLocks' goes out of scope
3873 }
3874 
3875 void Catalog::renameColumn(const TableDescriptor* td,
3876  const ColumnDescriptor* cd,
3877  const string& newColumnName) {
3878  cat_write_lock write_lock(this);
3879  cat_sqlite_lock sqlite_lock(getObjForLock());
3880  sqliteConnector_.query("BEGIN TRANSACTION");
3881  try {
3882  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3883  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3884  CHECK(cdx);
3885  std::string new_column_name = cdx->columnName;
3886  new_column_name.replace(0, cd->columnName.size(), newColumnName);
3887  sqliteConnector_.query_with_text_params(
3888  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
3889  std::vector<std::string>{new_column_name,
3890  std::to_string(td->tableId),
3891  std::to_string(cdx->columnId)});
3892  }
3893  } catch (std::exception& e) {
3894  sqliteConnector_.query("ROLLBACK TRANSACTION");
3895  throw;
3896  }
3897  sqliteConnector_.query("END TRANSACTION");
3898  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3899  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3900  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3901  CHECK(cdx);
3902  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
3903  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
3904  CHECK(columnDescIt != columnDescriptorMap_.end());
3905  ColumnDescriptor* changeCd = columnDescIt->second;
3906  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
3907  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
3908  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
3909  changeCd;
3910  }
3911  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3912 }
3913 
// Creates the dashboard described by `vd`, or updates the existing row with the
// same (name, userid), and returns its id.
//
// Inside one sqlite transaction the row is looked up by name and user id, then
// either UPDATEd (state, image_hash, metadata, update_time) or INSERTed. After
// the transaction has ended, the id and the formatted update_time are read
// back into vd.dashboardId / vd.updateTime.
// The descriptor is then added to the in-memory map and both locks are
// released explicitly before createOrUpdateDashboardSystemRole is called.
//
// Exceptions from the upsert trigger a ROLLBACK and are rethrown; exceptions
// from the read-back are rethrown unchanged.
//
// NOTE(review): this listing skips original lines 3965 and 3972, so the
// statement ending on line 3966 and the arguments of
// createOrUpdateDashboardSystemRole are only partially visible here.
3914 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
3915  cat_write_lock write_lock(this);
3916  cat_sqlite_lock sqlite_lock(getObjForLock());
3917  sqliteConnector_.query("BEGIN TRANSACTION");
3918  try {
3919  // TODO(andrew): this should be an upsert
3920  sqliteConnector_.query_with_text_params(
3921  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
3922  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3923  if (sqliteConnector_.getNumRows() > 0) {
3924  sqliteConnector_.query_with_text_params(
3925  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
3926  "update_time = "
3927  "datetime('now') where name = ? "
3928  "and userid = ?",
3929  std::vector<std::string>{vd.dashboardState,
3930  vd.imageHash,
3931  vd.dashboardMetadata,
3932  vd.dashboardName,
3933  std::to_string(vd.userId)});
3934  } else {
3935  sqliteConnector_.query_with_text_params(
3936  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
3937  "update_time, "
3938  "userid) "
3939  "VALUES "
3940  "(?,?,?,?, "
3941  "datetime('now'), ?)",
3942  std::vector<std::string>{vd.dashboardName,
3943  vd.dashboardState,
3944  vd.imageHash,
3945  vd.dashboardMetadata,
3946  std::to_string(vd.userId)});
3947  }
3948  } catch (std::exception& e) {
3949  sqliteConnector_.query("ROLLBACK TRANSACTION");
3950  throw;
3951  }
3952  sqliteConnector_.query("END TRANSACTION");
3953 
3954  // now get the auto generated dashboardId
// This read-back runs after END TRANSACTION, i.e. outside the transaction
// above.
3955  try {
3956  sqliteConnector_.query_with_text_params(
3957  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
3958  "WHERE name = ? and userid = ?",
3959  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3960  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
3961  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
3962  } catch (std::exception& e) {
3963  throw;
3964  }
3966  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
3967  addFrontendViewToMap(vd);
// Both locks are dropped before the system-role call below.
3968  sqlite_lock.unlock();
3969  write_lock.unlock();
3970  // NOTE(wamsi): Transactionally unsafe
3971  createOrUpdateDashboardSystemRole(
3973  return vd.dashboardId;
3974 }
3975 
// Replaces the stored dashboard whose id equals vd.dashboardId with the
// contents of `vd`.
//
// Steps:
//  1. In one sqlite transaction, verify that a row with that id exists and
//     UPDATE it; if no such row exists, log and throw runtime_error (the
//     transaction is rolled back by the catch block).
//  2. Find the in-memory descriptor with the same id, erase it from
//     dashboardDescriptorMap_ (keyed by "<userId>:<dashboardName>"), or log and
//     throw runtime_error if none is found.
//  3. Read the new update_time back from sqlite into vd.updateTime, add `vd`
//     to the map again, release both locks and call
//     createOrUpdateDashboardSystemRole.
//
// NOTE(review): a throw in step 2 happens after END TRANSACTION, so the sqlite
// row has already been updated at that point.
// NOTE(review): this listing skips original lines 3995, 4041 and 4048; the last
// UPDATE parameter, the statement ending on line 4042 and the arguments of
// createOrUpdateDashboardSystemRole are only partially visible here.
3976 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
3977  cat_write_lock write_lock(this);
3978  cat_sqlite_lock sqlite_lock(getObjForLock());
3979 
3980  CHECK(sqliteConnector_.getSqlitePtr());
3981  sqliteConnector_.query("BEGIN TRANSACTION");
3982  try {
3983  sqliteConnector_.query_with_text_params(
3984  "SELECT id FROM mapd_dashboards WHERE id = ?",
3985  std::vector<std::string>{std::to_string(vd.dashboardId)});
3986  if (sqliteConnector_.getNumRows() > 0) {
3987  sqliteConnector_.query_with_text_params(
3988  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
3989  "?, userid = ?, update_time = datetime('now') where id = ? ",
3990  std::vector<std::string>{vd.dashboardName,
3991  vd.dashboardState,
3992  vd.imageHash,
3993  vd.dashboardMetadata,
3994  std::to_string(vd.userId),
3996  } else {
3997  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
3998  << " does not exist in db";
3999  throw runtime_error("Error replacing dashboard id " +
4000  std::to_string(vd.dashboardId) + " does not exist in db");
4001  }
4002  } catch (std::exception& e) {
4003  sqliteConnector_.query("ROLLBACK TRANSACTION");
4004  throw;
4005  }
4006  sqliteConnector_.query("END TRANSACTION");
4007 
// Locate the cached descriptor by id; the map key uses the descriptor's own
// (possibly old) user id and name.
4008  bool found{false};
4009  for (auto descp : dashboardDescriptorMap_) {
4010  auto dash = descp.second.get();
4011  if (dash->dashboardId == vd.dashboardId) {
4012  found = true;
4013  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4014  dash->dashboardName);
4015  if (viewDescIt ==
4016  dashboardDescriptorMap_.end()) { // check to make sure view exists
4017  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4018  << " dashboard " << dash->dashboardName << " does not exist in map";
4019  throw runtime_error("No metadata for dashboard for user " +
4020  std::to_string(dash->userId) + " dashboard " +
4021  dash->dashboardName + " does not exist in map");
4022  }
// The loop is left immediately after the erase, so the iteration never
// continues over the modified map.
4023  dashboardDescriptorMap_.erase(viewDescIt);
4024  break;
4025  }
4026  }
4027  if (!found) {
4028  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4029  << " does not exist in map";
4030  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4031  " does not exist in map");
4032  }
4033 
4034  // now reload the object
4035  sqliteConnector_.query_with_text_params(
4036  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4037  "mapd_dashboards "
4038  "WHERE id = ?",
4039  std::vector<std::string>{std::to_string(vd.dashboardId)});
4040  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4042  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4043  addFrontendViewToMapNoLock(vd);
4044  sqlite_lock.unlock();
4045  write_lock.unlock();
4046  // NOTE(wamsi): Transactionally unsafe
4047  createOrUpdateDashboardSystemRole(
4049 }
4050 
4051 std::string Catalog::calculateSHA1(const std::string& data) {
4052  boost::uuids::detail::sha1 sha1;
4053  unsigned int digest[5];
4054  sha1.process_bytes(data.c_str(), data.length());
4055  sha1.get_digest(digest);
4056  std::stringstream ss;
4057  for (size_t i = 0; i < 5; i++) {
4058  ss << std::hex << digest[i];
4059  }
4060  return ss.str();
4061 }
4062 
4063 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4064  cat_write_lock write_lock(this);
4065  cat_sqlite_lock sqlite_lock(getObjForLock());
4066  sqliteConnector_.query("BEGIN TRANSACTION");
4067  try {
4068  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4069  .substr(0, 8);
4070  sqliteConnector_.query_with_text_params(
4071  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4072  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4073  if (sqliteConnector_.getNumRows() > 0) {
4074  sqliteConnector_.query_with_text_params(
4075  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4076  "link = ?",
4077  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4078  } else {
4079  sqliteConnector_.query_with_text_params(
4080  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4081  "update_time) VALUES (?,?,?,?, datetime('now'))",
4082  std::vector<std::string>{
4083  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4084  }
4085  // now get the auto generated dashid
4086  sqliteConnector_.query_with_text_param(
4087  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4088  "WHERE link = ?",
4089  ld.link);
4090  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4091  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4092  } catch (std::exception& e) {
4093  sqliteConnector_.query("ROLLBACK TRANSACTION");
4094  throw;
4095  }
4096  sqliteConnector_.query("END TRANSACTION");
4097  addLinkToMap(ld);
4098  return ld.link;
4099 }
4100 
4101 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4102  const TableDescriptor* td) const {
4103  cat_read_lock read_lock(this);
4104 
4105  const auto column_descriptors =
4106  getAllColumnMetadataForTable(td->tableId, false, true, true);
4107 
4108  const ColumnDescriptor* shard_cd{nullptr};
4109  int i = 1;
4110  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4111  ++cd_itr, ++i) {
4112  if (i == td->shardedColumnId) {
4113  shard_cd = *cd_itr;
4114  }
4115  }
4116  return shard_cd;
4117 }
4118 
4119 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4120  const TableDescriptor* logical_table_desc,
4121  bool populate_fragmenter) const {
4122  cat_read_lock read_lock(this);
4123  const auto physicalTableIt =
4124  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4125  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4126  return {logical_table_desc};
4127  }
4128 
4129  const auto physicalTablesIds = physicalTableIt->second;
4130  CHECK(!physicalTablesIds.empty());
4131  std::vector<const TableDescriptor*> physicalTables;
4132  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4133  physicalTables.push_back(
4134  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4135  }
4136 
4137  return physicalTables;
4138 }
4139 
4140 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4141  cat_read_lock read_lock(this);
4142 
4143  std::map<int, const ColumnDescriptor*> mapping;
4144 
4145  const auto tables = getAllTableMetadata();
4146  for (const auto td : tables) {
4147  if (td->shard >= 0) {
4148  // skip shards, they're not standalone tables
4149  continue;
4150  }
4151 
4152  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4153  const auto& ti = cd->columnType;
4154  if (ti.is_string()) {
4155  if (ti.get_compression() == kENCODING_DICT) {
4156  // if foreign reference, get referenced tab.col
4157  const auto dict_id = ti.get_comp_param();
4158 
4159  // ignore temp (negative) dictionaries
4160  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4161  mapping[dict_id] = cd;
4162  }
4163  }
4164  }
4165  }
4166  }
4167 
4168  return mapping;
4169 }
4170 
// Decides whether table `td` should be listed for `user_metadata` under the
// requested `get_tables_type`. Returns false when
//  - td is a shard (td->shard >= 0),
//  - GET_PHYSICAL_TABLES was requested and td is a view,
//  - GET_VIEWS was requested and td is not a view, or
//  - SysCatalog reports that the user has no privileges on the object.
// Returns true otherwise.
// It takes no lock itself; the two callers in view acquire the SysCatalog and
// catalog read locks first.
//
// NOTE(review): this listing skips original line 4194, which declares
// `dbObject`; how it is constructed is not visible here.
4171 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4172  const UserMetadata& user_metadata,
4173  const GetTablesType get_tables_type) const {
4174  if (td->shard >= 0) {
4175  // skip shards, they're not standalone tables
4176  return false;
4177  }
4178  switch (get_tables_type) {
4179  case GET_PHYSICAL_TABLES: {
4180  if (td->isView) {
4181  return false;
4182  }
4183  break;
4184  }
4185  case GET_VIEWS: {
4186  if (!td->isView) {
4187  return false;
4188  }
4189  break;
4190  }
// Any other requested type applies no view/table filtering.
4191  default:
4192  break;
4193  }
4195  dbObject.loadKey(*this);
4196  std::vector<DBObject> privObjects = {dbObject};
4197  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4198  // skip table, as there are no privileges to access it
4199  return false;
4200  }
4201  return true;
4202 }
4203 
4204 std::vector<std::string> Catalog::getTableNamesForUser(
4205  const UserMetadata& user_metadata,
4206  const GetTablesType get_tables_type) const {
4207  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4208  cat_read_lock read_lock(this);
4209 
4210  std::vector<std::string> table_names;
4211  const auto tables = getAllTableMetadata();
4212  for (const auto td : tables) {
4213  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4214  table_names.push_back(td->tableName);
4215  }
4216  }
4217  return table_names;
4218 }
4219 
4220 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4221  const UserMetadata& user_metadata,
4222  const GetTablesType get_tables_type,
4223  const std::string& filter_table_name) const {
4224  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4225  cat_read_lock read_lock(this);
4226 
4227  std::vector<TableMetadata> tables_metadata;
4228  const auto tables = getAllTableMetadata();
4229  for (const auto td : tables) {
4230  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4231  if (!filter_table_name.empty()) {
4232  if (td->tableName != filter_table_name) {
4233  continue;
4234  }
4235  }
4236  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4237  // descriptor outside catalog lock
4238  tables_metadata.emplace_back(table_metadata);
4239  }
4240  }
4241  return tables_metadata;
4242 }
4243 
4244 int Catalog::getLogicalTableId(const int physicalTableId) const {
4245  cat_read_lock read_lock(this);
4246  for (const auto& l : logicalToPhysicalTableMapById_) {
4247  if (l.second.end() != std::find_if(l.second.begin(),
4248  l.second.end(),
4249  [&](decltype(*l.second.begin()) tid) -> bool {
4250  return physicalTableId == tid;
4251  })) {
4252  return l.first;
4253  }
4254  }
4255  return physicalTableId;
4256 }
4257 
4258 void Catalog::checkpoint(const int logicalTableId) const {
4259  const auto td = getMetadataForTable(logicalTableId);
4260  const auto shards = getPhysicalTablesDescriptors(td);
4261  for (const auto shard : shards) {
4262  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4263  }
4264 }
4265 
4266 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4267  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4268  try {
4269  checkpoint(logical_table_id);
4270  } catch (...) {
4271  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4272  throw;
4273  }
4274 }
4275 
4276 void Catalog::eraseDBData() {
4277  cat_write_lock write_lock(this);
4278  // Physically erase all tables and dictionaries from disc and memory
4279  const auto tables = getAllTableMetadata();
4280  for (const auto table : tables) {
4281  eraseTablePhysicalData(table);
4282  }
4283  // Physically erase database metadata
4284  boost::filesystem::remove(basePath_ + "/mapd_catalogs/" + currentDB_.dbName);
4285  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4286 }
4287 
4288 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4289  const int tableId = td->tableId;
4290  // must destroy fragmenter before deleteChunks is called.
4291  if (td->fragmenter != nullptr) {
4292  auto tableDescIt = tableDescriptorMapById_.find(tableId);
4293  CHECK(tableDescIt != tableDescriptorMapById_.end());
4294  {
4295  INJECT_TIMER(deleting_fragmenter);
4296  tableDescIt->second->fragmenter = nullptr;
4297  }
4298  }
4299  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4300  {
4301  INJECT_TIMER(deleteChunksWithPrefix);
4302  // assuming deleteChunksWithPrefix is atomic
4303  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4304  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4305  }
4306  if (!td->isView) {
4307  INJECT_TIMER(Remove_Table);
4308  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4309  }
4310  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4311  {
4312  INJECT_TIMER(removeTableFromMap_);
4313  removeTableFromMap(td->tableName, tableId);
4314  }
4315 }
4316 
4317 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4318  const int32_t& shardNumber) {
4319  std::string physicalTableName =
4320  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4321  return (physicalTableName);
4322 }
4323 
4324 void Catalog::buildForeignServerMap() {
4325  sqliteConnector_.query(
4326  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4327  "omnisci_foreign_servers");
4328  auto num_rows = sqliteConnector_.getNumRows();
4329  for (size_t row = 0; row < num_rows; row++) {
4330  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4331  sqliteConnector_.getData<int>(row, 0),
4332  sqliteConnector_.getData<std::string>(row, 1),
4333  sqliteConnector_.getData<std::string>(row, 2),
4334  sqliteConnector_.getData<std::string>(row, 3),
4335  sqliteConnector_.getData<std::int32_t>(row, 4),
4336  sqliteConnector_.getData<std::int32_t>(row, 5));
4337  foreignServerMap_[foreign_server->name] = foreign_server;
4338  foreignServerMapById_[foreign_server->id] = foreign_server;
4339  }
4340 }
4341 
4342 void Catalog::addForeignTableDetails() {
4343  sqliteConnector_.query(
4344  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4345  "omnisci_foreign_tables");
4346  auto num_rows = sqliteConnector_.getNumRows();
4347  for (size_t r = 0; r < num_rows; r++) {
4348  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4349  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4350  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4351  const auto last_refresh_time = sqliteConnector_.getData<int>(r, 3);
4352  const auto next_refresh_time = sqliteConnector_.getData<int>(r, 4);
4353 
4354  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4355  auto foreign_table =
4356  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4357  CHECK(foreign_table);
4358  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4359  CHECK(foreign_table->foreign_server);
4360  foreign_table->populateOptionsMap(options);
4361  foreign_table->last_refresh_time = last_refresh_time;
4362  foreign_table->next_refresh_time = next_refresh_time;
4363  }
4364 }
4365 
4366 void Catalog::setForeignServerProperty(const std::string& server_name,
4367  const std::string& property,
4368  const std::string& value) {
4369  cat_sqlite_lock sqlite_lock(getObjForLock());
4370  sqliteConnector_.query_with_text_params(
4371  "SELECT id from omnisci_foreign_servers where name = ?",
4372  std::vector<std::string>{server_name});
4373  auto num_rows = sqliteConnector_.getNumRows();
4374  if (num_rows > 0) {
4375  CHECK_EQ(size_t(1), num_rows);
4376  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4377  sqliteConnector_.query_with_text_params(
4378  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4379  std::vector<std::string>{value, std::to_string(server_id)});
4380  } else {
4381  throw std::runtime_error{"Can not change property \"" + property +
4382  "\" for foreign server." + " Foreign server \"" +
4383  server_name + "\" is not found."};
4384  }
4385 }
4386 
// Creates the two built-in foreign servers, "omnisci_local_csv" and
// "omnisci_local_parquet". Each server object is validated and then handed to
// createForeignServerNoLocks with `true` as the second argument.
// NOTE(review): per the function name that flag presumably means
// "if not exists" — confirm against createForeignServerNoLocks.
// Both servers are constructed with the same `options` value.
//
// NOTE(review): this listing skips original lines 4388-4390, 4394, 4396, 4402
// and 4404, so the definition of `options` and some ForeignServer constructor
// arguments are not visible here.
4387 void Catalog::createDefaultServersIfNotExists() {
4391 
4392  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4393  "omnisci_local_csv",
4395  options,
4397  local_csv_server->validate();
4398  createForeignServerNoLocks(std::move(local_csv_server), true);
4399 
4400  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4401  "omnisci_local_parquet",
4403  options,
4405  local_parquet_server->validate();
4406  createForeignServerNoLocks(std::move(local_parquet_server), true);
4407 }
4408 
4409 // prepare a fresh file reload on next table access
4410 void Catalog::setForReload(const int32_t tableId) {
4411  cat_read_lock read_lock(this);
4412  const auto td = getMetadataForTable(tableId);
4413  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4414  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4415  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4416  }
4417 }
4418 
4419 // get a table's data dirs
4420 std::vector<std::string> Catalog::getTableDataDirectories(
4421  const TableDescriptor* td) const {
4422  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4423  std::vector<std::string> file_paths;
4424  for (auto shard : getPhysicalTablesDescriptors(td)) {
4425  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4426  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4427  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4428  file_paths.push_back(file_path.filename().string());
4429  }
4430  return file_paths;
4431 }
4432 
4433 // Returns the last path component of the dictionary folder used by column `cd`,
// or an empty string when the column does not satisfy the condition below.
//
// NOTE(review): the embedded numbers jump from 4435 to 4437, so one operand of
// the condition is missing from this listing; consult the original file before
// changing it.
4434 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd) const {
4435  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4437  cd->columnType.get_comp_param() > 0) {
// comp_param is used as the dictionary id for the lookup in this database
4438  const auto dictId = cd->columnType.get_comp_param();
4439  const DictRef dictRef(currentDB_.dbId, dictId);
4440  const auto dit = dictDescriptorMapByRef_.find(dictRef);
// the dictionary must be registered and its descriptor must be non-null
4441  CHECK(dit != dictDescriptorMapByRef_.end());
4442  CHECK(dit->second);
4443  boost::filesystem::path file_path(dit->second->dictFolderPath);
4444  return file_path.filename().string();
4445  }
4446  return std::string();
4447 }
4448 
4449 // get a table's dict dirs
4450 std::vector<std::string> Catalog::getTableDictDirectories(
4451  const TableDescriptor* td) const {
4452  std::vector<std::string> file_paths;
4453  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4454  auto file_base = getColumnDictDirectory(cd);
4455  if (!file_base.empty() &&
4456  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4457  file_paths.push_back(file_base);
4458  }
4459  }
4460  return file_paths;
4461 }
4462 
4463 // Returns the table's schema as a single-line
// "CREATE TABLE @T (...) WITH (...);" string; the literal placeholder @T stands
// in for the table name, both in the header and in SHARED DICTIONARY clauses.
4464 // NOTE(sy): Might be able to replace dumpSchema() later with
4465 // dumpCreateTable() after a deeper review of the TableArchiver code.
// NOTE(review): the embedded numbers jump from 4557 to 4559 and from 4560 to
// 4562, so the start of the condition guarding MAX_ROLLBACK_EPOCHS and the end
// of its push_back call are missing from this listing.
4466 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
4467  cat_read_lock read_lock(this);
4468 
4469  std::ostringstream os;
4470  os << "CREATE TABLE @T (";
4471  // gather column defines
4472  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
// separator placed before each column; empty for the first emitted column
4473  std::string comma;
4474  std::vector<std::string> shared_dicts;
// first column seen for each dictionary name; later columns that use the same
// dictionary are emitted as SHARED DICTIONARY references to it
4475  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4476  for (const auto cd : cds) {
4477  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4478  const auto& ti = cd->columnType;
4479  os << comma << cd->columnName;
4480  // CHAR is peculiar: emit it as TEXT, and a CHAR subtype as TEXT[]
4481  if (ti.get_type() == SQLTypes::kCHAR) {
4482  os << " "
4483  << "TEXT";
4484  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4485  os << " "
4486  << "TEXT[]";
4487  } else {
4488  os << " " << ti.get_type_name();
4489  }
4490  os << (ti.get_notnull() ? " NOT NULL" : "");
4491  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
// the encoding width is emitted as size * 8 (presumably bytes to bits -- TODO confirm)
4492  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4493  if (ti.get_compression() == kENCODING_DICT) {
4494  // if foreign reference, get referenced tab.col
4495  const auto dict_id = ti.get_comp_param();
4496  const DictRef dict_ref(currentDB_.dbId, dict_id);
4497  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4498  CHECK(dict_it != dictDescriptorMapByRef_.end());
4499  const auto dict_name = dict_it->second->dictName;
4500  // when migrating a table, any foreign dict ref will be dropped
4501  // and the first cd of a dict will become root of the dict
4502  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
4503  dict_root_cds[dict_name] = cd;
4504  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4505  } else {
4506  const auto dict_root_cd = dict_root_cds[dict_name];
4507  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
4508  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
4509  // "... shouldn't specify an encoding, it borrows from the referenced
4510  // column"
4511  }
4512  } else {
4513  os << " ENCODING NONE";
4514  }
4515  } else if (ti.is_date_in_days() ||
4516  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
// a comp_param of zero is emitted as 32
4517  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4518  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4519  } else if (ti.is_geometry()) {
4520  if (ti.get_compression() == kENCODING_GEOINT) {
4521  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4522  << ")";
4523  } else {
4524  os << " ENCODING NONE";
4525  }
4526  }
4527  comma = ", ";
4528  }
4529  }
4530  // append the SHARED DICTIONARY clauses collected above
4531  if (shared_dicts.size()) {
4532  os << ", " << boost::algorithm::join(shared_dicts, ", ");
4533  }
4534  // gather WITH options ...
4535  std::vector<std::string> with_options;
4536  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4537  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4538  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4539  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4540  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4541  : "VACUUM='IMMEDIATE'");
4542  if (!td->partitions.empty()) {
4543  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4544  }
4545  if (td->nShards > 0) {
4546  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4547  CHECK(shard_cd);
// SHARD KEY goes inside the column list, which is still open at this point
4548  os << ", SHARD KEY(" << shard_cd->columnName << ")";
// the emitted shard count is nShards multiplied by g_leaf_count, with
// g_leaf_count treated as at least 1
4549  with_options.push_back(
4550  "SHARD_COUNT=" +
4551  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4552  }
4553  if (td->sortedColumnId > 0) {
4554  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4555  CHECK(sort_cd);
4556  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4557  }
4559  td->maxRollbackEpochs != -1) {
4560  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4562  }
// close the column list and emit every collected option in one WITH clause
4563  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
4564  return os.str();
4565 }
4566 
4567 #include "Parser/ReservedKeywords.h"
4568 
// Reports whether `str` holds at least one character that std::isspace
// classifies as whitespace. Characters are passed to std::isspace as
// unsigned char.
inline bool contains_spaces(std::string_view str) {
  const auto is_whitespace = [](unsigned char symbol) {
    return std::isspace(symbol) != 0;
  };
  return std::any_of(str.begin(), str.end(), is_whitespace);
}
4575 
// NOTE(review): the line that opens this declaration (return type and function
// name) is missing from this listing. Catalog::quoteIfRequired() calls a helper
// named contains_sql_reserved_chars with a single string argument, which is
// presumably this function -- confirm against the original file.
//
// Yields true when `str` contains at least one character from `chars`; the
// default `chars` is the punctuation set spelled out below.
4578  std::string_view str,
4579  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
4580  return str.find_first_of(chars) != std::string_view::npos;
4581 }
4582 
4584 inline bool is_reserved_sql_keyword(std::string_view str) {
4585  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
4586 }
4587 
4588 // Returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE".
//
// td                   - table, foreign table or view to describe
// multiline_formatting - when true, columns, extra clauses, SERVER and WITH are
//                        each placed on their own line; otherwise the statement
//                        is a single line
// dump_defaults        - when true, WITH options are emitted even when they
//                        equal their DEFAULT_* constants
//
// For a view the result is "CREATE VIEW <name> AS <viewSQL>" and nothing else.
//
// NOTE(review): the embedded numbers jump from 4600 to 4602 and from 4714 to
// 4716, so the condition guarding "TEMPORARY " and the end of the
// MAX_ROLLBACK_EPOCHS push_back call are missing from this listing.
4589 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
4590  bool multiline_formatting,
4591  bool dump_defaults) const {
4592  cat_read_lock read_lock(this);
4593 
// non-null only when td is actually a ForeignTable
4594  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
4595  std::ostringstream os;
4596 
4597  if (foreign_table) {
4598  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
4599  } else if (!td->isView) {
4600  os << "CREATE ";
4602  os << "TEMPORARY ";
4603  }
4604  os << "TABLE " + td->tableName + " (";
4605  } else {
// views carry their full definition in viewSQL; no column list is generated
4606  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
4607  return os.str();
4608  }
4609  // scan column defines
4610  std::vector<std::string> additional_info;
4611  std::set<std::string> shared_dict_column_names;
4612 
4613  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
4614 
4615  // gather column defines
4616  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4617  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4618  bool first = true;
4619  for (const auto cd : cds) {
4620  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4621  const auto& ti = cd->columnType;
// separator before every column except the first
4622  if (!first) {
4623  os << ",";
4624  if (!multiline_formatting) {
4625  os << " ";
4626  }
4627  } else {
4628  first = false;
4629  }
4630  if (multiline_formatting) {
4631  os << "\n ";
4632  }
4633  // column name
4634  os << quoteIfRequired(cd->columnName);
4635  // CHAR is peculiar: emit it as TEXT, and a CHAR subtype as TEXT[]
4636  if (ti.get_type() == SQLTypes::kCHAR) {
4637  os << " "
4638  << "TEXT";
4639  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4640  os << " "
4641  << "TEXT[]";
4642  } else {
4643  os << " " << ti.get_type_name();
4644  }
4645  os << (ti.get_notnull() ? " NOT NULL" : "");
// columns reported in shared_dict_column_names get no ENCODING clause
4646  if (shared_dict_column_names.find(cd->columnName) ==
4647  shared_dict_column_names.end()) {
4648  // avoids "Column ... shouldn't specify an encoding, it borrows it
4649  // from the referenced column"
4650  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
// the encoding width is emitted as size * 8 (presumably bytes to bits -- TODO confirm)
4651  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4652  if (ti.get_compression() == kENCODING_DICT) {
4653  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4654  } else {
4655  os << " ENCODING NONE";
4656  }
4657  } else if (ti.is_date_in_days() ||
4658  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
// a comp_param of zero is emitted as 32
4659  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4660  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4661  } else if (ti.is_geometry()) {
4662  if (ti.get_compression() == kENCODING_GEOINT) {
4663  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4664  << ")";
4665  } else {
4666  os << " ENCODING NONE";
4667  }
4668  }
4669  }
4670  }
4671  }
4672  // append the clauses collected by gatherAdditionalInfo()
4673  if (additional_info.size()) {
4674  std::string comma;
4675  if (!multiline_formatting) {
4676  comma = ", ";
4677  } else {
4678  comma = ",\n ";
4679  }
4680  os << comma;
4681  os << boost::algorithm::join(additional_info, comma);
4682  }
4683  os << ")";
4684 
4685  std::vector<std::string> with_options;
4686  if (foreign_table) {
4687  if (multiline_formatting) {
4688  os << "\n";
4689  } else {
4690  os << " ";
4691  }
4692  os << "SERVER " << foreign_table->foreign_server->name;
4693 
4694  // gather WITH options ...
// every foreign table option is emitted as option='value'
4695  for (const auto& [option, value] : foreign_table->options) {
4696  with_options.emplace_back(option + "='" + value + "'");
4697  }
4698  }
4699 
// The options below are emitted when dump_defaults is set or the value differs
// from its DEFAULT_* constant; all except FRAGMENT_SIZE and MAX_ROLLBACK_EPOCHS
// are skipped for foreign tables.
4700  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
4701  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4702  }
4703  if (!foreign_table && (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE)) {
4704  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4705  }
4706  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
4707  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4708  }
4709  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
4710  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4711  }
// a maxRollbackEpochs of -1 is never emitted
4712  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
4713  td->maxRollbackEpochs != -1) {
4714  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4716  }
// VACUUM is emitted when defaults are requested or the table has no deleted column
4717  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
4718  with_options.push_back(td->hasDeletedCol ? "VACUUM='DELAYED'" : "VACUUM='IMMEDIATE'");
4719  }
4720  if (!foreign_table && !td->partitions.empty()) {
4721  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4722  }
4723  if (!foreign_table && td->nShards > 0) {
4724  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4725  CHECK(shard_cd);
// the emitted shard count is nShards multiplied by g_leaf_count, with
// g_leaf_count treated as at least 1
4726  with_options.push_back(
4727  "SHARD_COUNT=" +
4728  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4729  }
4730  if (!foreign_table && td->sortedColumnId > 0) {
4731  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4732  CHECK(sort_cd);
4733  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4734  }
4735 
// the WITH clause is omitted entirely when no option was collected
4736  if (!with_options.empty()) {
4737  if (!multiline_formatting) {
4738  os << " ";
4739  } else {
4740  os << "\n";
4741  }
4742  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
4743  }
4744  os << ";";
4745  return os.str();
4746 }
4747 
4748 bool Catalog::validateNonExistentTableOrView(const std::string& name,
4749  const bool if_not_exists) {
4750  if (getMetadataForTable(name, false)) {
4751  if (if_not_exists) {
4752  return false;
4753  }
4754  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
4755  }
4756  return true;
4757 }
4758 
// Scans all table descriptors and returns the foreign tables selected by the
// refresh condition below (timing-type option value and next_refresh_time not
// later than the current time).
//
// NOTE(review): the embedded numbers jump from 4767 to 4769 and from 4773 to
// 4775, so the option key passed to find() and the value the timing type is
// compared against are missing from this listing.
4759 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
4760  cat_read_lock read_lock(this);
4761  std::vector<const TableDescriptor*> tables;
4762  for (auto entry : tableDescriptorMapById_) {
4763  auto table_descriptor = entry.second;
4764  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
4765  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4766  CHECK(foreign_table);
4767  auto timing_type_entry = foreign_table->options.find(
// every foreign table is required to carry this option
4769  CHECK(timing_type_entry != foreign_table->options.end());
// current time as whole seconds since the system_clock epoch
4770  int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
4771  std::chrono::system_clock::now().time_since_epoch())
4772  .count();
4773  if (timing_type_entry->second ==
4775  foreign_table->next_refresh_time <= current_time) {
4776  tables.emplace_back(foreign_table);
4777  }
4778  }
4779  }
4780  return tables;
4781 }
4782 
4783 void Catalog::updateForeignTableRefreshTimes(const int32_t table_id) {
4784  cat_write_lock write_lock(this);
4785  cat_sqlite_lock sqlite_lock(getObjForLock());
4786  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4787  auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
4788  CHECK(table_descriptor);
4789  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
4790  CHECK(foreign_table);
4791  int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
4792  std::chrono::system_clock::now().time_since_epoch())
4793  .count();
4794  auto last_refresh_time = current_time;
4795  auto next_refresh_time = get_next_refresh_time(*foreign_table);
4796  sqliteConnector_.query_with_text_params(
4797  "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
4798  "WHERE table_id = ?",
4799  std::vector<std::string>{std::to_string(last_refresh_time),
4800  std::to_string(next_refresh_time),
4801  std::to_string(foreign_table->tableId)});
4802  foreign_table->last_refresh_time = last_refresh_time;
4803  foreign_table->next_refresh_time = next_refresh_time;
4804 }
4805 
4806 // TODO(Misiu): This function should be merged with setForeignServerOptions via
4807 // inheritance rather than replication similar functions.
4808 void Catalog::setForeignTableOptions(const std::string& table_name,
4809  foreign_storage::OptionsMap& options_map,
4810  bool clear_existing_options) {
4811  cat_write_lock write_lock(this);
4812  // update in-memory table
4813  auto foreign_table = getForeignTableUnlocked(table_name);
4814  auto saved_options = foreign_table->options;
4815  foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
4816  try {
4817  foreign_table->validateOptionValues();
4818  } catch (const std::exception& e) {
4819  // validation did not succeed:
4820  // revert to saved options & throw exception
4821  foreign_table->options = saved_options;
4822  throw;
4823  }
4824  setForeignTableProperty(
4825  foreign_table, "options", foreign_table->getOptionsAsJsonString());
4826 }
4827 
4828 void Catalog::setForeignTableProperty(const foreign_storage::ForeignTable* table,
4829  const std::string& property,
4830  const std::string& value) {
4831  cat_sqlite_lock sqlite_lock(getObjForLock());
4832  sqliteConnector_.query_with_text_params(
4833  "SELECT table_id from omnisci_foreign_tables where table_id = ?",
4834  std::vector<std::string>{std::to_string(table->tableId)});
4835  auto num_rows = sqliteConnector_.getNumRows();
4836  if (num_rows > 0) {
4837  CHECK_EQ(size_t(1), num_rows);
4838  sqliteConnector_.query_with_text_params(
4839  "UPDATE omnisci_foreign_tables SET " + property + " = ? WHERE table_id = ?",
4840  std::vector<std::string>{value, std::to_string(table->tableId)});
4841  } else {
4842  throw std::runtime_error{"Can not change property \"" + property +
4843  "\" for foreign table." + " Foreign table \"" +
4844  table->tableName + "\" is not found."};
4845  }
4846 }
4847 
4848 std::string Catalog::quoteIfRequired(const std::string& column_name) const {
4849  if (is_reserved_sql_keyword(column_name) || contains_spaces(column_name) ||
4850  contains_sql_reserved_chars(column_name)) {
4851  return get_quoted_string(column_name, '"', '"');
4852  } else {
4853  return column_name;
4854  }
4855 }
4856 
4857 // this will gather information that represents the shared dictionary columns
4858 // as they are on the table NOW not at original creation
4859 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
4860  std::set<std::string>& shared_dict_column_names,
4861  const TableDescriptor* td) const {
4862  if (td->nShards > 0) {
4863  ColumnIdKey columnIdKey(td->tableId, td->shardedColumnId);
4864  auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
4865  CHECK(scd);
4866  std::string txt = "SHARD KEY (" + quoteIfRequired(scd->columnName) + ")";
4867  additional_info.emplace_back(txt);
4868  }
4869  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4870  for (const auto cd : cds) {
4871  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4872  const SQLTypeInfo& ti = cd->columnType;
4873  if (ti.get_compression() != kENCODING_DICT) {
4874  continue;
4875  }
4876  auto dictId = ti.get_comp_param();
4877 
4878  // now we need to check how many other users have this dictionary
4879 
4880  DictRef dict_ref(currentDB_.dbId, dictId);
4881  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4882  if (dictIt == dictDescriptorMapByRef_.end()) {
4883  LOG(ERROR) << "missing dictionary " << dictId << " for table " << td->tableName;
4884  continue;
4885  }
4886 
4887  const auto& dd = dictIt->second;
4888  if (dd->refcount > 1) {
4889  auto lowest_table = td->tableId;
4890  auto lowest_column = cd->columnId;
4891  std::string lowest_column_name;
4892  // we have multiple tables using this dictionary
4893  // find the other occurances and keep the "lowest"
4894  for (auto const& [key, val] : columnDescriptorMap_) {
4895  if (val->columnType.get_compression() == kENCODING_DICT &&
4896  val->columnType.get_comp_param() == dictId &&
4897  !(val->tableId == td->tableId && val->columnId == cd->columnId)) {
4898  if (val->tableId < lowest_table) {
4899  lowest_table = val->tableId;
4900  lowest_column = val->columnId;
4901  lowest_column_name = val->columnName;
4902  }
4903  if (val->columnId < lowest_column) {
4904  lowest_column = val->columnId;
4905  lowest_column_name = val->columnName;
4906  }
4907  }
4908  }
4909  if (lowest_table != td->tableId || lowest_column != cd->columnId) {
4910  // we are referencing a different tables dictionary
4911  auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
4912  CHECK(lowest_td);
4913  std::string txt = "SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
4914  ") REFERENCES " + lowest_td->tableName + "(" +
4915  quoteIfRequired(lowest_column_name) + ")";
4916 
4917  additional_info.emplace_back(txt);
4918  shared_dict_column_names.insert(cd->columnName);
4919  }
4920  }
4921  }
4922  }
4923 }
4924 
4925 int32_t Catalog::createCustomExpression(
4926  std::unique_ptr<CustomExpression> custom_expression) {
4927  cat_write_lock write_lock(this);
4928  cat_sqlite_lock sqlite_lock(getObjForLock());
4929  sqliteConnector_.query("BEGIN TRANSACTION");
4930  int32_t custom_expression_id{-1};
4931  try {
4932  auto data_source_type_str =
4933  CustomExpression::dataSourceTypeToString(custom_expression->data_source_type);
4934  auto data_source_id_str = std::to_string(custom_expression->data_source_id);
4935  std::string custom_expr_select_query{
4936  "SELECT id FROM omnisci_custom_expressions WHERE name = ? and data_source_type = "
4937  "? and data_source_id = ? and is_deleted = ?"};
4938  std::vector<std::string> custom_expr_select_params{custom_expression->name,
4939  data_source_type_str,
4940  data_source_id_str,
4941  std::to_string(false)};
4942  sqliteConnector_.query_with_text_params(custom_expr_select_query,
4943  custom_expr_select_params);
4944  if (sqliteConnector_.getNumRows() > 0) {
4945  throw std::runtime_error{
4946  "A custom expression with the given "
4947  "name and data source already exists."};
4948  }
4949  sqliteConnector_.query_with_text_params(
4950  "INSERT INTO omnisci_custom_expressions(name, expression_json, "
4951  "data_source_type, data_source_id, is_deleted) VALUES (?,?,?,?,?)",
4952  std::vector<std::string>{custom_expression->name,
4953  custom_expression->expression_json,
4954  data_source_type_str,
4955  data_source_id_str,
4956  std::to_string(false)});
4957  sqliteConnector_.query_with_text_params(custom_expr_select_query,
4958  custom_expr_select_params);
4959  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
4960  custom_expression->id = sqliteConnector_.getData<int32_t>(0, 0);
4961  custom_expression_id = custom_expression->id;
4962  CHECK(custom_expr_map_by_id_.find(custom_expression->id) ==
4963  custom_expr_map_by_id_.end());
4964  custom_expr_map_by_id_[custom_expression->id] = std::move(custom_expression);
4965  } catch (std::exception& e) {
4966  sqliteConnector_.query("ROLLBACK TRANSACTION");
4967  throw;
4968  }
4969  sqliteConnector_.query("END TRANSACTION");
4970  CHECK_GT(custom_expression_id, 0);
4971  return custom_expression_id;
4972 }
4973 
4974 const CustomExpression* Catalog::getCustomExpression(int32_t custom_expression_id) const {
4975  cat_read_lock read_lock(this);
4976  auto it = custom_expr_map_by_id_.find(custom_expression_id);
4977  if (it != custom_expr_map_by_id_.end()) {
4978  return it->second.get();
4979  }
4980  return nullptr;
4981 }
4982 
4983 const std::unique_ptr<const CustomExpression> Catalog::getCustomExpressionFromStorage(
4984  int32_t custom_expression_id) {
4985  cat_sqlite_lock sqlite_lock(getObjForLock());
4986  sqliteConnector_.query_with_text_params(
4987  "SELECT id, name, expression_json, data_source_type, data_source_id, "
4988  "is_deleted FROM omnisci_custom_expressions WHERE id = ?",
4989  std::vector<std::string>{to_string(custom_expression_id)});
4990  if (sqliteConnector_.getNumRows() > 0) {
4991  CHECK_EQ(sqliteConnector_.getNumRows(), static_cast<size_t>(1));
4992  return getCustomExpressionFromConnector(0);
4993  }
4994  return nullptr;
4995 }
4996 
4997 std::vector<const CustomExpression*> Catalog::getCustomExpressionsForUser(
4998  const UserMetadata& user) const {
4999  std::vector<const CustomExpression*> all_custom_expressions;
5000  {
5001  // Get custom expression pointers separately in order to avoid holding the catalog
5002  // read lock while checking privileges (which may cause a deadlock).
5003  cat_read_lock read_lock(this);