OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Catalog/Catalog.h"
26 
27 #include <algorithm>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/range/adaptor/map.hpp>
31 #include <boost/version.hpp>
32 #include <cassert>
33 #include <cerrno>
34 #include <cstdio>
35 #include <cstring>
36 #include <exception>
37 #include <fstream>
38 #include <list>
39 #include <memory>
40 #include <random>
41 #include <regex>
42 #include <sstream>
43 
44 #if BOOST_VERSION >= 106600
45 #include <boost/uuid/detail/sha1.hpp>
46 #else
47 #include <boost/uuid/sha1.hpp>
48 #endif
49 #include <rapidjson/document.h>
50 #include <rapidjson/istreamwrapper.h>
51 #include <rapidjson/ostreamwrapper.h>
52 #include <rapidjson/writer.h>
53 
54 #include "Catalog/SysCatalog.h"
55 
56 #include "QueryEngine/Execute.h"
58 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
67 #include "Parser/ParserNode.h"
68 #include "QueryEngine/Execute.h"
70 #include "RefreshTimeCalculator.h"
71 #include "Shared/DateTimeParser.h"
72 #include "Shared/File.h"
73 #include "Shared/StringTransform.h"
74 #include "Shared/measure.h"
76 
77 #include "MapDRelease.h"
78 #include "RWLocks.h"
80 
81 using Chunk_NS::Chunk;
84 using std::list;
85 using std::map;
86 using std::pair;
87 using std::runtime_error;
88 using std::string;
89 using std::vector;
90 
92 bool g_enable_fsi{false};
93 bool g_enable_s3_fsi{false};
94 extern bool g_cache_string_hash;
95 
96 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
97 // under unit testing.
99 
100 namespace Catalog_Namespace {
101 
102 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
104  1073741824; // 2^30, give room for over a billion non-temp tables
106  1073741824; // 2^30, give room for over a billion non-temp dictionaries
107 
108 const std::string Catalog::physicalTableNameTag_("_shard_#");
109 
110 thread_local bool Catalog::thread_holds_read_lock = false;
111 
116 
117 // The migration is done as a two-step process: this release
118 // creates and uses the new table;
119 // the next release will remove the old table. This keeps a fallback path
120 // in case of migration failure.
123  sqliteConnector_.query("BEGIN TRANSACTION");
124  try {
126  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
127  if (sqliteConnector_.getNumRows() != 0) {
128  // already done
129  sqliteConnector_.query("END TRANSACTION");
130  return;
131  }
133  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
134  "userid integer references mapd_users, state text, image_hash text, update_time "
135  "timestamp, "
136  "metadata text, UNIQUE(userid, name) )");
137  // now copy content from old table to new table
139  "insert into mapd_dashboards (id, name , "
140  "userid, state, image_hash, update_time , "
141  "metadata) "
142  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
143  "view_metadata "
144  "from mapd_frontend_views");
145  } catch (const std::exception& e) {
146  sqliteConnector_.query("ROLLBACK TRANSACTION");
147  throw;
148  }
149  sqliteConnector_.query("END TRANSACTION");
150 }
151 
152 namespace {
153 
154 inline auto table_json_filepath(const std::string& base_path,
155  const std::string& db_name) {
156  return boost::filesystem::path(base_path + "/mapd_catalogs/" + db_name +
157  "_temp_tables.json");
158 }
159 
160 } // namespace
161 
/**
 * Constructs the catalog for the database described by curDB.
 *
 * The member initializers store the constructor arguments, construct
 * sqliteConnector_ from curDB.dbName and "<basePath>/mapd_catalogs/", and
 * start the temporary table / dictionary id counters at
 * MAPD_TEMP_TABLE_START_ID and MAPD_TEMP_DICT_START_ID.
 *
 * @param basePath          base directory; also stored in basePath_
 * @param curDB             metadata of the database; stored in currentDB_
 * @param dataMgr           stored in dataMgr_
 * @param string_dict_hosts stored in string_dict_hosts_
 * @param calcite           stored in calciteMgr_
 * @param is_new_db         when false, the migration routines are run before
 *                          and after buildMaps(); when true they are skipped
 */
162 Catalog::Catalog(const string& basePath,
163  const DBMetadata& curDB,
164  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
165  const std::vector<LeafHostInfo>& string_dict_hosts,
166  std::shared_ptr<Calcite> calcite,
167  bool is_new_db)
168  : basePath_(basePath)
169  , sqliteConnector_(curDB.dbName, basePath + "/mapd_catalogs/")
170  , currentDB_(curDB)
171  , dataMgr_(dataMgr)
172  , string_dict_hosts_(string_dict_hosts)
173  , calciteMgr_(calcite)
174  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
175  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
176  , sqliteMutex_()
177  , sharedMutex_()
178  , thread_holding_sqlite_lock()
179  , thread_holding_write_lock() {
// Existing databases are migrated before the in-memory maps are built.
180  if (!is_new_db) {
181  CheckAndExecuteMigrations();
182  }
183  buildMaps();
// Second migration pass, run only after buildMaps() has completed.
184  if (!is_new_db) {
185  CheckAndExecuteMigrationsPostBuildMaps();
186  }
// NOTE(review): the statement that opens the scope closed two lines below is
// not visible in this listing (listing line 187 is absent). The removal of the
// temp-table JSON file is presumably conditional - confirm against the
// original file.
188  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
189  }
190  // once all initialized use real object
191  initialized_ = true;
192 }
193 
195 
198  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
199  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
200  tableDescIt != tableDescriptorMap_.end();
201  ++tableDescIt) {
202  tableDescIt->second->fragmenter = nullptr;
203  delete tableDescIt->second;
204  }
205 
206  // TableDescriptorMapById points to the same descriptors. No need to delete
207 
208  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
209  columnDescIt != columnDescriptorMap_.end();
210  ++columnDescIt) {
211  delete columnDescIt->second;
212  }
213 
214  // ColumnDescriptorMapById points to the same descriptors. No need to delete
215 
217  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
218  }
219 }
220 
222  if (initialized_) {
223  return this;
224  } else {
225  return SysCatalog::instance().getDummyCatalog().get();
226  }
227 }
228 
231  sqliteConnector_.query("BEGIN TRANSACTION");
232  try {
233  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
234  std::vector<std::string> cols;
235  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
236  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
237  }
238  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
239  cols.end()) {
240  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
242  sqliteConnector_.query(queryString);
243  }
244  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
245  cols.end()) {
246  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
247  std::to_string(0));
248  sqliteConnector_.query(queryString);
249  }
250  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
251  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
252  std::to_string(-1));
253  sqliteConnector_.query(queryString);
254  }
255  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
256  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
257  std::to_string(0));
258  sqliteConnector_.query(queryString);
259  }
260  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
261  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
262  sqliteConnector_.query(queryString);
263  }
264  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
265  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
267  sqliteConnector_.query(queryString);
268  }
269  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
270  cols.end()) {
272  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
273  }
274  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
275  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
276  sqliteConnector_.query(queryString);
277  }
278  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
279  cols.end()) {
280  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
281  std::to_string(-1));
282  sqliteConnector_.query(queryString);
283  }
284  } catch (std::exception& e) {
285  sqliteConnector_.query("ROLLBACK TRANSACTION");
286  throw;
287  }
288  sqliteConnector_.query("END TRANSACTION");
289 }
290 
293  sqliteConnector_.query("BEGIN TRANSACTION");
294  try {
296  "select name from sqlite_master WHERE type='table' AND "
297  "name='mapd_version_history'");
298  if (sqliteConnector_.getNumRows() == 0) {
300  "CREATE TABLE mapd_version_history(version integer, migration_history text "
301  "unique)");
302  } else {
304  "select * from mapd_version_history where migration_history = "
305  "'notnull_fixlen_arrays'");
306  if (sqliteConnector_.getNumRows() != 0) {
307  // legacy fixlen arrays have already been migrated
308  // no need for further execution
309  sqliteConnector_.query("END TRANSACTION");
310  return;
311  }
312  }
313  // Insert check for migration
315  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
316  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
317  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
318  // Updating all fixlen array columns
319  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
320  std::to_string(kARRAY) + " AND size>0;");
321  sqliteConnector_.query(queryString);
322  } catch (std::exception& e) {
323  sqliteConnector_.query("ROLLBACK TRANSACTION");
324  throw;
325  }
326  sqliteConnector_.query("END TRANSACTION");
327 }
328 
331  sqliteConnector_.query("BEGIN TRANSACTION");
332  try {
334  "select name from sqlite_master WHERE type='table' AND "
335  "name='mapd_version_history'");
336  if (sqliteConnector_.getNumRows() == 0) {
338  "CREATE TABLE mapd_version_history(version integer, migration_history text "
339  "unique)");
340  } else {
342  "select * from mapd_version_history where migration_history = "
343  "'notnull_geo_columns'");
344  if (sqliteConnector_.getNumRows() != 0) {
345  // legacy geo columns have already been migrated
346  // no need for further execution
347  sqliteConnector_.query("END TRANSACTION");
348  return;
349  }
350  }
351  // Insert check for migration
353  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
354  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
355  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
356  // Updating all geo columns
357  string queryString(
358  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
359  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
360  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
361  sqliteConnector_.query(queryString);
362  } catch (std::exception& e) {
363  sqliteConnector_.query("ROLLBACK TRANSACTION");
364  throw;
365  }
366  sqliteConnector_.query("END TRANSACTION");
367 }
368 
371  sqliteConnector_.query("BEGIN TRANSACTION");
372  try {
373  // check table still exists
375  "SELECT name FROM sqlite_master WHERE type='table' AND "
376  "name='mapd_frontend_views'");
377  if (sqliteConnector_.getNumRows() == 0) {
378  // table does not exist
379  // no need to migrate
380  sqliteConnector_.query("END TRANSACTION");
381  return;
382  }
383  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
384  std::vector<std::string> cols;
385  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
386  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
387  }
388  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
389  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
390  }
391  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
392  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
393  }
394  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
395  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
396  }
397  } catch (std::exception& e) {
398  sqliteConnector_.query("ROLLBACK TRANSACTION");
399  throw;
400  }
401  sqliteConnector_.query("END TRANSACTION");
402 }
403 
406  sqliteConnector_.query("BEGIN TRANSACTION");
407  try {
409  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
410  "integer references mapd_users, "
411  "link text unique, view_state text, update_time timestamp, view_metadata text)");
412  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
413  std::vector<std::string> cols;
414  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
415  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
416  }
417  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
418  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
419  }
420  } catch (const std::exception& e) {
421  sqliteConnector_.query("ROLLBACK TRANSACTION");
422  throw;
423  }
424  sqliteConnector_.query("END TRANSACTION");
425 }
426 
429  sqliteConnector_.query("BEGIN TRANSACTION");
430  try {
431  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
432  // check table still exists
434  "SELECT name FROM sqlite_master WHERE type='table' AND "
435  "name='mapd_frontend_views'");
436  if (sqliteConnector_.getNumRows() == 0) {
437  // table does not exist
438  // no need to migrate
439  sqliteConnector_.query("END TRANSACTION");
440  return;
441  }
443  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
444  } catch (const std::exception& e) {
445  sqliteConnector_.query("ROLLBACK TRANSACTION");
446  throw;
447  }
448  sqliteConnector_.query("END TRANSACTION");
449 }
450 
451 // introduce DB version into the tables table
452 // if the DB does not have a version reset all pagesizes to 2097152 to be compatible with
453 // old value
454 
457  if (currentDB_.dbName.length() == 0) {
458  // updateDictionaryNames dbName length is zero nothing to do here
459  return;
460  }
461  sqliteConnector_.query("BEGIN TRANSACTION");
462  try {
463  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
464  std::vector<std::string> cols;
465  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
466  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
467  }
468  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
469  LOG(INFO) << "Updating mapd_tables updatePageSize";
470  // No version number
471  // need to update the default page size to the old correct value
472  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
473  // need to add new version info
474  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
476  sqliteConnector_.query(queryString);
477  }
478  } catch (std::exception& e) {
479  sqliteConnector_.query("ROLLBACK TRANSACTION");
480  throw;
481  }
482  sqliteConnector_.query("END TRANSACTION");
483 }
484 
487  sqliteConnector_.query("BEGIN TRANSACTION");
488  try {
489  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
490  std::vector<std::string> cols;
491  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
492  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
493  }
494  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
495  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
496  // need to add new version info
497  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
499  sqliteConnector_.query(queryString);
500  // need to add new column to table definition to indicate deleted column, column used
501  // as bitmap for deleted rows.
503  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
504  }
505  } catch (std::exception& e) {
506  sqliteConnector_.query("ROLLBACK TRANSACTION");
507  throw;
508  }
509  sqliteConnector_.query("END TRANSACTION");
510 }
511 
512 // introduce DB version into the dictionary tables
513 // if the DB does not have a version rename all dictionary tables
514 
517  if (currentDB_.dbName.length() == 0) {
518  // updateDictionaryNames dbName length is zero nothing to do here
519  return;
520  }
521  sqliteConnector_.query("BEGIN TRANSACTION");
522  try {
523  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
524  std::vector<std::string> cols;
525  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
526  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
527  }
528  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
529  // No version number
530  // need to rename dictionaries
531  string dictQuery("SELECT dictid, name from mapd_dictionaries");
532  sqliteConnector_.query(dictQuery);
533  size_t numRows = sqliteConnector_.getNumRows();
534  for (size_t r = 0; r < numRows; ++r) {
535  int dictId = sqliteConnector_.getData<int>(r, 0);
536  std::string dictName = sqliteConnector_.getData<string>(r, 1);
537 
538  std::string oldName =
539  g_base_path + "/mapd_data/" + currentDB_.dbName + "_" + dictName;
540  std::string newName = g_base_path + "/mapd_data/DB_" +
541  std::to_string(currentDB_.dbId) + "_DICT_" +
542  std::to_string(dictId);
543 
544  int result = rename(oldName.c_str(), newName.c_str());
545 
546  if (result == 0) {
547  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
548  << newName;
549  } else {
550  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
551  << newName + " dbname '" << currentDB_.dbName << "' error code "
552  << std::to_string(result);
553  }
554  }
555  // need to add new version info
556  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
558  sqliteConnector_.query(queryString);
559  }
560  } catch (std::exception& e) {
561  sqliteConnector_.query("ROLLBACK TRANSACTION");
562  throw;
563  }
564  sqliteConnector_.query("END TRANSACTION");
565 }
566 
569  sqliteConnector_.query("BEGIN TRANSACTION");
570  try {
572  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
573  "logical_table_id integer, physical_table_id integer)");
574  } catch (const std::exception& e) {
575  sqliteConnector_.query("ROLLBACK TRANSACTION");
576  throw;
577  }
578  sqliteConnector_.query("END TRANSACTION");
579 }
580 
/**
 * Writes the (logical_tb_id, physical table id) pairs currently held in
 * logicalToPhysicalTableMapById_ for logical_tb_id into the SQLite table
 * mapd_logical_to_physical, inside a single transaction.
 *
 * If the map has no entry for logical_tb_id, the transaction is begun and
 * ended without writing anything. If any statement throws a std::exception,
 * the transaction is rolled back and the exception is rethrown.
 *
 * @param logical_tb_id id of the logical table whose mapping is persisted
 */
581 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
582  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
583  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
584  */
585 
// NOTE(review): listing line 586 is absent here; whatever statement precedes
// the BEGIN TRANSACTION in the original file is not visible in this listing.
587  sqliteConnector_.query("BEGIN TRANSACTION");
588  try {
589  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
590  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// Takes a copy of the mapped container (declared `const auto`, not a reference).
591  const auto physicalTables = physicalTableIt->second;
// An entry that exists in the map must list at least one physical table.
592  CHECK(!physicalTables.empty());
593  for (size_t i = 0; i < physicalTables.size(); i++) {
594  int32_t physical_tb_id = physicalTables[i];
// NOTE(review): the head of the call that receives this statement text and
// its two bound string parameters is not visible (listing line 595 is absent).
596  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
597  "physical_table_id) VALUES (?1, ?2)",
598  std::vector<std::string>{std::to_string(logical_tb_id),
599  std::to_string(physical_tb_id)});
600  }
601  }
602  } catch (std::exception& e) {
// Undo any partial writes, then let the caller see the original exception.
603  sqliteConnector_.query("ROLLBACK TRANSACTION");
604  throw;
605  }
606  sqliteConnector_.query("END TRANSACTION");
607 }
608 
611  sqliteConnector_.query("BEGIN TRANSACTION");
612  try {
613  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
614  std::vector<std::string> cols;
615  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
616  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
617  }
618  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
619  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
620  }
621  } catch (std::exception& e) {
622  sqliteConnector_.query("ROLLBACK TRANSACTION");
623  throw;
624  }
625  sqliteConnector_.query("END TRANSACTION");
626 }
627 
630  sqliteConnector_.query("BEGIN TRANSACTION");
631  try {
634  } catch (std::exception& e) {
635  sqliteConnector_.query("ROLLBACK TRANSACTION");
636  throw;
637  }
638  sqliteConnector_.query("END TRANSACTION");
639 }
640 
643  sqliteConnector_.query("BEGIN TRANSACTION");
644  try {
646  } catch (const std::exception& e) {
647  sqliteConnector_.query("ROLLBACK TRANSACTION");
648  throw;
649  }
650  sqliteConnector_.query("END TRANSACTION");
651 }
652 
653 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
654  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
655  "omnisci_foreign_servers(id integer primary key, name text unique, " +
656  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
657  "options text)";
658 }
659 
660 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
661  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
662  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
663  "options text, last_refresh_time integer, next_refresh_time integer, " +
664  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
665  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
666 }
667 
668 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
669  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
670  "omnisci_custom_expressions(id integer primary key, name text, " +
671  "expression_json text, data_source_type text, " +
672  "data_source_id integer, is_deleted boolean)";
673 }
674 
677  sqliteConnector_.query("BEGIN TRANSACTION");
678  std::vector<DBObject> objects;
679  try {
681  "SELECT name FROM sqlite_master WHERE type='table' AND "
682  "name='mapd_record_ownership_marker'");
683  // check if mapd catalog - marker exists
684  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
685  // already done
686  sqliteConnector_.query("END TRANSACTION");
687  return;
688  }
689  // check if different catalog - marker exists
690  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
691  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
692  // Check if migration is being performed on existing non mapd catalogs
693  // Older non mapd dbs will have table but no record in them
694  if (sqliteConnector_.getNumRows() != 0) {
695  // already done
696  sqliteConnector_.query("END TRANSACTION");
697  return;
698  }
699  }
700  // marker does not exist - create one
701  else {
702  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
703  }
704 
705  DBMetadata db;
706  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
707  // place dbId as a reference for migration being performed
709  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
710  std::vector<std::string>{std::to_string(db.dbOwner)});
711 
712  static const std::map<const DBObjectType, const AccessPrivileges>
713  object_level_all_privs_lookup{
719 
720  // grant owner all permissions on DB
721  DBObjectKey key;
722  key.dbId = currentDB_.dbId;
723  auto _key_place = [&key](auto type) {
724  key.permissionType = type;
725  return key;
726  };
727  for (auto& it : object_level_all_privs_lookup) {
728  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
729  objects.back().setName(currentDB_.dbName);
730  }
731 
732  {
733  // other users tables and views
734  string tableQuery(
735  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
736  sqliteConnector_.query(tableQuery);
737  size_t numRows = sqliteConnector_.getNumRows();
738  for (size_t r = 0; r < numRows; ++r) {
739  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
740  std::string tableName = sqliteConnector_.getData<string>(r, 1);
741  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
742  bool isview = sqliteConnector_.getData<bool>(r, 3);
743 
746  DBObjectKey key;
747  key.dbId = currentDB_.dbId;
748  key.objectId = tableid;
749  key.permissionType = type;
750 
751  DBObject obj(tableName, type);
752  obj.setObjectKey(key);
753  obj.setOwner(ownerid);
756 
757  objects.push_back(obj);
758  }
759  }
760 
761  {
762  // other users dashboards
763  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
764  sqliteConnector_.query(tableQuery);
765  size_t numRows = sqliteConnector_.getNumRows();
766  for (size_t r = 0; r < numRows; ++r) {
767  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
768  std::string dashName = sqliteConnector_.getData<string>(r, 1);
769  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
770 
772  DBObjectKey key;
773  key.dbId = currentDB_.dbId;
774  key.objectId = dashId;
775  key.permissionType = type;
776 
777  DBObject obj(dashName, type);
778  obj.setObjectKey(key);
779  obj.setOwner(ownerid);
781 
782  objects.push_back(obj);
783  }
784  }
785  } catch (const std::exception& e) {
786  sqliteConnector_.query("ROLLBACK TRANSACTION");
787  throw;
788  }
789  sqliteConnector_.query("END TRANSACTION");
790 
791  // now apply the objects to the syscat to track the permissions
792  // moved outside transaction to avoid lock in sqlite
793  try {
795  } catch (const std::exception& e) {
796  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
797  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
798  e.what());
799  // will need to remove the mapd_record_ownership_marker table and retry
800  }
801 }
802 
807 }
808 
810  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
811  std::vector<std::string> dashboard_ids;
812  static const std::string migration_name{"dashboard_roles_migration"};
813  {
815  sqliteConnector_.query("BEGIN TRANSACTION");
816  try {
817  // migration_history should be present in all catalogs by now
818  // if not then would be created before this migration
820  "select * from mapd_version_history where migration_history = '" +
821  migration_name + "'");
822  if (sqliteConnector_.getNumRows() != 0) {
823  // no need for further execution
824  sqliteConnector_.query("END TRANSACTION");
825  return;
826  }
827  LOG(INFO) << "Performing dashboard internal roles Migration.";
828  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
829  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
832  sqliteConnector_.getData<string>(i, 0)))) {
833  // Successfully created roles during previous migration/crash
834  // No need to include them
835  continue;
836  }
837  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
838  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
839  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
840  }
841  } catch (const std::exception& e) {
842  sqliteConnector_.query("ROLLBACK TRANSACTION");
843  throw;
844  }
845  sqliteConnector_.query("END TRANSACTION");
846  }
847  // All current grantees with shared dashboards.
848  const auto active_grantees =
850 
851  try {
852  // NOTE(wamsi): Transactionally unsafe
853  for (auto dash : dashboards) {
854  createOrUpdateDashboardSystemRole(dash.second.second,
855  dash.second.first,
857  std::to_string(currentDB_.dbId), dash.first));
858  auto result = active_grantees.find(dash.first);
859  if (result != active_grantees.end()) {
862  dash.first)},
863  result->second);
864  }
865  }
867  // check if this has already been completed
869  "select * from mapd_version_history where migration_history = '" +
870  migration_name + "'");
871  if (sqliteConnector_.getNumRows() != 0) {
872  return;
873  }
875  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
876  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
877  } catch (const std::exception& e) {
878  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
879  << e.what();
880  throw;
881  }
882  LOG(INFO) << "Successfully created dashboard system roles during migration.";
883 }
884 
895  updatePageSize();
899  if (g_enable_fsi) {
901  }
903 }
904 
908 }
909 
910 namespace {
911 std::string getUserFromId(const int32_t id) {
912  UserMetadata user;
913  if (SysCatalog::instance().getMetadataForUserById(id, user)) {
914  return user.userName;
915  }
916  // a user could be deleted and a dashboard still exist?
917  return "Unknown";
918 }
919 } // namespace
920 
924 
925  string dictQuery(
926  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
927  sqliteConnector_.query(dictQuery);
928  size_t numRows = sqliteConnector_.getNumRows();
929  for (size_t r = 0; r < numRows; ++r) {
930  int dictId = sqliteConnector_.getData<int>(r, 0);
931  std::string dictName = sqliteConnector_.getData<string>(r, 1);
932  int dictNBits = sqliteConnector_.getData<int>(r, 2);
933  bool is_shared = sqliteConnector_.getData<bool>(r, 3);
934  int refcount = sqliteConnector_.getData<int>(r, 4);
935  std::string fname = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
936  "_DICT_" + std::to_string(dictId);
937  DictRef dict_ref(currentDB_.dbId, dictId);
938  DictDescriptor* dd = new DictDescriptor(
939  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
940  dictDescriptorMapByRef_[dict_ref].reset(dd);
941  }
942 
943  string tableQuery(
944  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
945  "max_chunk_size, frag_page_size, "
946  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
947  "sort_column_id, storage_type, max_rollback_epochs "
948  "from mapd_tables");
949  sqliteConnector_.query(tableQuery);
950  numRows = sqliteConnector_.getNumRows();
951  for (size_t r = 0; r < numRows; ++r) {
952  TableDescriptor* td;
953  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
954  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
955  const auto table_id = sqliteConnector_.getData<int>(r, 0);
956  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
957  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
958  "supported table option (table "
959  << table_name << " [" << table_id << "] in database "
960  << currentDB_.dbName << ").";
961  }
962 
963  if (storage_type == StorageType::FOREIGN_TABLE) {
965  } else {
966  td = new TableDescriptor();
967  }
968 
969  td->storageType = storage_type;
970  td->tableId = sqliteConnector_.getData<int>(r, 0);
971  td->tableName = sqliteConnector_.getData<string>(r, 1);
972  td->nColumns = sqliteConnector_.getData<int>(r, 2);
973  td->isView = sqliteConnector_.getData<bool>(r, 3);
974  td->fragments = sqliteConnector_.getData<string>(r, 4);
975  td->fragType =
977  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
978  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
979  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
980  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
981  td->partitions = sqliteConnector_.getData<string>(r, 10);
982  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
983  td->shard = sqliteConnector_.getData<int>(r, 12);
984  td->nShards = sqliteConnector_.getData<int>(r, 13);
985  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
986  td->userId = sqliteConnector_.getData<int>(r, 15);
987  td->sortedColumnId =
988  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
989  if (!td->isView) {
990  td->fragmenter = nullptr;
991  }
993  td->hasDeletedCol = false;
994 
997  }
998 
999  if (g_enable_fsi) {
1003  }
1004 
1005  string columnQuery(
1006  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1007  "is_notnull, compression, comp_param, "
1008  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol from "
1009  "mapd_columns ORDER BY tableid, "
1010  "columnid");
1011  sqliteConnector_.query(columnQuery);
1012  numRows = sqliteConnector_.getNumRows();
1013  int32_t skip_physical_cols = 0;
1014  for (size_t r = 0; r < numRows; ++r) {
1015  ColumnDescriptor* cd = new ColumnDescriptor();
1016  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1017  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1018  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1022  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1026  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1027  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1028  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1029  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1030  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1031  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1032  cd->isGeoPhyCol = skip_physical_cols > 0;
1033  ColumnKey columnKey(cd->tableId, to_upper(cd->columnName));
1034  columnDescriptorMap_[columnKey] = cd;
1035  ColumnIdKey columnIdKey(cd->tableId, cd->columnId);
1036  columnDescriptorMapById_[columnIdKey] = cd;
1037 
1038  if (skip_physical_cols <= 0) {
1039  skip_physical_cols = cd->columnType.get_physical_cols();
1040  }
1041 
1042  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1043  CHECK(td_itr != tableDescriptorMapById_.end());
1044 
1045  if (cd->isDeletedCol) {
1046  td_itr->second->hasDeletedCol = true;
1047  setDeletedColumnUnlocked(td_itr->second, cd);
1048  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1049  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1050  }
1051  }
1052  // sort columnIdBySpi_ based on columnId
1053  for (auto& tit : tableDescriptorMapById_) {
1054  std::sort(tit.second->columnIdBySpi_.begin(),
1055  tit.second->columnIdBySpi_.end(),
1056  [](const size_t a, const size_t b) -> bool { return a < b; });
1057  }
1058 
1059  string viewQuery("SELECT tableid, sql FROM mapd_views");
1060  sqliteConnector_.query(viewQuery);
1061  numRows = sqliteConnector_.getNumRows();
1062  for (size_t r = 0; r < numRows; ++r) {
1063  int32_t tableId = sqliteConnector_.getData<int>(r, 0);
1064  TableDescriptor* td = tableDescriptorMapById_[tableId];
1065  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1066  td->fragmenter = nullptr;
1067  }
1068 
1069  string frontendViewQuery(
1070  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1071  "userid, "
1072  "metadata "
1073  "FROM mapd_dashboards");
1074  sqliteConnector_.query(frontendViewQuery);
1075  numRows = sqliteConnector_.getNumRows();
1076  for (size_t r = 0; r < numRows; ++r) {
1077  std::shared_ptr<DashboardDescriptor> vd = std::make_shared<DashboardDescriptor>();
1078  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1079  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1080  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1081  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1082  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1083  vd->userId = sqliteConnector_.getData<int>(r, 5);
1084  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1085  vd->user = getUserFromId(vd->userId);
1086  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1088  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1089  }
1090 
1091  string linkQuery(
1092  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1093  "update_time), view_metadata "
1094  "FROM mapd_links");
1095  sqliteConnector_.query(linkQuery);
1096  numRows = sqliteConnector_.getNumRows();
1097  for (size_t r = 0; r < numRows; ++r) {
1098  LinkDescriptor* ld = new LinkDescriptor();
1099  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1100  ld->userId = sqliteConnector_.getData<int>(r, 1);
1101  ld->link = sqliteConnector_.getData<string>(r, 2);
1102  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1103  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1104  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1106  linkDescriptorMapById_[ld->linkId] = ld;
1107  }
1108 
1109  /* rebuild map linking logical tables to corresponding physical ones */
1110  string logicalToPhysicalTableMapQuery(
1111  "SELECT logical_table_id, physical_table_id "
1112  "FROM mapd_logical_to_physical");
1113  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1114  numRows = sqliteConnector_.getNumRows();
1115  for (size_t r = 0; r < numRows; ++r) {
1116  int32_t logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1117  int32_t physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1118  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1119  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1120  /* add new entity to the map logicalToPhysicalTableMapById_ */
1121  std::vector<int32_t> physicalTables;
1122  physicalTables.push_back(physical_tb_id);
1123  const auto it_ok =
1124  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1125  CHECK(it_ok.second);
1126  } else {
1127  /* update map logicalToPhysicalTableMapById_ */
1128  physicalTableIt->second.push_back(physical_tb_id);
1129  }
1130  }
1131 
1133 }
1134 
1137  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1138  "is_deleted "
1139  "FROM omnisci_custom_expressions");
// NOTE(review): the signature of this function and the call that receives the SELECT
// text above (listing lines 1135-1136) are absent from this listing; presumably the
// query is issued through sqliteConnector_ -- confirm against the full file.
// Every row of the current result set is converted into a CustomExpression and stored
// in custom_expr_map_by_id_ under the expression's id.
1140  auto num_rows = sqliteConnector_.getNumRows();
1141  for (size_t row = 0; row < num_rows; row++) {
1142  auto custom_expr = getCustomExpressionFromConnector(row);
1143  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1144  }
1145 }
1146 
1147 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1148  auto id = sqliteConnector_.getData<int>(row, 0);
1149  auto name = sqliteConnector_.getData<string>(row, 1);
1150  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1151  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1152  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1153  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1154  return std::make_unique<CustomExpression>(
1155  id,
1156  name,
1157  expression_json,
1158  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1159  data_source_id,
1160  is_deleted);
1161 }
1162 
// NOTE(review): the first line of this definition (listing line 1163, which carries the
// function name and the `td` parameter) is absent from this listing.
// Under the catalog write lock, registers a heap-allocated copy of `td`, copies of all
// `columns`, and copies of all non-dummy `dicts` in the in-memory catalog maps.
1164  const list<ColumnDescriptor>& columns,
1165  const list<DictDescriptor>& dicts) {
1166  cat_write_lock write_lock(this);
1167  TableDescriptor* new_td;
1168 
// Copy through the dynamic type so that a ForeignTable stays a ForeignTable.
1169  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1170  if (foreign_table) {
1171  auto new_foreign_table = new foreign_storage::ForeignTable();
1172  *new_foreign_table = *foreign_table;
1173  new_td = new_foreign_table;
1174  } else {
1175  new_td = new TableDescriptor();
1176  *new_td = *td;
1177  }
1178 
// The copy gets its own mutex rather than sharing the one held by `td`.
1179  new_td->mutex_ = std::make_shared<std::mutex>();
1180  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1181  tableDescriptorMapById_[td->tableId] = new_td;
// Each column is copied to the heap and indexed both by (tableId, upper-cased name)
// and by (tableId, columnId).
1182  for (auto cd : columns) {
1183  ColumnDescriptor* new_cd = new ColumnDescriptor();
1184  *new_cd = cd;
1185  ColumnKey columnKey(new_cd->tableId, to_upper(new_cd->columnName));
1186  columnDescriptorMap_[columnKey] = new_cd;
1187  ColumnIdKey columnIdKey(new_cd->tableId, new_cd->columnId);
1188  columnDescriptorMapById_[columnIdKey] = new_cd;
1189 
1190  // Add deleted column to the map
1191  if (cd.isDeletedCol) {
1192  CHECK(new_td->hasDeletedCol);
1193  setDeletedColumnUnlocked(new_td, new_cd);
1194  }
1195  }
1196 
// NOTE(review): nothing visible here appends to columnIdBySpi_; presumably it arrives
// populated through the copy from `td` -- confirm.
1197  std::sort(new_td->columnIdBySpi_.begin(),
1198  new_td->columnIdBySpi_.end(),
1199  [](const size_t a, const size_t b) -> bool { return a < b; });
1200 
// A StringDictionaryClient is only created when string_dict_hosts_ is non-empty;
// dict_ref starts with dictId -1 and is re-pointed at each dictionary in the loop.
1201  std::unique_ptr<StringDictionaryClient> client;
1202  DictRef dict_ref(currentDB_.dbId, -1);
1203  if (!string_dict_hosts_.empty()) {
1204  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1205  }
1206  for (auto dd : dicts) {
1207  if (!dd.dictRef.dictId) {
1208  // Dummy entry created for a shard of a logical table, nothing to do.
1209  continue;
1210  }
1211  dict_ref.dictId = dd.dictRef.dictId;
1212  if (client) {
1213  client->create(dict_ref, dd.dictIsTemp);
1214  }
// dictDescriptorMapByRef_ takes ownership of the copy via reset().
1215  DictDescriptor* new_dd = new DictDescriptor(dd);
1216  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
// Only non-temporary dictionaries get a folder created on disk.
1217  if (!dd.dictIsTemp) {
1218  boost::filesystem::create_directory(new_dd->dictFolderPath);
1219  }
1220  }
1221 }
1222 
// Removes a table, its column descriptors and (when their refcount drops to zero) its
// string dictionaries from the in-memory catalog maps, under the catalog write lock.
//
// tableName    name used for the error message and for erasing from tableDescriptorMap_
// tableId      id used to locate the table and its columns
// is_on_error  when true, a dictionary missing from dictDescriptorMapByRef_ is tolerated
//              instead of failing a CHECK
// Throws runtime_error when no table with `tableId` is registered.
1223 void Catalog::removeTableFromMap(const string& tableName,
1224  const int tableId,
1225  const bool is_on_error) {
1226  cat_write_lock write_lock(this);
1227  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1228  if (tableDescIt == tableDescriptorMapById_.end()) {
1229  throw runtime_error("Table " + tableName + " does not exist.");
1230  }
1231 
1232  TableDescriptor* td = tableDescIt->second;
1233 
// A table flagged hasDeletedCol must have exactly one entry in deletedColumnPerTable_.
1234  if (td->hasDeletedCol) {
1235  const auto ret = deletedColumnPerTable_.erase(td);
1236  CHECK_EQ(ret, size_t(1));
1237  }
1238 
1239  tableDescriptorMapById_.erase(tableDescIt);
1240  tableDescriptorMap_.erase(to_upper(tableName));
1241  td->fragmenter = nullptr;
1242 
// NOTE(review): listing line 1243 is absent from this listing; `isTemp`, used further
// down, is presumably declared there from `td` before it is deleted -- confirm.
1244  delete td;
1245 
1246  std::unique_ptr<StringDictionaryClient> client;
1247  if (SysCatalog::instance().isAggregator()) {
1248  CHECK(!string_dict_hosts_.empty());
1249  DictRef dict_ref(currentDB_.dbId, -1);
1250  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1251  }
1252 
1253  // delete all column descriptors for the table
1254  // no more link columnIds to sequential indexes!
1255  for (auto cit = columnDescriptorMapById_.begin();
1256  cit != columnDescriptorMapById_.end();) {
1257  if (tableId != std::get<0>(cit->first)) {
1258  ++cit;
1259  } else {
// `cit` is advanced here, before the matching entry is erased below, so the loop
// iterator never refers to the erased element.
1260  int i = std::get<1>(cit++->first);
1261  ColumnIdKey cidKey(tableId, i);
1262  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1263  ColumnDescriptor* cd = colDescIt->second;
1264  columnDescriptorMapById_.erase(colDescIt);
1265  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1266  columnDescriptorMap_.erase(cnameKey);
1267  const int dictId = cd->columnType.get_comp_param();
1268  // Dummy dictionaries created for a shard of a logical table have the id set to
1269  // zero.
1270  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1271  INJECT_TIMER(removingDicts);
1272  DictRef dict_ref(currentDB_.dbId, dictId);
1273  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1274  // If we're removing this table due to an error, it is possible that the string
1275  // dictionary reference was never populated. Don't crash, just continue cleaning
1276  // up the TableDescriptor and ColumnDescriptors
1277  if (!is_on_error) {
1278  CHECK(dictIt != dictDescriptorMapByRef_.end());
1279  } else {
1280  if (dictIt == dictDescriptorMapByRef_.end()) {
// NOTE(review): this `continue` skips the `delete cd` at the bottom of the loop body
// although `cd` has already been erased from both column maps -- looks like a leak on
// the is_on_error path; confirm.
1281  continue;
1282  }
1283  }
1284  const auto& dd = dictIt->second;
1285  CHECK_GE(dd->refcount, 1);
1286  --dd->refcount;
// Last reference gone: release the in-memory dictionary, rename its folder for
// deletion unless `isTemp`, drop it via the client if one exists, and forget it.
1287  if (!dd->refcount) {
1288  dd->stringDict.reset();
1289  if (!isTemp) {
1290  File_Namespace::renameForDelete(dd->dictFolderPath);
1291  }
1292  if (client) {
1293  client->drop(dict_ref);
1294  }
1295  dictDescriptorMapByRef_.erase(dictIt);
1296  }
1297  }
1298 
1299  delete cd;
1300  }
1301  }
1302 }
1303 
// NOTE(review): the signature (listing line 1304) and the statement following the lock
// (listing line 1306) are absent from this listing; only the acquisition of the catalog
// write lock is visible here.
1305  cat_write_lock write_lock(this);
1307 }
1308 
// NOTE(review): the signature (listing line 1309) and the left-hand side of the
// assignment (listing line 1311) are absent from this listing. What is visible: the
// catalog write lock is taken and a shared DashboardDescriptor copy of `vd` is created.
1310  cat_write_lock write_lock(this);
1312  std::make_shared<DashboardDescriptor>(vd);
1313 }
1314 
// Builds the list of DBObjects (tables and views) that a dashboard depends on.
//
// view_meta  dashboard metadata, handed to parse_underlying_dashboard_objects to obtain
//            the source object names
// user_id    passed through to every DBObject that is created
// Returns one DBObject per source name that resolves via getMetadataForTable; names
// that do not resolve are logged at INFO level and skipped.
1315 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1316  const int& user_id) {
1317  std::vector<DBObject> objects;
1318  DBObjectKey key;
1319  key.dbId = currentDB_.dbId;
// Helper that fills in type and id on the shared `key` and returns a copy of it.
1320  auto _key_place = [&key](auto type, auto id) {
1321  key.permissionType = type;
1322  key.objectId = id;
1323  return key;
1324  };
1325  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1326  auto td = getMetadataForTable(object_name);
1327  if (!td) {
1328  // Parsed object source is not present in current database
1329  // LOG the info and ignore
1330  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1331  << " no longer exists in current DB.";
1332  continue;
1333  }
1334  // Dashboard source can be Table or View
1335  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
// NOTE(review): the else-branch of this conditional (listing line 1337) is absent from
// this listing; presumably it selects the table counterpart of SELECT_FROM_VIEW.
1336  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1338  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
// Recomputes the same value as `object_type` above.
1339  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1340  objects.back().setName(td->tableName);
1341  }
1342  return objects;
1343 }
1344 
// Creates the dashboard system role `dash_role_name` if it does not exist, or otherwise
// brings its table/view privileges in line with the dashboard's current sources.
//
// view_meta       dashboard metadata from which the source objects are parsed
// user_id         forwarded to parseDashboardObjects
// dash_role_name  name of the role to create or update
1345 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1346  const int32_t& user_id,
1347  const std::string& dash_role_name) {
1348  auto objects = parseDashboardObjects(view_meta, user_id);
1349  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1350  if (!rl) {
1351  // Dashboard role does not exist
1352  // create role and grant privileges
1353  // NOTE(wamsi): Transactionally unsafe
1354  SysCatalog::instance().createRole(dash_role_name, false);
1355  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1356  } else {
1357  // Dashboard system role already exists
1358  // Add/remove privileges on objects
1359  std::set<DBObjectKey> revoke_keys;
1360  auto ex_objects = rl->getDbObjects(true);
// Collect every table/view key the role currently holds that is no longer among the
// dashboard's parsed source objects.
1361  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1362  if (key.permissionType != TableDBObjectType &&
1363  key.permissionType != ViewDBObjectType) {
1364  continue;
1365  }
1366  bool found = false;
1367  for (auto obj : objects) {
1368  found = key == obj.getObjectKey() ? true : false;
1369  if (found) {
1370  break;
1371  }
1372  }
1373  if (!found) {
1374  revoke_keys.insert(key);
1375  }
1376  }
1377  for (auto& key : revoke_keys) {
1378  // revoke privs on object since the object is no
1379  // longer used by the dashboard as source
1380  // NOTE(wamsi): Transactionally unsafe
// NOTE(review): the callee of this statement (listing line 1381) is absent from this
// listing; only its arguments are visible.
1382  dash_role_name, *rl->findDbObject(key, true), *this);
1383  }
1384  // Update privileges on remaining objects
1385  // NOTE(wamsi): Transactionally unsafe
1386  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1387  }
1388 }
1389 
// NOTE(review): the signature (listing line 1390) and one statement (listing line 1394)
// are absent from this listing. What is visible: under the catalog write lock a heap
// copy of `ld` is made and registered in linkDescriptorMapById_ under ld.linkId.
1391  cat_write_lock write_lock(this);
1392  LinkDescriptor* new_ld = new LinkDescriptor();
1393  *new_ld = ld;
1395  linkDescriptorMapById_[ld.linkId] = new_ld;
1396 }
1397 
// NOTE(review): the signature (listing line 1398) and listing line 1402 are absent from
// this listing. What is visible: a fragmenter is created for `td`, stored in
// td->fragmenter, and the elapsed time is logged at INFO level.
1399  auto time_ms = measure<>::execution([&]() {
1400  // instantiate table fragmenter upon first use
1401  // NOTE(review): the original note here said only the insert order fragmenter is
1401  // supported, but a SortedOrderFragmenter is created below when sortedColumnId > 0.
1403  vector<Chunk> chunkVec;
1404  list<const ColumnDescriptor*> columnDescs;
// Flags are (fetchSystemColumns, fetchVirtualColumns, fetchPhysicalColumns).
1405  getAllColumnMetadataForTableImpl(td, columnDescs, true, false, true);
1406  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1407  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1408  if (td->sortedColumnId > 0) {
1409  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1410  chunkVec,
1411  dataMgr_.get(),
1412  const_cast<Catalog*>(this),
1413  td->tableId,
1414  td->shard,
1415  td->maxFragRows,
1416  td->maxChunkSize,
1417  td->fragPageSize,
1418  td->maxRows,
1419  td->persistenceLevel);
1420  } else {
// The insert-order variant receives one extra argument: whether storageType is set.
1421  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1422  chunkVec,
1423  dataMgr_.get(),
1424  const_cast<Catalog*>(this),
1425  td->tableId,
1426  td->shard,
1427  td->maxFragRows,
1428  td->maxChunkSize,
1429  td->fragPageSize,
1430  td->maxRows,
1431  td->persistenceLevel,
1432  !td->storageType.empty());
1433  }
1434  });
1435  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1436  << time_ms << "ms";
1437 }
1438 
// NOTE(review): the first line of this definition (listing line 1439) is absent from
// this listing.
// Case-insensitive lookup (the name is upper-cased) in tableDescriptorMap_ without
// taking the catalog lock. Returns nullptr when the name is unknown or when the
// descriptor is not a foreign_storage::ForeignTable (failed dynamic_cast).
1440  const std::string& tableName) const {
1441  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1442  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1443  return nullptr;
1444  }
1445  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1446 };
1447 
// NOTE(review): the first line of this definition (listing line 1448) is absent from
// this listing.
// Takes the catalog read lock and delegates to getForeignTableUnlocked(tableName).
1449  const std::string& tableName) const {
1450  cat_read_lock read_lock(this);
1451  return getForeignTableUnlocked(tableName);
1452 };
1453 
// Looks up a table descriptor by name (case-insensitive via to_upper) under the catalog
// read lock.
//
// tableName           table name to look up
// populateFragmenter  when true, a missing fragmenter is created for non-view tables
// Returns nullptr when no such table is registered.
1454 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1455  const bool populateFragmenter) const {
1456  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1457  // pure metadata calls
1458  cat_read_lock read_lock(this);
1459  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1460  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1461  return nullptr;
1462  }
1463  TableDescriptor* td = tableDescIt->second;
// The per-table mutex is held while the fragmenter pointer is checked.
1464  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1465  if (populateFragmenter && td->fragmenter == nullptr && !td->isView) {
// NOTE(review): the statement inside this branch (listing line 1466) is absent from
// this listing; presumably it instantiates the fragmenter for `td`.
1467  }
1468  return td; // returns pointer to table descriptor
1469 }
1470 
// NOTE(review): the first line of this definition (listing line 1471) is absent from
// this listing.
// Looks up a table descriptor by id in tableDescriptorMapById_; no catalog lock is taken
// in the visible code. Returns nullptr when the id is unknown.
1472  int tableId,
1473  const bool populateFragmenter) const {
1474  auto tableDescIt = tableDescriptorMapById_.find(tableId);
1475  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1476  return nullptr;
1477  }
1478  TableDescriptor* td = tableDescIt->second;
// The per-table mutex is only taken when a fragmenter may need to be populated.
1479  if (populateFragmenter) {
1480  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1481  if (td->fragmenter == nullptr && !td->isView) {
// NOTE(review): the statement inside this branch (listing line 1482) is absent from
// this listing; presumably it instantiates the fragmenter for `td`.
1483  }
1484  }
1485  return td; // returns pointer to table descriptor
1486 }
1487 
// NOTE(review): the first line of this definition (listing line 1488) is absent from
// this listing.
// Takes the catalog read lock and delegates to getMetadataForTableImpl.
1489  bool populateFragmenter) const {
1490  cat_read_lock read_lock(this);
1491  return getMetadataForTableImpl(tableId, populateFragmenter);
1492 }
1493 
// NOTE(review): the first line of this definition (listing line 1494) is absent from
// this listing.
// Takes the catalog read lock and delegates to getMetadataForDictUnlocked.
1495  const bool load_dict) const {
1496  cat_read_lock read_lock(this);
1497  return getMetadataForDictUnlocked(dict_id, load_dict);
1498 }
1499 
// NOTE(review): the signature (listing line 1500) is absent from this listing.
// Returns the descriptor stored in tableDescriptorMapById_ for `tableId`, or nullptr when
// the id is unknown. No lock is taken and no fragmenter is populated in the visible
// code.
1501  auto tableDescIt = tableDescriptorMapById_.find(tableId);
1502  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1503  return nullptr;
1504  }
1505  return tableDescIt->second;
1506 }
1507 
// NOTE(review): the first line of this definition (listing line 1508) is absent from
// this listing.
// Looks up the dictionary descriptor for (currentDB_.dbId, dictId) without taking the
// catalog lock. Returns nullptr when the dictionary is unknown. When `loadDict` is true
// and the descriptor has no StringDictionary yet, one is created on first use.
1509  const bool loadDict) const {
1510  const DictRef dictRef(currentDB_.dbId, dictId);
1511  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1512  if (dictDescIt ==
1513  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1514  return nullptr;
1515  }
1516  auto& dd = dictDescIt->second;
1517 
1518  if (loadDict) {
// The descriptor's own mutex guards the check-and-create of stringDict.
1519  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1520  if (!dd->stringDict) {
1521  auto time_ms = measure<>::execution([&]() {
// Without string dictionary hosts the dictionary is built from dictFolderPath; the
// two branches differ only in the second constructor argument (true when dictIsTemp).
1522  if (string_dict_hosts_.empty()) {
1523  if (dd->dictIsTemp) {
1524  dd->stringDict = std::make_shared<StringDictionary>(
1525  dd->dictFolderPath, true, true, g_cache_string_hash);
1526  } else {
1527  dd->stringDict = std::make_shared<StringDictionary>(
1528  dd->dictFolderPath, false, true, g_cache_string_hash);
1529  }
1530  } else {
// With hosts configured, the dictionary is constructed from the first host and the ref.
1531  dd->stringDict =
1532  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1533  }
1534  });
1535  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1536  << dd->dictRef.dictId << " was " << time_ms << "ms";
1537  }
1538  }
1539 
1540  return dd.get();
1541 }
1542 
1543 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1544  return string_dict_hosts_;
1545 }
1546 
// NOTE(review): the first line of this definition (listing line 1547) is absent from
// this listing.
// Under the catalog read lock, looks up a column by (tableId, upper-cased columnName) in
// columnDescriptorMap_. Returns nullptr when the table has no such column.
1548  const string& columnName) const {
1549  cat_read_lock read_lock(this);
1550 
1551  ColumnKey columnKey(tableId, to_upper(columnName));
1552  auto colDescIt = columnDescriptorMap_.find(columnKey);
1553  if (colDescIt ==
1554  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
1555  return nullptr;
1556  }
1557  return colDescIt->second;
1558 }
1559 
1560 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
1561  cat_read_lock read_lock(this);
1562  return getMetadataForColumnUnlocked(table_id, column_id);
1563 }
1564 
// NOTE(review): the first line of this definition (listing line 1565) is absent from
// this listing.
// Looks up a column by (tableId, columnId) in columnDescriptorMapById_ without taking the
// catalog lock. Returns nullptr when there is no such column.
1566  int columnId) const {
1567  ColumnIdKey columnIdKey(tableId, columnId);
1568  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1569  if (colDescIt == columnDescriptorMapById_
1570  .end()) { // need to check to make sure column exists for table
1571  return nullptr;
1572  }
1573  return colDescIt->second;
1574 }
1575 
1576 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
1577  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1578  CHECK(tableDescriptorMapById_.end() != tabDescIt);
1579  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
1580 
1581  auto spx = spi;
1582  int phi = 0;
1583  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
1584  {
1585  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
1586  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
1587  }
1588 
1589  CHECK(0 < spx && spx <= columnIdBySpi.size());
1590  return columnIdBySpi[spx - 1] + phi;
1591 }
1592 
1593 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
1594  cat_read_lock read_lock(this);
1595  return getColumnIdBySpiUnlocked(table_id, spi);
1596 }
1597 
// NOTE(review): the first line of this definition (listing line 1598) is absent from
// this listing.
// Under the catalog read lock, translates `spi` to a column id via
// getColumnIdBySpiUnlocked and returns the matching entry of columnDescriptorMapById_,
// or nullptr when no column has that id.
1599  const size_t spi) const {
1600  cat_read_lock read_lock(this);
1601 
1602  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
1603  ColumnIdKey columnIdKey(tableId, columnId);
1604  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1605  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
1606 }
1607 
// Deletes the given dashboards on behalf of `user`.
//
// Phase 1 validates every id: ids with no dashboard and ids for which `user` lacks
// DELETE_DASHBOARD are collected, and a single std::runtime_error listing both groups is
// thrown before anything is modified.
// Phase 2 removes each dashboard from dashboardDescriptorMap_ and from mapd_dashboards
// inside one SQLite transaction, which is rolled back (and the exception rethrown) on
// any failure.
1608 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
1609  const UserMetadata& user) {
1610  std::stringstream invalid_ids, restricted_ids;
1611 
1612  for (int32_t dashboard_id : dashboard_ids) {
1613  if (!getMetadataForDashboard(dashboard_id)) {
1614  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
1615  continue;
1616  }
1617  DBObject object(dashboard_id, DashboardDBObjectType);
1618  object.loadKey(*this);
1619  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
1620  std::vector<DBObject> privs = {object};
1621  if (!SysCatalog::instance().checkPrivileges(user, privs))
1622  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
1623  }
1624 
1625  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
1626  std::stringstream error_message;
1627  error_message << "Delete dashboard(s) failed with error(s):";
1628  if (invalid_ids.str().size() > 0)
1629  error_message << "\nDashboard id: " << invalid_ids.str()
1630  << " - Dashboard id does not exist";
1631  if (restricted_ids.str().size() > 0)
1632  error_message
1633  << "\nDashboard id: " << restricted_ids.str()
1634  << " - User should be either owner of dashboard or super user to delete it";
1635  throw std::runtime_error(error_message.str());
1636  }
1637  std::vector<DBObject> dash_objs;
1638 
1639  for (int32_t dashboard_id : dashboard_ids) {
1640  dash_objs.push_back(DBObject(dashboard_id, DashboardDBObjectType));
1641  }
1642  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
// NOTE(review): the statement that consumes `dash_objs` (listing line 1643) is absent
// from this listing; the comment further down suggests it revokes privileges on them.
1644  {
1645  cat_write_lock write_lock(this);
// NOTE(review): listing line 1646 is absent from this listing.
1647 
1648  sqliteConnector_.query("BEGIN TRANSACTION");
1649  try {
1650  for (int32_t dashboard_id : dashboard_ids) {
1651  auto dash = getMetadataForDashboard(dashboard_id);
1652  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
1653  // rollback if already deleted
1654  if (!dash) {
1655  throw std::runtime_error(
1656  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
1657  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
1658  }
// The map key has the form "<userId>:<dashboardName>".
1659  std::string user_id = std::to_string(dash->userId);
1660  std::string dash_name = dash->dashboardName;
1661  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
1662  dashboardDescriptorMap_.erase(viewDescIt);
// NOTE(review): the callee of the parameterized DELETE below (listing line 1663) is
// absent from this listing.
1664  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
1665  std::vector<std::string>{dash_name, user_id});
1666  }
1667  } catch (std::exception& e) {
1668  sqliteConnector_.query("ROLLBACK TRANSACTION");
1669  throw;
1670  }
1671  sqliteConnector_.query("END TRANSACTION");
1672  }
1673 }
1674 
// NOTE(review): the first line of this definition (listing line 1675) is absent from
// this listing.
// Under the catalog read lock, looks up a dashboard by the key "<userId>:<dashName>" in
// dashboardDescriptorMap_. Returns nullptr when there is no such dashboard.
1676  const string& userId,
1677  const string& dashName) const {
1678  cat_read_lock read_lock(this);
1679 
1680  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
1681  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
1682  return nullptr;
1683  }
1684  return viewDescIt->second.get(); // returns pointer to view descriptor
1685 }
1686 
// NOTE(review): the signature (listing line 1687) is absent from this listing.
// Under the catalog read lock, scans dashboardDescriptorMap_ for the entry whose
// dashboardId equals `id`, then resolves it again through the (userId, name) overload.
// Returns nullptr when no dashboard has that id.
1688  cat_read_lock read_lock(this);
1689  std::string userId;
1690  std::string name;
1691  bool found{false};
1692  {
1693  for (auto descp : dashboardDescriptorMap_) {
1694  auto dash = descp.second.get();
1695  if (dash->dashboardId == id) {
1696  userId = std::to_string(dash->userId);
1697  name = dash->dashboardName;
1698  found = true;
1699  break;
1700  }
1701  }
1702  }
1703  if (found) {
1704  return getMetadataForDashboard(userId, name);
1705  }
1706  return nullptr;
1707 }
1708 
1709 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
1710  cat_read_lock read_lock(this);
1711  auto linkDescIt = linkDescriptorMap_.find(link);
1712  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
1713  return nullptr;
1714  }
1715  return linkDescIt->second; // returns pointer to view descriptor
1716 }
1717 
// NOTE(review): the signature (listing line 1718) is absent from this listing.
// Under the catalog read lock, returns the LinkDescriptor stored in
// linkDescriptorMapById_ under `linkId`, or nullptr when there is no such entry.
1719  cat_read_lock read_lock(this);
1720  auto linkDescIt = linkDescriptorMapById_.find(linkId);
1721  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
1722  return nullptr;
1723  }
1724  return linkDescIt->second;
1725 }
1726 
// NOTE(review): the first line of this definition (listing line 1727) is absent from
// this listing.
// Resolves `table_id` via getMetadataForTableImpl (without populating a fragmenter) and
// returns it as a ForeignTable. CHECK-fails when the table is unknown or is not a
// foreign_storage::ForeignTable, so a null result is never returned.
1728  int table_id) const {
1729  auto table = getMetadataForTableImpl(table_id, false);
1730  CHECK(table);
1731  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
1732  CHECK(foreign_table);
1733  return foreign_table;
1734 }
1735 
// NOTE(review): the signature (listing line 1736) is absent from this listing.
// Takes the catalog read lock and delegates to getForeignTableUnlocked(table_id).
1737  cat_read_lock read_lock(this);
1738  return getForeignTableUnlocked(table_id);
1739 }
1740 
// NOTE(review): the first line of this definition (listing line 1741) is absent from
// this listing.
// Appends to `columnDescriptors` the columns of table `td`, in the iteration order of
// columnDescriptorMapById_, filtered by the three fetch flags. No lock is taken in the
// visible code.
1742  const TableDescriptor* td,
1743  list<const ColumnDescriptor*>& columnDescriptors,
1744  const bool fetchSystemColumns,
1745  const bool fetchVirtualColumns,
1746  const bool fetchPhysicalColumns) const {
// Number of map entries still to be skipped after a column that reported physical
// columns; only used when fetchPhysicalColumns is false.
1747  int32_t skip_physical_cols = 0;
1748  for (const auto& columnDescriptor : columnDescriptorMapById_) {
// NOTE(review): this skip happens before the tableId check below, so it counts the next
// map entries regardless of table -- presumably they always follow their owning column
// within the same table; confirm.
1749  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
1750  --skip_physical_cols;
1751  continue;
1752  }
1753  auto cd = columnDescriptor.second;
1754  if (cd->tableId != td->tableId) {
1755  continue;
1756  }
1757  if (!fetchSystemColumns && cd->isSystemCol) {
1758  continue;
1759  }
1760  if (!fetchVirtualColumns && cd->isVirtualCol) {
1761  continue;
1762  }
1763  if (!fetchPhysicalColumns) {
1764  const auto& col_ti = cd->columnType;
1765  skip_physical_cols = col_ti.get_physical_cols();
1766  }
1767  columnDescriptors.push_back(cd);
1768  }
1769 }
1770 
// Locked entry point for listing a table's columns: acquires the catalog read lock
// before producing the result.
1771 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
1772  const int tableId,
1773  const bool fetchSystemColumns,
1774  const bool fetchVirtualColumns,
1775  const bool fetchPhysicalColumns) const {
1776  cat_read_lock read_lock(this);
// NOTE(review): the callee of the return statement (listing line 1777) is absent from
// this listing; presumably it is getAllColumnMetadataForTableUnlocked, which takes the
// same four arguments.
1778  tableId, fetchSystemColumns, fetchVirtualColumns, fetchPhysicalColumns);
1779 }
1780 
// Lists a table's columns without taking the catalog lock. The table descriptor is
// resolved by id without instantiating a fragmenter.
// NOTE(review): `td` is not null-checked in the visible lines -- confirm callers only
// pass ids of existing tables.
1781 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTableUnlocked(
1782  const int tableId,
1783  const bool fetchSystemColumns,
1784  const bool fetchVirtualColumns,
1785  const bool fetchPhysicalColumns) const {
1786  std::list<const ColumnDescriptor*> columnDescriptors;
1787  const TableDescriptor* td =
1788  getMetadataForTableImpl(tableId, false); // dont instantiate fragmenter
// NOTE(review): the callee and first argument of the call below (listing line 1789) are
// absent from this listing; presumably getAllColumnMetadataForTableImpl(td, ...).
1790  columnDescriptors,
1791  fetchSystemColumns,
1792  fetchVirtualColumns,
1793  fetchPhysicalColumns);
1794  return columnDescriptors;
1795 }
1796 
1797 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
1798  cat_read_lock read_lock(this);
1799  list<const TableDescriptor*> table_list;
1800  for (auto p : tableDescriptorMapById_) {
1801  table_list.push_back(p.second);
1802  }
1803  return table_list;
1804 }
1805 
1806 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
1807  list<const DashboardDescriptor*> view_list;
1808  for (auto p : dashboardDescriptorMap_) {
1809  view_list.push_back(p.second.get());
1810  }
1811  return view_list;
1812 }
1813 
// NOTE(review): the signature (listing line 1814) is absent from this listing.
// Creates the string dictionary for column `cd`: setColumnDictionary appends a
// DictDescriptor to `dds`, which is then registered in dictDescriptorMapByRef_.
// Returns the DictRef of the new dictionary.
// NOTE(review): operator[] on tableDescriptorMapById_ is dereferenced without a check, so
// cd.tableId must refer to a registered table -- confirm at call sites.
1815  const auto& td = *tableDescriptorMapById_[cd.tableId];
1816  list<DictDescriptor> dds;
1817  setColumnDictionary(cd, dds, td, true);
1818  auto& dd = dds.back();
1819  CHECK(dd.dictRef.dictId);
1820 
// A StringDictionaryClient is only used when string_dict_hosts_ is non-empty.
1821  std::unique_ptr<StringDictionaryClient> client;
1822  if (!string_dict_hosts_.empty()) {
1823  client.reset(new StringDictionaryClient(
1824  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
1825  }
1826  if (client) {
1827  client->create(dd.dictRef, dd.dictIsTemp);
1828  }
1829 
// dictDescriptorMapByRef_ takes ownership of the copy via reset().
1830  DictDescriptor* new_dd = new DictDescriptor(dd);
1831  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
// Only non-temporary dictionaries get a folder created on disk.
1832  if (!dd.dictIsTemp) {
1833  boost::filesystem::create_directory(new_dd->dictFolderPath);
1834  }
1835  return dd.dictRef;
1836 }
1837 
// NOTE(review): the signature (listing line 1838) and listing lines 1840, 1852 and 1855
// are absent from this listing.
// Under the catalog write lock, releases column `cd`'s reference to its string
// dictionary. Columns that are not dictionary-encoded strings or string arrays are
// ignored. The refcount in mapd_dictionaries is decremented; when it reaches zero the
// row is deleted, the dictionary folder is renamed for deletion, the dictionary is
// dropped via the client (if hosts are configured) and erased from
// dictDescriptorMapByRef_.
1839  cat_write_lock write_lock(this);
1841  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
1842  return;
1843  }
1844  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
1845  return;
1846  }
1847  const auto dictId = cd.columnType.get_comp_param();
1848  CHECK_GT(dictId, 0);
1849  // decrement and zero check dict ref count
1850  const auto td = getMetadataForTable(cd.tableId);
1851  CHECK(td);
// NOTE(review): the callees of the two parameterized statements below (listing lines
// 1852 and 1855) are absent from this listing; presumably
// sqliteConnector_.query_with_text_param, as used for the DELETE further down.
1853  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
1854  std::to_string(dictId));
1856  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
1857  const auto refcount = sqliteConnector_.getData<int>(0, 0);
1858  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
1859  << refcount;
// Other columns still reference this dictionary; keep it.
1860  if (refcount > 0) {
1861  return;
1862  }
1863  const DictRef dictRef(currentDB_.dbId, dictId);
1864  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
1865  std::to_string(dictId));
// The folder path is rebuilt as <g_base_path>/mapd_data/DB_<dbId>_DICT_<dictId>.
1866  File_Namespace::renameForDelete(g_base_path + "/mapd_data/DB_" +
1867  std::to_string(currentDB_.dbId) + "_DICT_" +
1868  std::to_string(dictId));
1869 
1870  std::unique_ptr<StringDictionaryClient> client;
1871  if (!string_dict_hosts_.empty()) {
1872  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
1873  }
1874  if (client) {
1875  client->drop(dictRef);
1876  }
1877 
1878  dictDescriptorMapByRef_.erase(dictRef);
1879 }
1880 
// NOTE(review): the first line of this definition (listing line 1881) is absent from
// this listing.
// Stores in `stringDicts`, keyed by column id, a raw pointer to the StringDictionary of
// the catalog's registered version of column `cd`. Does nothing unless that column is a
// dictionary-encoded string or string array with a positive dictionary id.
1882  std::map<int, StringDictionary*>& stringDicts) {
1883  // learn 'committed' ColumnDescriptor of this column
1884  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
1885  CHECK(cit != columnDescriptorMap_.end());
1886  auto& ccd = *cit->second;
1887 
1888  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
1889  return;
1890  }
1891  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
1892  return;
1893  }
1894  if (!(ccd.columnType.get_comp_param() > 0)) {
1895  return;
1896  }
1897 
1898  auto dictId = ccd.columnType.get_comp_param();
// Called for its side effect only; the CHECK on stringDict below relies on the
// dictionary having been loaded (presumably load_dict defaults to true -- confirm).
1899  getMetadataForDict(dictId);
1900 
1901  const DictRef dictRef(currentDB_.dbId, dictId);
1902  auto dit = dictDescriptorMapByRef_.find(dictRef);
1903  CHECK(dit != dictDescriptorMapByRef_.end());
1904  CHECK(dit->second);
1905  CHECK(dit->second.get()->stringDict);
// The map receives a non-owning pointer; ownership stays with the dictionary descriptor.
1906  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
1907 }
1908 
1910  cat_write_lock write_lock(this);
1911  // caller must handle sqlite/chunk transaction TOGETHER
1912  cd.tableId = td.tableId;
1913  if (td.nShards > 0 && td.shard < 0) {
1914  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
1915  auto shard_cd = cd;
1916  addColumn(*shard, shard_cd);
1917  }
1918  }
1920  addDictionary(cd);
1921  }
1922 
1924  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
1925  "colscale, is_notnull, "
1926  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
1927  "is_deletedcol) "
1928  "VALUES (?, "
1929  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
1930  "?, ?, ?, "
1931  "?, "
1932  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1933  std::vector<std::string>{std::to_string(td.tableId),
1934  std::to_string(td.tableId),
1935  cd.columnName,
1944  "",
1947  cd.virtualExpr,
1949 
1951  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
1952  std::vector<std::string>{std::to_string(td.tableId)});
1953 
1955  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
1956  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
1957  cd.columnId = sqliteConnector_.getData<int>(0, 0);
1958 
1959  ++tableDescriptorMapById_[td.tableId]->nColumns;
1960  auto ncd = new ColumnDescriptor(cd);
1963  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
1964 }
1965 
1967  cat_write_lock write_lock(this);
1969  // caller must handle sqlite/chunk transaction TOGETHER
1971  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
1972  std::vector<std::string>{std::to_string(td.tableId), std::to_string(cd.columnId)});
1973 
1975  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
1976  std::vector<std::string>{std::to_string(td.tableId)});
1977 
1978  ColumnDescriptorMap::iterator columnDescIt =
1980  CHECK(columnDescIt != columnDescriptorMap_.end());
1981 
1982  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
1983 
1984  columnDescriptorMap_.erase(columnDescIt);
1986  --tableDescriptorMapById_[td.tableId]->nColumns;
1987  // for each shard
1988  if (td.nShards > 0 && td.shard < 0) {
1989  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
1990  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
1991  CHECK(shard_cd);
1992  dropColumn(*shard, *shard_cd);
1993  }
1994  }
1995 }
1996 
1997 void Catalog::roll(const bool forward) {
1998  cat_write_lock write_lock(this);
1999  std::set<const TableDescriptor*> tds;
2000 
2001  for (const auto& cdr : columnDescriptorsForRoll) {
2002  auto ocd = cdr.first;
2003  auto ncd = cdr.second;
2004  CHECK(ocd || ncd);
2005  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2006  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2007  auto td = tabDescIt->second;
2008  auto& vc = td->columnIdBySpi_;
2009  if (forward) {
2010  if (ocd) {
2011  if (nullptr == ncd ||
2012  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2013  delDictionary(*ocd);
2014  }
2015 
2016  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2017 
2018  delete ocd;
2019  }
2020  if (ncd) {
2021  // append columnId if its new and not phy geo
2022  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2023  if (!ncd->isGeoPhyCol) {
2024  vc.push_back(ncd->columnId);
2025  }
2026  }
2027  }
2028  tds.insert(td);
2029  } else {
2030  if (ocd) {
2031  columnDescriptorMap_[ColumnKey(ocd->tableId, to_upper(ocd->columnName))] = ocd;
2032  columnDescriptorMapById_[ColumnIdKey(ocd->tableId, ocd->columnId)] = ocd;
2033  }
2034  // roll back the dict of new column
2035  if (ncd) {
2036  columnDescriptorMap_.erase(ColumnKey(ncd->tableId, to_upper(ncd->columnName)));
2037  columnDescriptorMapById_.erase(ColumnIdKey(ncd->tableId, ncd->columnId));
2038  if (nullptr == ocd ||
2039  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2040  delDictionary(*ncd);
2041  }
2042  delete ncd;
2043  }
2044  }
2045  }
2046  columnDescriptorsForRoll.clear();
2047 
2048  if (forward) {
2049  for (const auto td : tds) {
2050  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2051  }
2052  }
2053 }
2054 
2056  list<ColumnDescriptor>& columns) {
2057  const auto& col_ti = cd.columnType;
2058  if (IS_GEO(col_ti.get_type())) {
2059  switch (col_ti.get_type()) {
2060  case kPOINT: {
2061  ColumnDescriptor physical_cd_coords(true);
2062  physical_cd_coords.columnName = cd.columnName + "_coords";
2063  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2064  // Raw data: compressed/uncompressed coords
2065  coords_ti.set_subtype(kTINYINT);
2066  size_t unit_size;
2067  if (col_ti.get_compression() == kENCODING_GEOINT &&
2068  col_ti.get_comp_param() == 32) {
2069  unit_size = 4 * sizeof(int8_t);
2070  } else {
2071  CHECK(col_ti.get_compression() == kENCODING_NONE);
2072  unit_size = 8 * sizeof(int8_t);
2073  }
2074  coords_ti.set_size(2 * unit_size);
2075  physical_cd_coords.columnType = coords_ti;
2076  columns.push_back(physical_cd_coords);
2077 
2078  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2079 
2080  break;
2081  }
2082  case kLINESTRING: {
2083  ColumnDescriptor physical_cd_coords(true);
2084  physical_cd_coords.columnName = cd.columnName + "_coords";
2085  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2086  // Raw data: compressed/uncompressed coords
2087  coords_ti.set_subtype(kTINYINT);
2088  physical_cd_coords.columnType = coords_ti;
2089  columns.push_back(physical_cd_coords);
2090 
2091  ColumnDescriptor physical_cd_bounds(true);
2092  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2093  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2094  bounds_ti.set_subtype(kDOUBLE);
2095  bounds_ti.set_size(4 * sizeof(double));
2096  physical_cd_bounds.columnType = bounds_ti;
2097  columns.push_back(physical_cd_bounds);
2098 
2099  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2100 
2101  break;
2102  }
2103  case kPOLYGON: {
2104  ColumnDescriptor physical_cd_coords(true);
2105  physical_cd_coords.columnName = cd.columnName + "_coords";
2106  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2107  // Raw data: compressed/uncompressed coords
2108  coords_ti.set_subtype(kTINYINT);
2109  physical_cd_coords.columnType = coords_ti;
2110  columns.push_back(physical_cd_coords);
2111 
2112  ColumnDescriptor physical_cd_ring_sizes(true);
2113  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2114  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2115  ring_sizes_ti.set_subtype(kINT);
2116  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2117  columns.push_back(physical_cd_ring_sizes);
2118 
2119  ColumnDescriptor physical_cd_bounds(true);
2120  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2121  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2122  bounds_ti.set_subtype(kDOUBLE);
2123  bounds_ti.set_size(4 * sizeof(double));
2124  physical_cd_bounds.columnType = bounds_ti;
2125  columns.push_back(physical_cd_bounds);
2126 
2127  ColumnDescriptor physical_cd_render_group(true);
2128  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2129  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2130  physical_cd_render_group.columnType = render_group_ti;
2131  columns.push_back(physical_cd_render_group);
2132 
2133  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2134 
2135  break;
2136  }
2137  case kMULTIPOLYGON: {
2138  ColumnDescriptor physical_cd_coords(true);
2139  physical_cd_coords.columnName = cd.columnName + "_coords";
2140  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2141  // Raw data: compressed/uncompressed coords
2142  coords_ti.set_subtype(kTINYINT);
2143  physical_cd_coords.columnType = coords_ti;
2144  columns.push_back(physical_cd_coords);
2145 
2146  ColumnDescriptor physical_cd_ring_sizes(true);
2147  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2148  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2149  ring_sizes_ti.set_subtype(kINT);
2150  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2151  columns.push_back(physical_cd_ring_sizes);
2152 
2153  ColumnDescriptor physical_cd_poly_rings(true);
2154  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2155  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2156  poly_rings_ti.set_subtype(kINT);
2157  physical_cd_poly_rings.columnType = poly_rings_ti;
2158  columns.push_back(physical_cd_poly_rings);
2159 
2160  ColumnDescriptor physical_cd_bounds(true);
2161  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2162  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2163  bounds_ti.set_subtype(kDOUBLE);
2164  bounds_ti.set_size(4 * sizeof(double));
2165  physical_cd_bounds.columnType = bounds_ti;
2166  columns.push_back(physical_cd_bounds);
2167 
2168  ColumnDescriptor physical_cd_render_group(true);
2169  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2170  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2171  physical_cd_render_group.columnType = render_group_ti;
2172  columns.push_back(physical_cd_render_group);
2173 
2174  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2175 
2176  break;
2177  }
2178  default:
2179  throw runtime_error("Unrecognized geometry type.");
2180  break;
2181  }
2182  }
2183 }
2184 
2185 namespace {
2187  auto timing_type_entry =
2189  CHECK(timing_type_entry != foreign_table.options.end());
2190  if (timing_type_entry->second ==
2192  return foreign_storage::get_next_refresh_time(foreign_table.options);
2193  }
2195 }
2196 } // namespace
2197 
2199  TableDescriptor& td,
2200  const list<ColumnDescriptor>& cols,
2201  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2202  bool isLogicalTable) {
2203  cat_write_lock write_lock(this);
2204  list<ColumnDescriptor> cds = cols;
2205  list<DictDescriptor> dds;
2206  std::set<std::string> toplevel_column_names;
2207  list<ColumnDescriptor> columns;
2208 
2209  if (!td.storageType.empty() &&
2212  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2213  }
2214  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2215  }
2216 
2217  for (auto cd : cds) {
2218  if (cd.columnName == "rowid") {
2219  throw std::runtime_error(
2220  "Cannot create column with name rowid. rowid is a system defined column.");
2221  }
2222  columns.push_back(cd);
2223  toplevel_column_names.insert(cd.columnName);
2224  if (cd.columnType.is_geometry()) {
2225  expandGeoColumn(cd, columns);
2226  }
2227  }
2228  cds.clear();
2229 
2230  ColumnDescriptor cd;
2231  // add row_id column -- Must be last column in the table
2232  cd.columnName = "rowid";
2233  cd.isSystemCol = true;
2234  cd.columnType = SQLTypeInfo(kBIGINT, true);
2235 #ifdef MATERIALIZED_ROWID
2236  cd.isVirtualCol = false;
2237 #else
2238  cd.isVirtualCol = true;
2239  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2240 #endif
2241  columns.push_back(cd);
2242  toplevel_column_names.insert(cd.columnName);
2243 
2244  if (td.hasDeletedCol) {
2245  ColumnDescriptor cd_del;
2246  cd_del.columnName = "$deleted$";
2247  cd_del.isSystemCol = true;
2248  cd_del.isVirtualCol = false;
2249  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2250  cd_del.isDeletedCol = true;
2251 
2252  columns.push_back(cd_del);
2253  }
2254 
2255  td.nColumns = columns.size();
2257  sqliteConnector_.query("BEGIN TRANSACTION");
2259  try {
2261  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2262  std::vector<std::string>{td.tableName,
2263  std::to_string(td.userId),
2265  std::to_string(td.isView),
2266  "",
2271  std::to_string(td.maxRows),
2272  td.partitions,
2274  std::to_string(td.shard),
2275  std::to_string(td.nShards),
2277  td.storageType,
2279  td.keyMetainfo});
2280 
2281  // now get the auto generated tableid
2283  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2284  td.tableId = sqliteConnector_.getData<int>(0, 0);
2285  int colId = 1;
2286  for (auto cd : columns) {
2288  const bool is_foreign_col =
2289  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2290  if (!is_foreign_col) {
2291  setColumnDictionary(cd, dds, td, isLogicalTable);
2292  }
2293  }
2294 
2295  if (toplevel_column_names.count(cd.columnName)) {
2296  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2297  if (colId > 1) {
2298  colId += g_test_against_columnId_gap;
2299  }
2300  if (!cd.isGeoPhyCol) {
2301  td.columnIdBySpi_.push_back(colId);
2302  }
2303  }
2304 
2306  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2307  "coldim, colscale, is_notnull, "
2308  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2309  "virtual_expr, is_deletedcol) "
2310  "VALUES (?, ?, ?, ?, ?, "
2311  "?, "
2312  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2313  std::vector<std::string>{std::to_string(td.tableId),
2314  std::to_string(colId),
2315  cd.columnName,
2324  "",
2327  cd.virtualExpr,
2329  cd.tableId = td.tableId;
2330  cd.columnId = colId++;
2331  cds.push_back(cd);
2332  }
2333  if (td.isView) {
2335  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2336  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2337  }
2339  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2340  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2342  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2343  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2344  std::vector<std::string>{std::to_string(foreign_table.tableId),
2345  std::to_string(foreign_table.foreign_server->id),
2346  foreign_table.getOptionsAsJsonString(),
2347  std::to_string(foreign_table.last_refresh_time),
2348  std::to_string(foreign_table.next_refresh_time)});
2349  }
2350  } catch (std::exception& e) {
2351  sqliteConnector_.query("ROLLBACK TRANSACTION");
2352  throw;
2353  }
2354  } else { // Temporary table
2355  td.tableId = nextTempTableId_++;
2356  int colId = 1;
2357  for (auto cd : columns) {
2359  const bool is_foreign_col =
2360  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2361 
2362  if (!is_foreign_col) {
2363  // Create a new temporary dictionary
2364  std::string fileName("");
2365  std::string folderPath("");
2367  nextTempDictId_++;
2368  DictDescriptor dd(dict_ref,
2369  fileName,
2371  false,
2372  1,
2373  folderPath,
2374  true); // Is dictName (2nd argument) used?
2375  dds.push_back(dd);
2376  if (!cd.columnType.is_array()) {
2378  }
2379  cd.columnType.set_comp_param(dict_ref.dictId);
2380  }
2381  }
2382  if (toplevel_column_names.count(cd.columnName)) {
2383  // make up colId gap for sanity test (begin with 1 bc much code depends on it!)
2384  if (colId > 1) {
2385  colId += g_test_against_columnId_gap;
2386  }
2387  if (!cd.isGeoPhyCol) {
2388  td.columnIdBySpi_.push_back(colId);
2389  }
2390  }
2391  cd.tableId = td.tableId;
2392  cd.columnId = colId++;
2393  cds.push_back(cd);
2394  }
2395 
2397  serializeTableJsonUnlocked(&td, cds);
2398  }
2399  }
2400 
2401  try {
2402  addTableToMap(&td, cds, dds);
2403  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2404  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2405  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2406  }
2407  } catch (std::exception& e) {
2408  sqliteConnector_.query("ROLLBACK TRANSACTION");
2409  removeTableFromMap(td.tableName, td.tableId, true);
2410  throw;
2411  }
2412  sqliteConnector_.query("END TRANSACTION");
2413 }
2414 
2416  const std::list<ColumnDescriptor>& cds) const {
2417  // relies on the catalog write lock
2418  using namespace rapidjson;
2419 
2420  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2421 
2422  const auto db_name = currentDB_.dbName;
2423  const auto file_path = table_json_filepath(basePath_, db_name);
2424 
2425  Document d;
2426  if (boost::filesystem::exists(file_path)) {
2427  // look for an existing file for this database
2428  std::ifstream reader(file_path.string());
2429  CHECK(reader.is_open());
2430  IStreamWrapper json_read_wrapper(reader);
2431  d.ParseStream(json_read_wrapper);
2432  } else {
2433  d.SetObject();
2434  }
2435  CHECK(d.IsObject());
2436  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2437 
2438  Value table(kObjectType);
2439  table.AddMember(
2440  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2441  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2442  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2443 
2444  for (const auto& cd : cds) {
2445  Value column(kObjectType);
2446  column.AddMember(
2447  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2448  column.AddMember("coltype",
2449  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2450  d.GetAllocator());
2451  column.AddMember("colsubtype",
2452  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2453  d.GetAllocator());
2454  column.AddMember(
2455  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2456  column.AddMember(
2457  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2458  column.AddMember(
2459  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2460  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2461  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2462  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2463  table["columns"].PushBack(column, d.GetAllocator());
2464  }
2465  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2466 
2467  // Overwrite the existing file
2468  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2469  CHECK(writer.is_open());
2470  OStreamWrapper json_wrapper(writer);
2471 
2472  Writer<OStreamWrapper> json_writer(json_wrapper);
2473  d.Accept(json_writer);
2474  writer.close();
2475 }
2476 
2477 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2478  // relies on the catalog write lock
2479  using namespace rapidjson;
2480 
2481  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2482 
2483  const auto db_name = currentDB_.dbName;
2484  const auto file_path = table_json_filepath(basePath_, db_name);
2485 
2486  CHECK(boost::filesystem::exists(file_path));
2487  Document d;
2488 
2489  std::ifstream reader(file_path.string());
2490  CHECK(reader.is_open());
2491  IStreamWrapper json_read_wrapper(reader);
2492  d.ParseStream(json_read_wrapper);
2493 
2494  CHECK(d.IsObject());
2495  auto table_name_ref = StringRef(table_name.c_str());
2496  CHECK(d.HasMember(table_name_ref));
2497  CHECK(d.RemoveMember(table_name_ref));
2498 
2499  // Overwrite the existing file
2500  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2501  CHECK(writer.is_open());
2502  OStreamWrapper json_wrapper(writer);
2503 
2504  Writer<OStreamWrapper> json_writer(json_wrapper);
2505  d.Accept(json_writer);
2506  writer.close();
2507 }
2508 
2510  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2511  bool if_not_exists) {
2512  cat_write_lock write_lock(this);
2514  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
2515 }
2516 
2518  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
2519  bool if_not_exists) {
2521  "SELECT name from omnisci_foreign_servers where name = ?",
2522  std::vector<std::string>{foreign_server->name});
2523 
2524  if (sqliteConnector_.getNumRows() == 0) {
2525  foreign_server->creation_time = std::time(nullptr);
2527  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
2528  "creation_time, "
2529  "options) "
2530  "VALUES (?, ?, ?, ?, ?)",
2531  std::vector<std::string>{foreign_server->name,
2532  foreign_server->data_wrapper_type,
2533  std::to_string(foreign_server->user_id),
2534  std::to_string(foreign_server->creation_time),
2535  foreign_server->getOptionsAsJsonString()});
2537  "SELECT id from omnisci_foreign_servers where name = ?",
2538  std::vector<std::string>{foreign_server->name});
2539  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
2540  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
2541  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
2542  std::move(foreign_server);
2543  CHECK(foreignServerMap_.find(foreign_server_shared->name) == foreignServerMap_.end())
2544  << "Attempting to insert a foreign server into foreign server map that already "
2545  "exists.";
2546  foreignServerMap_[foreign_server_shared->name] = foreign_server_shared;
2547  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
2548  } else if (!if_not_exists) {
2549  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
2550  "\" already exists."};
2551  }
2552 }
2553 
2555  const std::string& server_name) const {
2556  foreign_storage::ForeignServer* foreign_server = nullptr;
2557  cat_read_lock read_lock(this);
2558 
2559  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
2560  foreign_server = foreignServerMap_.find(server_name)->second.get();
2561  }
2562  return foreign_server;
2563 }
2564 
2565 const std::unique_ptr<const foreign_storage::ForeignServer>
2566 Catalog::getForeignServerFromStorage(const std::string& server_name) {
2567  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
2570  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
2571  "FROM omnisci_foreign_servers WHERE name = ?",
2572  std::vector<std::string>{server_name});
2573  if (sqliteConnector_.getNumRows() > 0) {
2574  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
2575  sqliteConnector_.getData<int>(0, 0),
2576  sqliteConnector_.getData<std::string>(0, 1),
2577  sqliteConnector_.getData<std::string>(0, 2),
2578  sqliteConnector_.getData<std::string>(0, 3),
2579  sqliteConnector_.getData<std::int32_t>(0, 4),
2580  sqliteConnector_.getData<std::int32_t>(0, 5));
2581  }
2582  return foreign_server;
2583 }
2584 
2585 const std::unique_ptr<const foreign_storage::ForeignTable>
2587  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
2590  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
2591  "omnisci_foreign_tables WHERE table_id = ?",
2592  std::vector<std::string>{to_string(table_id)});
2593  auto num_rows = sqliteConnector_.getNumRows();
2594  if (num_rows > 0) {
2595  CHECK_EQ(size_t(1), num_rows);
2596  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
2597  sqliteConnector_.getData<int>(0, 0),
2598  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
2599  sqliteConnector_.getData<std::string>(0, 2),
2600  sqliteConnector_.getData<int>(0, 3),
2601  sqliteConnector_.getData<int>(0, 4));
2602  }
2603  return foreign_table;
2604 }
2605 
2606 void Catalog::changeForeignServerOwner(const std::string& server_name,
2607  const int new_owner_id) {
2608  cat_write_lock write_lock(this);
2609  foreign_storage::ForeignServer* foreign_server =
2610  foreignServerMap_.find(server_name)->second.get();
2611  CHECK(foreign_server);
2612  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
2613  // update in-memory server
2614  foreign_server->user_id = new_owner_id;
2615 }
2616 
2617 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
2618  const std::string& data_wrapper) {
2619  cat_write_lock write_lock(this);
2620  auto data_wrapper_type = to_upper(data_wrapper);
2621  // update in-memory server
2622  foreign_storage::ForeignServer* foreign_server =
2623  foreignServerMap_.find(server_name)->second.get();
2624  CHECK(foreign_server);
2625  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
2626  foreign_server->data_wrapper_type = data_wrapper_type;
2627  try {
2628  foreign_server->validate();
2629  } catch (const std::exception& e) {
2630  // validation did not succeed:
2631  // revert to saved data_wrapper_type & throw exception
2632  foreign_server->data_wrapper_type = saved_data_wrapper_type;
2633  throw;
2634  }
2635  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
2636 }
2637 
2638 void Catalog::setForeignServerOptions(const std::string& server_name,
2639  const std::string& options) {
2640  cat_write_lock write_lock(this);
2641  // update in-memory server
2642  foreign_storage::ForeignServer* foreign_server =
2643  foreignServerMap_.find(server_name)->second.get();
2644  CHECK(foreign_server);
2645  auto saved_options = foreign_server->options;
2646  foreign_server->populateOptionsMap(options, true);
2647  try {
2648  foreign_server->validate();
2649  } catch (const std::exception& e) {
2650  // validation did not succeed:
2651  // revert to saved options & throw exception
2652  foreign_server->options = saved_options;
2653  throw;
2654  }
2655  setForeignServerProperty(server_name, "options", options);
2656 }
2657 
2658 void Catalog::renameForeignServer(const std::string& server_name,
2659  const std::string& name) {
2660  cat_write_lock write_lock(this);
2661  auto foreign_server_it = foreignServerMap_.find(server_name);
2662  CHECK(foreign_server_it != foreignServerMap_.end());
2663  setForeignServerProperty(server_name, "name", name);
2664  auto foreign_server_shared = foreign_server_it->second;
2665  foreign_server_shared->name = name;
2666  foreignServerMap_[name] = foreign_server_shared;
2667  foreignServerMap_.erase(foreign_server_it);
2668 }
2669 
2670 void Catalog::dropForeignServer(const std::string& server_name) {
2671  cat_write_lock write_lock(this);
2673 
2675  "SELECT id from omnisci_foreign_servers where name = ?",
2676  std::vector<std::string>{server_name});
2677  auto num_rows = sqliteConnector_.getNumRows();
2678  if (num_rows > 0) {
2679  CHECK_EQ(size_t(1), num_rows);
2680  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
2682  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
2683  std::to_string(server_id));
2684  if (sqliteConnector_.getNumRows() > 0) {
2685  throw std::runtime_error{"Foreign server \"" + server_name +
2686  "\" is referenced "
2687  "by existing foreign tables and cannot be dropped."};
2688  }
2689  sqliteConnector_.query("BEGIN TRANSACTION");
2690  try {
2692  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
2693  std::vector<std::string>{server_name});
2694  } catch (const std::exception& e) {
2695  sqliteConnector_.query("ROLLBACK TRANSACTION");
2696  throw;
2697  }
2698  sqliteConnector_.query("END TRANSACTION");
2699  foreignServerMap_.erase(server_name);
2700  foreignServerMapById_.erase(server_id);
2701  }
2702 }
2703 
2705  const rapidjson::Value* filters,
2706  const UserMetadata& user,
2707  std::vector<const foreign_storage::ForeignServer*>& results) {
2708  sys_read_lock syscat_read_lock(&SysCatalog::instance());
2709  cat_read_lock read_lock(this);
2711  // Customer facing and internal SQlite names
2712  std::map<std::string, std::string> col_names{{"server_name", "name"},
2713  {"data_wrapper", "data_wrapper_type"},
2714  {"created_at", "creation_time"},
2715  {"options", "options"}};
2716 
2717  // TODO add "owner" when FSI privilege is implemented
2718  std::stringstream filter_string;
2719  std::vector<std::string> arguments;
2720 
2721  if (filters != nullptr) {
2722  // Create SQL WHERE clause for SQLite query
2723  int num_filters = 0;
2724  filter_string << " WHERE";
2725  for (auto& filter_def : filters->GetArray()) {
2726  if (num_filters > 0) {
2727  filter_string << " " << std::string(filter_def["chain"].GetString());
2728  ;
2729  }
2730 
2731  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
2732  col_names.end()) {
2733  throw std::runtime_error{"Attribute with name \"" +
2734  std::string(filter_def["attribute"].GetString()) +
2735  "\" does not exist."};
2736  }
2737 
2738  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
2739 
2740  bool equals_operator = false;
2741  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
2742  filter_string << " = ? ";
2743  equals_operator = true;
2744  } else {
2745  filter_string << " LIKE ? ";
2746  }
2747 
2748  bool timestamp_column =
2749  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
2750 
2751  if (timestamp_column && !equals_operator) {
2752  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
2753  }
2754 
2755  if (timestamp_column && equals_operator) {
2756  arguments.push_back(std::to_string(
2757  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
2758  } else {
2759  arguments.push_back(filter_def["value"].GetString());
2760  }
2761 
2762  num_filters++;
2763  }
2764  }
2765  // Create select query for the omnisci_foreign_servers table
2766  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
2767  query += filter_string.str();
2768 
2769  sqliteConnector_.query_with_text_params(query, arguments);
2770  auto num_rows = sqliteConnector_.getNumRows();
2771 
2772  if (sqliteConnector_.getNumRows() == 0)
2773  return;
2774 
2776  // Return pointers to objects
2777  results.reserve(num_rows);
2778  for (size_t row = 0; row < num_rows; ++row) {
2779  const foreign_storage::ForeignServer* foreign_server =
2780  getForeignServer(sqliteConnector_.getData<std::string>(row, 0));
2781 
2782  CHECK(foreign_server != nullptr);
2783 
2784  DBObject dbObject(foreign_server->name, ServerDBObjectType);
2785  dbObject.loadKey(*this);
2786  std::vector<DBObject> privObjects = {dbObject};
2787  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
2788  // skip server, as there are no privileges to access it
2789  continue;
2790  }
2791  results.push_back(foreign_server);
2792  }
2793 }
2794 
2795 // returns the table epoch or -1 if there is something wrong with the shared epoch
2796 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
2797  cat_read_lock read_lock(this);
2798  const auto td = getMetadataForTable(table_id, false);
2799  if (!td) {
2800  std::stringstream table_not_found_error_message;
2801  table_not_found_error_message << "Table (" << db_id << "," << table_id
2802  << ") not found";
2803  throw std::runtime_error(table_not_found_error_message.str());
2804  }
2805  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2806  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2807  // check all shards have same checkpoint
2808  const auto physicalTables = physicalTableIt->second;
2809  CHECK(!physicalTables.empty());
2810  size_t curr_epoch{0}, first_epoch{0};
2811  int32_t first_table_id{0};
2812  bool are_epochs_inconsistent{false};
2813  for (size_t i = 0; i < physicalTables.size(); i++) {
2814  int32_t physical_tb_id = physicalTables[i];
2815  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2816  CHECK(phys_td);
2817 
2818  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
2819  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
2820  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
2821  if (i == 0) {
2822  first_epoch = curr_epoch;
2823  first_table_id = physical_tb_id;
2824  } else if (first_epoch != curr_epoch) {
2825  are_epochs_inconsistent = true;
2826  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
2827  << ", db id: " << db_id
2828  << ". First table (table id: " << first_table_id
2829  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
2830  << ", has inconsistent epoch: " << curr_epoch
2831  << ". See previous INFO logs for all epochs and their table ids.";
2832  }
2833  }
2834  if (are_epochs_inconsistent) {
2835  // oh dear the shards do not agree on the epoch for this table
2836  return -1;
2837  }
2838  return curr_epoch;
2839  } else {
2840  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
2841  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
2842  << ", epoch: " << epoch;
2843  return epoch;
2844  }
2845 }
2846 
// Sets the epoch of a table to new_epoch. If the table id has physical tables registered
// in logicalToPhysicalTableMapById_, each of them is processed; otherwise the table
// itself is. For every affected table the in-memory chunks are removed first and the
// file manager parameters (epoch, max_rollback_epochs) are then applied.
// Throws std::runtime_error when no metadata exists for table_id.
2847 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
2848 cat_read_lock read_lock(this);
2849 LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
2850 << " back to new epoch " << new_epoch;
2851 const auto td = getMetadataForTable(table_id, false);
2852 if (!td) {
2853 std::stringstream table_not_found_error_message;
2854 table_not_found_error_message << "Table (" << db_id << "," << table_id
2855 << ") not found";
2856 throw std::runtime_error(table_not_found_error_message.str());
2857 }
// NOTE(review): listing line 2858 is absent from this extract. It presumably holds the
// `if` that guards the temporary-table error below - confirm against the upstream file.
2859 std::stringstream is_temp_table_error_message;
2860 is_temp_table_error_message << "Cannot set epoch on temporary table";
2861 throw std::runtime_error(is_temp_table_error_message.str());
2862 }
2863 File_Namespace::FileMgrParams file_mgr_params;
2864 file_mgr_params.epoch = new_epoch;
2865 file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
2866 
2867 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
2868 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
2869 const auto physicalTables = physicalTableIt->second;
2870 CHECK(!physicalTables.empty());
2871 for (size_t i = 0; i < physicalTables.size(); i++) {
2872 const int32_t physical_tb_id = physicalTables[i];
2873 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2874 CHECK(phys_td);
2875 LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID "
2876 << physical_tb_id << " back to new epoch " << new_epoch;
2877 // Should have table lock from caller so safe to do this after, avoids
2878 // having to repopulate data on error
2879 removeChunksUnlocked(physical_tb_id);
2880 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
2881 db_id, physical_tb_id, file_mgr_params);
2882 }
2883 } else { // not shared
2884 // Should have table lock from caller so safe to do this after, avoids
2885 // having to repopulate data on error
2886 removeChunksUnlocked(table_id);
2887 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
2888 }
2889 }
2890 
// NOTE(review): the signature line (listing line 2891) and listing lines 2899, 2902 and
// 2910 are absent from this extract. Going by the call sites below, this is presumably
// alterPhysicalTableMetadata(td, table_update_params).
// For each of max_rollback_epochs and max_rows, the visible code compares td with
// table_update_params and, when they differ, passes an UPDATE on mapd_tables (keyed by
// td->tableId) to a call whose first line is missing, then stores the new value in
// mutable_td.
2892 const TableDescriptor* td,
2893 const TableDescriptorUpdateParams& table_update_params) {
2894 // Only called from parent alterTableParamMetadata, expect already to have catalog and
2895 // sqlite write locks
2896 
2897 // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
2898 
// NOTE(review): the declaration of mutable_td (listing line 2899) is not visible here.
2900 CHECK(mutable_td);
2901 if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
2903 "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
2904 std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
2905 std::to_string(td->tableId)});
2906 mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
2907 }
2908 
2909 if (td->maxRows != table_update_params.max_rows) {
2911 "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
2912 std::vector<std::string>{std::to_string(table_update_params.max_rows),
2913 std::to_string(td->tableId)});
2914 mutable_td->maxRows = table_update_params.max_rows;
2915 }
2916 }
2917 
// NOTE(review): the signature line (listing line 2918) and listing line 2921 are absent
// from this extract; callers in this file invoke it as alterTableMetadata(td, params).
// Inside one sqlite transaction, applies table_update_params to every physical table
// registered for td->tableId (if any) and then to td itself. Any std::exception triggers
// a ROLLBACK followed by LOG(FATAL).
2919 const TableDescriptorUpdateParams& table_update_params) {
2920 cat_write_lock write_lock(this);
2922 sqliteConnector_.query("BEGIN TRANSACTION");
2923 try {
2924 const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
2925 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
2926 const auto physical_tables = physical_table_it->second;
2927 CHECK(!physical_tables.empty());
2928 for (size_t i = 0; i < physical_tables.size(); i++) {
2929 int32_t physical_tb_id = physical_tables[i];
2930 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
2931 CHECK(phys_td);
2932 alterPhysicalTableMetadata(phys_td, table_update_params);
2933 }
2934 }
2935 alterPhysicalTableMetadata(td, table_update_params);
2936 } catch (std::exception& e) {
2937 sqliteConnector_.query("ROLLBACK TRANSACTION");
// NOTE(review): presumably LOG(FATAL) does not return, so the END TRANSACTION below is
// only reached on success - confirm against the logger's semantics.
2938 LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
2939 }
2940 sqliteConnector_.query("END TRANSACTION");
2941 }
2942 
// Changes max_rollback_epochs for a table: first in the table's file manager parameters
// (keeping the existing epoch), then in the catalog metadata via alterTableMetadata().
// Throws std::runtime_error for max_rollback_epochs <= -1. Returns early, with an INFO
// log, when the requested parameters compare equal to the table's current ones.
2943 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
2944 const int32_t max_rollback_epochs) {
2945 // Must be called from AlterTableParamStmt or other method that takes executor and
2946 // TableSchema locks
2947 cat_write_lock write_lock(this); // Consider only taking read lock for potentially
2948 // heavy table storage metadata rolloff operations
2949 if (max_rollback_epochs <= -1) {
2950 throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
2951 }
2952 const auto td = getMetadataForTable(
2953 table_id, false); // Deep copy as there will be gap between read and write locks
2954 CHECK(td); // Existence should have already been checked in
2955 // ParserNode::AlterTableParmStmt
2956 TableDescriptorUpdateParams table_update_params(td);
2957 table_update_params.max_rollback_epochs = max_rollback_epochs;
2958 if (table_update_params == td) { // Operator is overloaded to test for equality
2959 LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
2960 << " to existing value, skipping operation";
2961 return;
2962 }
2963 File_Namespace::FileMgrParams file_mgr_params;
2964 file_mgr_params.epoch = -1; // Use existing epoch
2965 file_mgr_params.max_rollback_epochs = max_rollback_epochs;
2966 setTableFileMgrParams(table_id, file_mgr_params);
2967 // Unlock as alterTableCatalogMetadata will take write lock, and Catalog locks are not
2968 // upgradeable Should be safe as we have schema lock on this table
// NOTE(review): listing line 2969 is absent from this extract. Given the comment above,
// it presumably releases write_lock - confirm against the upstream file.
2970 alterTableMetadata(td, table_update_params);
2971 }
2972 
2973 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
2974  if (max_rows < 0) {
2975  throw std::runtime_error("Max rows cannot be a negative number.");
2976  }
2977  const auto td = getMetadataForTable(table_id);
2978  CHECK(td);
2979  TableDescriptorUpdateParams table_update_params(td);
2980  table_update_params.max_rows = max_rows;
2981  if (table_update_params == td) {
2982  LOG(INFO) << "Max rows value of " << max_rows
2983  << " is the same as the existing value. Skipping update.";
2984  return;
2985  }
2986  alterTableMetadata(td, table_update_params);
2987  CHECK(td->fragmenter);
2988  td->fragmenter->dropFragmentsToSize(max_rows);
2989 }
2990 
2991 // For testing purposes only
2992 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
2993  cat_write_lock write_lock(this);
2994  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
2995  CHECK(td_entry != tableDescriptorMap_.end());
2996  auto td = td_entry->second;
2997  TableDescriptorUpdateParams table_update_params(td);
2998  table_update_params.max_rollback_epochs = -1;
2999  alterTableMetadata(td, table_update_params);
3000 
3001  File_Namespace::FileMgrParams file_mgr_params;
3002  file_mgr_params.max_rollback_epochs = -1;
3003  setTableFileMgrParams(td->tableId, file_mgr_params);
3004 }
3005 
// NOTE(review): the signature line (listing line 3006) and listing line 3018 are absent
// from this extract; callers in this file invoke it as
// setTableFileMgrParams(table_id, file_mgr_params).
// Applies file_mgr_params to the table, or to each of its physical tables when the id is
// present in logicalToPhysicalTableMapById_. In-memory chunks are removed before the
// parameters are set. Throws std::runtime_error when table_id has no metadata.
3007 const int table_id,
3008 const File_Namespace::FileMgrParams& file_mgr_params) {
3009 // Expects parent to have write lock
3010 const auto td = getMetadataForTable(table_id, false);
3011 const auto db_id = this->getDatabaseId();
3012 if (!td) {
3013 std::stringstream table_not_found_error_message;
3014 table_not_found_error_message << "Table (" << db_id << "," << table_id
3015 << ") not found";
3016 throw std::runtime_error(table_not_found_error_message.str());
3017 }
// NOTE(review): the condition guarding the temporary-table error below (listing line
// 3018) is not visible in this extract.
3019 std::stringstream is_temp_table_error_message;
3020 is_temp_table_error_message << "Cannot set storage params on temporary table";
3021 throw std::runtime_error(is_temp_table_error_message.str());
3022 }
3023 const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3024 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3025 const auto physical_tables = physical_table_it->second;
3026 CHECK(!physical_tables.empty());
3027 for (size_t i = 0; i < physical_tables.size(); i++) {
3028 const int32_t physical_tb_id = physical_tables[i];
3029 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3030 CHECK(phys_td);
3031 removeChunksUnlocked(physical_tb_id);
3032 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3033 db_id, physical_tb_id, file_mgr_params);
3034 }
3035 } else { // not shared
3036 removeChunksUnlocked(table_id);
3037 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3038 }
3039 }
3040 
3041 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3042  const int32_t table_id) const {
3043  cat_read_lock read_lock(this);
3044  std::vector<TableEpochInfo> table_epochs;
3045  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3046  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3047  const auto physical_tables = physical_table_it->second;
3048  CHECK(!physical_tables.empty());
3049 
3050  for (const auto physical_tb_id : physical_tables) {
3051  const auto phys_td = getMetadataForTableImpl(physical_tb_id, false);
3052  CHECK(phys_td);
3053 
3054  auto table_id = phys_td->tableId;
3055  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3056  table_epochs.emplace_back(table_id, epoch);
3057  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3058  << ", table id: " << table_id << ", epoch: " << epoch;
3059  }
3060  } else {
3061  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3062  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3063  << ", epoch: " << epoch;
3064  table_epochs.emplace_back(table_id, epoch);
3065  }
3066  return table_epochs;
3067 }
3068 
3069 void Catalog::setTableEpochs(const int32_t db_id,
3070  const std::vector<TableEpochInfo>& table_epochs) const {
3071  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3072  CHECK(td);
3073  File_Namespace::FileMgrParams file_mgr_params;
3074  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3075 
3076  cat_read_lock read_lock(this);
3077  for (const auto& table_epoch_info : table_epochs) {
3078  removeChunksUnlocked(table_epoch_info.table_id);
3079  file_mgr_params.epoch = table_epoch_info.table_epoch;
3080  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3081  db_id, table_epoch_info.table_id, file_mgr_params);
3082  LOG(INFO) << "Set table epoch for db id: " << db_id
3083  << ", table id: " << table_epoch_info.table_id
3084  << ", back to epoch: " << table_epoch_info.table_epoch;
3085  }
3086 }
3087 
3088 namespace {
3089 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3090  std::string table_epochs_str{"["};
3091  bool first_entry{true};
3092  for (const auto& table_epoch : table_epochs) {
3093  if (first_entry) {
3094  first_entry = false;
3095  } else {
3096  table_epochs_str += ", ";
3097  }
3098  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3099  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3100  }
3101  table_epochs_str += "]";
3102  return table_epochs_str;
3103 }
3104 } // namespace
3105 
// NOTE(review): the signature line (listing line 3106) is absent from this extract, so
// the function's name and return type are not visible here.
// Calls setTableEpochs(db_id, table_epochs) and, instead of propagating a
// std::exception, logs it at ERROR level together with the db id and the epochs.
3107 const int32_t db_id,
3108 const std::vector<TableEpochInfo>& table_epochs) const {
3109 try {
3110 setTableEpochs(db_id, table_epochs);
3111 } catch (std::exception& e) {
3112 LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3113 << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3114 << ", Error: " << e.what();
3115 }
3116 }
3117 
// NOTE(review): the signature line (listing line 3118) is absent from this extract; the
// body looks up `td`, presumably a table descriptor parameter.
// Returns the entry stored for td in deletedColumnPerTable_, or nullptr when there is
// none.
3119 cat_read_lock read_lock(this);
3120 const auto it = deletedColumnPerTable_.find(td);
3121 return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3122 }
3123 
// NOTE(review): the first signature line (listing line 3124) is absent from this
// extract; the call site in this file passes (table descriptor, column id).
// Returns the fragmenter's hasDeletedRows(delete_column_id) answer, or false when the
// table descriptor has no fragmenter. td must be non-null (CHECK-enforced).
3125 int delete_column_id) const {
3126 // check if there are rows deleted by examining the deletedColumn metadata
3127 CHECK(td);
3128 auto fragmenter = td->fragmenter;
3129 if (fragmenter) {
3130 return fragmenter->hasDeletedRows(delete_column_id);
3131 } else {
3132 return false;
3133 }
3134 }
3135 
// NOTE(review): the first signature line (listing line 3136) is absent from this
// extract, so the function's name and return type are not visible here.
// Returns the column registered for td in deletedColumnPerTable_ only if at least one
// of td's physical table descriptors reports deleted rows for that column. Returns
// nullptr when td has no registered column or no deletes are recorded.
3137 const TableDescriptor* td) const {
3138 std::vector<const TableDescriptor*> tds;
3139 const ColumnDescriptor* cd;
// The catalog read lock is held only while the maps are consulted.
3140 {
3141 cat_read_lock read_lock(this);
3142 
3143 const auto it = deletedColumnPerTable_.find(td);
3144 // if not a table that supports delete return nullptr, nothing more to do
3145 if (it == deletedColumnPerTable_.end()) {
3146 return nullptr;
3147 }
3148 cd = it->second;
3149 tds = getPhysicalTablesDescriptors(td, false);
3150 }
3151 // individual tables are still protected by higher level locks
3152 for (auto tdd : tds) {
3153 if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3154 return cd;
3155 }
3156 }
3157 // no deletes so far recorded in metadata
3158 return nullptr;
3159 }
3160 
// NOTE(review): the signature line (listing line 3161) is absent from this extract.
// Takes the catalog write lock and forwards (td, cd) to setDeletedColumnUnlocked().
3162 cat_write_lock write_lock(this);
3163 setDeletedColumnUnlocked(td, cd);
3164 }
3165 
// NOTE(review): the first signature line (listing line 3166) is absent from this
// extract; presumably this is setDeletedColumnUnlocked(td, cd).
// Records cd as the entry for td in deletedColumnPerTable_. CHECK-fails if td already
// has an entry. The body takes cat_write_lock itself.
3167 const ColumnDescriptor* cd) {
3168 cat_write_lock write_lock(this);
3169 const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3170 CHECK(it_ok.second);
3171 }
3172 
3173 namespace {
3174 
// NOTE(review): the first signature line (listing line 3175) is absent from this
// extract; the caller in this file invokes it as get_foreign_col(cat, shared_dict_def).
// Looks up the table named by the shared dictionary definition's foreign table (it must
// exist, CHECK-enforced) and returns the catalog's metadata for the definition's foreign
// column in that table.
3176 const Catalog& cat,
3177 const Parser::SharedDictionaryDef& shared_dict_def) {
3178 const auto& table_name = shared_dict_def.get_foreign_table();
3179 const auto td = cat.getMetadataForTable(table_name);
3180 CHECK(td);
3181 const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3182 return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3183 }
3184 
3185 } // namespace
3186 
// NOTE(review): the first signature line (listing line 3187) and listing lines
// 3202-3203 are absent from this extract; callers in this file invoke it as
// addReferenceToForeignDict(column, shared_dict_def, persist_reference).
// Copies the foreign column's type onto referencing_column and increments the in-memory
// refcount of the dictionary identified by that type's comp_param. When
// persist_reference is set, the visible SQL also increments refcount in
// mapd_dictionaries.
3188 Parser::SharedDictionaryDef shared_dict_def,
3189 const bool persist_reference) {
3190 cat_write_lock write_lock(this);
3191 const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3192 CHECK(foreign_ref_col);
3193 referencing_column.columnType = foreign_ref_col->columnType;
3194 const int dict_id = referencing_column.columnType.get_comp_param();
3195 const DictRef dict_ref(currentDB_.dbId, dict_id);
3196 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3197 CHECK(dictIt != dictDescriptorMapByRef_.end());
3198 const auto& dd = dictIt->second;
3199 CHECK_GE(dd->refcount, 1);
3200 ++dd->refcount;
3201 if (persist_reference) {
// NOTE(review): the call that receives the SQL below (listing lines 3202-3203) is not
// visible in this extract.
3204 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3205 {std::to_string(dict_id)});
3206 }
3207 }
3208 
// NOTE(review): the first signature line (listing line 3209) and listing lines 3216 and
// 3248 are absent from this extract, so the function's name and return type are not
// visible here.
// Searches shared_dict_defs for a definition whose column name equals cd.columnName.
// Returns false when there is none (or the list is empty). Otherwise cd is wired to the
// referenced dictionary and true is returned:
//  - reference into the table being created (same table name): cd takes the type of the
//    referenced column in cdd; if that column's dictionary is in dds its refcount is
//    bumped, else the reference is resolved through compress_reference_path();
//  - reference into another table: delegated to addReferenceToForeignDict(). A
//    non-temporary table referencing a temporary one raises std::runtime_error.
3210 ColumnDescriptor& cd,
3211 std::list<ColumnDescriptor>& cdd,
3212 std::list<DictDescriptor>& dds,
3213 const TableDescriptor td,
3214 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3215 cat_write_lock write_lock(this);
3217 
3218 if (shared_dict_defs.empty()) {
3219 return false;
3220 }
3221 for (const auto& shared_dict_def : shared_dict_defs) {
3222 // check if the current column is a referencing column
3223 const auto& column = shared_dict_def.get_column();
3224 if (cd.columnName == column) {
3225 if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3226 // Dictionaries are being shared in table to be created
3227 const auto& ref_column = shared_dict_def.get_foreign_column();
3228 auto colIt =
3229 std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3230 return !ref_column.compare(it.columnName);
3231 });
3232 CHECK(colIt != cdd.end());
3233 cd.columnType = colIt->columnType;
3234 
3235 const int dict_id = colIt->columnType.get_comp_param();
3236 CHECK_GE(dict_id, 1);
3237 auto dictIt = std::find_if(
3238 dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3239 return it.dictRef.dbId == this->currentDB_.dbId &&
3240 it.dictRef.dictId == dict_id;
3241 });
3242 if (dictIt != dds.end()) {
3243 // There exists dictionary definition of a dictionary column
3244 CHECK_GE(dictIt->refcount, 1);
3245 ++dictIt->refcount;
3246 if (!table_is_temporary(&td)) {
3247 // Persist reference count
// NOTE(review): the call that receives the SQL below (listing line 3248) is not visible
// in this extract.
3249 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3250 {std::to_string(dict_id)});
3251 }
3252 } else {
3253 // The dictionary is referencing a column which is referencing a column in
3254 // diffrent table
3255 auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3256 addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3257 }
3258 } else {
3259 const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3260 const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3261 if (table_is_temporary(foreign_td)) {
3262 if (!table_is_temporary(&td)) {
3263 throw std::runtime_error(
3264 "Only temporary tables can share dictionaries with other temporary "
3265 "tables.");
3266 }
3267 addReferenceToForeignDict(cd, shared_dict_def, false);
3268 } else {
3269 addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3270 }
3271 }
3272 return true;
3273 }
3274 }
3275 return false;
3276 }
3277 
// NOTE(review): this extract is missing the first signature line (listing line 3278)
// and listing lines 3288, 3290, 3295, 3299, 3304, 3307 and 3314, including the lines
// that open several calls. The function's name is therefore not visible here, and the
// description below covers only what the remaining lines show.
// For a logical table, the visible SQL inserts a mapd_dictionaries row under the
// placeholder name "Initial_key", reads back its dictid, renames the row to
// "<table>_<column>_dict<dictid>" and derives the dictionary folder path under
// g_base_path. A descriptor `dd` is appended to dds and the column's comp_param is set
// to the dictionary id (which stays 0 when isLogicalTable is false).
3279 std::list<DictDescriptor>& dds,
3280 const TableDescriptor& td,
3281 const bool isLogicalTable) {
3282 cat_write_lock write_lock(this);
3283 
3284 std::string dictName{"Initial_key"};
3285 int dictId{0};
3286 std::string folderPath;
3287 if (isLogicalTable) {
3289 
3291 "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3292 "?, 1)",
3293 std::vector<std::string>{
3294 dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3296 "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3297 dictId = sqliteConnector_.getData<int>(0, 0);
3298 dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3300 "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3301 folderPath = g_base_path + "/mapd_data/DB_" + std::to_string(currentDB_.dbId) +
3302 "_DICT_" + std::to_string(dictId);
3303 }
// NOTE(review): the construction of `dd` begins on a missing line (3304); only some of
// its arguments are visible below.
3305 dictId,
3306 dictName,
3308 false,
3309 1,
3310 folderPath,
3311 false);
3312 dds.push_back(dd);
3313 if (!cd.columnType.is_array()) {
3315 }
3316 cd.columnType.set_comp_param(dictId);
3317 }
3318 
// NOTE(review): the first signature line (listing line 3319) is absent from this
// extract, so the function's name is not visible here.
// Creates the logical table described by td, then one physical table per shard
// (1..td.nShards), and records the logical-to-physical mapping in
// logicalToPhysicalTableMapById_. For non-temporary tables the mapping is also pushed
// through updateLogicalToPhysicalTableMap().
// Note that td itself is reused for every shard: its tableName and shard fields are
// overwritten on each iteration.
3320 TableDescriptor& td,
3321 const list<ColumnDescriptor>& cols,
3322 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3323 cat_write_lock write_lock(this);
3324 
3325 /* create logical table */
3326 TableDescriptor* tdl = &td;
3327 createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3328 int32_t logical_tb_id = tdl->tableId;
3329 std::string logical_table_name = tdl->tableName;
3330 
3331 /* create physical tables and link them to the logical table */
3332 std::vector<int32_t> physicalTables;
3333 for (int32_t i = 1; i <= td.nShards; i++) {
3334 TableDescriptor* tdp = &td;
3335 tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3336 tdp->shard = i - 1;
3337 createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3338 int32_t physical_tb_id = tdp->tableId;
3339 
3340 /* add physical table to the vector of physical tables */
3341 physicalTables.push_back(physical_tb_id);
3342 }
3343 
3344 if (!physicalTables.empty()) {
3345 /* add logical to physical tables correspondence to the map */
3346 const auto it_ok =
3347 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3348 CHECK(it_ok.second);
3349 /* update sqlite mapd_logical_to_physical in sqlite database */
3350 if (!table_is_temporary(&td)) {
3351 updateLogicalToPhysicalTableMap(logical_tb_id);
3352 }
3353 }
3354 }
3355 
// NOTE(review): the signature line (listing line 3356) is absent from this extract, so
// the function's name is not visible here.
// Truncates every physical table registered for td->tableId (if any) and then td
// itself, each via doTruncateTable().
3357 cat_write_lock write_lock(this);
3358 
3359 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3360 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3361 // truncate all corresponding physical tables if this is a logical table
3362 const auto physicalTables = physicalTableIt->second;
3363 CHECK(!physicalTables.empty());
3364 for (size_t i = 0; i < physicalTables.size(); i++) {
3365 int32_t physical_tb_id = physicalTables[i];
3366 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3367 CHECK(phys_td);
3368 doTruncateTable(phys_td);
3369 }
3370 }
3371 doTruncateTable(td);
3372 }
3373 
// NOTE(review): the signature line (listing line 3374) and listing line 3439 are absent
// from this extract; the caller in this file invokes it as doTruncateTable(td).
// Truncates a single table: releases its fragmenter, deletes its chunks from the CPU
// and GPU levels, removes table-related data structures from the data manager, and
// re-creates the descriptor of every dictionary used by the table's dict-encoded
// columns. A dictionary whose refcount is 1 additionally has its string dictionary
// closed and its folder renamed for deletion.
3375 cat_write_lock write_lock(this);
3376 
3377 const int tableId = td->tableId;
3378 // must destroy fragmenter before deleteChunks is called.
3379 if (td->fragmenter != nullptr) {
3380 auto tableDescIt = tableDescriptorMapById_.find(tableId);
3381 CHECK(tableDescIt != tableDescriptorMapById_.end());
3382 tableDescIt->second->fragmenter = nullptr;
3383 CHECK(td->fragmenter == nullptr);
3384 }
3385 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3386 // assuming deleteChunksWithPrefix is atomic
3387 dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3388 dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3389 
3390 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3391 
// A dictionary client is created only when SysCatalog reports this node as an
// aggregator.
3392 std::unique_ptr<StringDictionaryClient> client;
3393 if (SysCatalog::instance().isAggregator()) {
3394 CHECK(!string_dict_hosts_.empty());
3395 DictRef dict_ref(currentDB_.dbId, -1);
3396 client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3397 }
3398 // clean up any dictionaries
3399 // delete all column descriptors for the table
3400 for (const auto& columnDescriptor : columnDescriptorMapById_) {
3401 auto cd = columnDescriptor.second;
3402 if (cd->tableId != td->tableId) {
3403 continue;
3404 }
3405 const int dict_id = cd->columnType.get_comp_param();
3406 // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3407 if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3408 const DictRef dict_ref(currentDB_.dbId, dict_id);
3409 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3410 CHECK(dictIt != dictDescriptorMapByRef_.end());
3411 const auto& dd = dictIt->second;
3412 CHECK_GE(dd->refcount, 1);
3413 // if this is the only table using this dict reset the dict
3414 if (dd->refcount == 1) {
3415 // close the dictionary
3416 dd->stringDict.reset();
3417 File_Namespace::renameForDelete(dd->dictFolderPath);
3418 if (client) {
3419 client->drop(dd->dictRef);
3420 }
3421 if (!dd->dictIsTemp) {
3422 boost::filesystem::create_directory(dd->dictFolderPath);
3423 }
3424 }
3425 
// The replacement descriptor is built from dd before the map entry that dd refers to is
// erased; the map takes it over via reset() below.
3426 DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3427 dd->dictName,
3428 dd->dictNBits,
3429 dd->dictIsShared,
3430 dd->refcount,
3431 dd->dictFolderPath,
3432 dd->dictIsTemp);
3433 dictDescriptorMapByRef_.erase(dictIt);
3434 // now create new Dict -- need to figure out what to do here for temp tables
3435 if (client) {
3436 client->create(new_dd->dictRef, new_dd->dictIsTemp);
3437 }
3438 dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3440 }
3441 }
3442 }
3443 
3444 void Catalog::removeFragmenterForTable(const int table_id) const {
3445  cat_write_lock write_lock(this);
3446  auto td = getMetadataForTable(table_id, false);
3447  if (td->fragmenter != nullptr) {
3448  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3449  CHECK(tableDescIt != tableDescriptorMapById_.end());
3450  tableDescIt->second->fragmenter = nullptr;
3451  CHECK(td->fragmenter == nullptr);
3452  }
3453 }
3454 
3455 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3456 void Catalog::removeChunksUnlocked(const int table_id) const {
3457  auto td = getMetadataForTable(table_id);
3458  CHECK(td);
3459 
3460  if (td->fragmenter != nullptr) {
3461  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3462  CHECK(tableDescIt != tableDescriptorMapById_.end());
3463  tableDescIt->second->fragmenter = nullptr;
3464  CHECK(td->fragmenter == nullptr);
3465  }
3466 
3467  // remove the chunks from in memory structures
3468  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3469 
3470  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3471  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3472 }
3473 
// NOTE(review): the signature (listing lines 3474-3476) and listing lines 3478, 3494
// and 3497 are absent from this extract, so the function's name is not visible here.
// Inside one sqlite transaction, drops every physical table registered for td->tableId
// (if any) and then td itself via doDropTable(). The visible SQL also deletes the
// table's mapd_logical_to_physical row. Any std::exception triggers a ROLLBACK and is
// rethrown.
3477 cat_write_lock write_lock(this);
3479 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3480 sqliteConnector_.query("BEGIN TRANSACTION");
3481 try {
3482 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3483 // remove all corresponding physical tables if this is a logical table
3484 const auto physicalTables = physicalTableIt->second;
3485 CHECK(!physicalTables.empty());
3486 for (size_t i = 0; i < physicalTables.size(); i++) {
3487 int32_t physical_tb_id = physicalTables[i];
3488 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3489 CHECK(phys_td);
3490 doDropTable(phys_td);
3491 }
3492 
3493 // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
// NOTE(review): the call that receives the SQL below (listing line 3494) and the
// statement on listing line 3497 are not visible in this extract.
3495 "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
3496 std::to_string(td->tableId));
3498 }
3499 doDropTable(td);
3500 } catch (std::exception& e) {
3501 sqliteConnector_.query("ROLLBACK TRANSACTION");
3502 throw;
3503 }
3504 sqliteConnector_.query("END TRANSACTION");
3505 }
3506 
3511  }
3513 }
3514 
// NOTE(review): the signature line (listing line 3515) and listing lines 3519, 3528,
// 3532 and 3543-3544 are absent from this extract, so the function's name and the first
// line of several sqlite calls are not visible here.
// Removes a table's rows from the sqlite catalog: its mapd_tables row, its mapd_columns
// rows and, for views, its mapd_views row. The visible SQL also decrements the refcount
// of each dictionary used by the table's dict-encoded columns and deletes dictionaries
// whose refcount reaches 0.
3516 const int tableId = td->tableId;
3517 sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
3518 std::to_string(tableId));
3520 "select comp_param from mapd_columns where compression = ? and tableid = ?",
3521 std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
// The dictionary ids are copied out of the result set before further statements are
// issued on the same connector.
3522 int numRows = sqliteConnector_.getNumRows();
3523 std::vector<int> dict_id_list;
3524 for (int r = 0; r < numRows; ++r) {
3525 dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
3526 }
3527 for (auto dict_id : dict_id_list) {
3529 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
3530 std::vector<std::string>{std::to_string(dict_id)});
3531 }
3533 "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
3534 "mapd_columns where compression = ? "
3535 "and tableid = ?) and refcount = 0",
3536 std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
3537 sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
3538 std::to_string(tableId));
3539 if (td->isView) {
3540 sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
3541 std::to_string(tableId));
3542 }
// NOTE(review): the condition and call that open the block below (listing lines
// 3543-3544) are not visible in this extract.
3545 "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
3546 }
3547 }
3548 
// Renames a single table: updates its name in mapd_tables inside a sqlite transaction
// (rolled back and rethrown on std::exception), then re-keys its descriptor in
// tableDescriptorMap_ under the upper-cased new name. calciteMgr_->updateMetadata() is
// called once before and once after the in-memory rename.
// NOTE(review): listing lines 3551, 3555 and 3564 are absent from this extract.
3549 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
3550 cat_write_lock write_lock(this);
3552 
3553 sqliteConnector_.query("BEGIN TRANSACTION");
3554 try {
3556 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3557 std::vector<std::string>{newTableName, std::to_string(td->tableId)});
3558 } catch (std::exception& e) {
3559 sqliteConnector_.query("ROLLBACK TRANSACTION");
3560 throw;
3561 }
3562 sqliteConnector_.query("END TRANSACTION");
// NOTE(review): the lookup that initializes tableDescIt (listing line 3564) is not
// visible in this extract.
3563 TableDescriptorMap::iterator tableDescIt =
3565 CHECK(tableDescIt != tableDescriptorMap_.end());
3566 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3567 // Get table descriptor to change it
3568 TableDescriptor* changeTd = tableDescIt->second;
3569 changeTd->tableName = newTableName;
3570 tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3571 tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3572 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3573 }
3574 
// Renames a table and, if it has physical tables registered, each of those as well
// (names derived with generatePhysicalTableName(newTableName, shard index + 1)).
// After the catalog locks are released, the new name is pushed to every grantee that
// SysCatalog reports for this table object.
// NOTE(review): listing line 3611 is absent from this extract.
3575 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
3576 {
3577 cat_write_lock write_lock(this);
3578 cat_sqlite_lock sqlite_lock(getObjForLock());
3579 // rename all corresponding physical tables if this is a logical table
3580 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3581 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3582 const auto physicalTables = physicalTableIt->second;
3583 CHECK(!physicalTables.empty());
3584 for (size_t i = 0; i < physicalTables.size(); i++) {
3585 int32_t physical_tb_id = physicalTables[i];
3586 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3587 CHECK(phys_td);
3588 std::string newPhysTableName =
3589 generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
3590 renamePhysicalTable(phys_td, newPhysTableName);
3591 }
3592 }
3593 renamePhysicalTable(td, newTableName);
3594 }
3595 {
3596 DBObject object(newTableName, TableDBObjectType);
3597 // update table name in direct and effective priv map
3598 DBObjectKey key;
3599 key.dbId = currentDB_.dbId;
3600 key.objectId = td->tableId;
3601 key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3602 object.setObjectKey(key);
3603 auto objdescs = SysCatalog::instance().getMetadataForObject(
3604 currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
3605 for (auto obj : objdescs) {
3606 Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
3607 if (grnt) {
3608 grnt->renameDbObject(object);
3609 }
3610 }
3612 }
3613 }
3614 
// Batch rename. names[i] is a (current name, new name) pair and tableIds[i] is the id of
// the table it applies to. All sqlite updates are issued first; the in-memory
// descriptors are then re-keyed in tableDescriptorMap_ and Calcite is notified.
// NOTE(review): listing lines 3618 and 3626 are absent from this extract. No BEGIN/END
// TRANSACTION is visible in this block even though the catch handler issues a ROLLBACK;
// presumably the caller owns the transaction - confirm.
3615 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
3616 std::vector<int>& tableIds) {
3617 cat_write_lock write_lock(this);
3619 
3620 // execute the SQL query
3621 try {
3622 for (size_t i = 0; i < names.size(); i++) {
3623 int tableId = tableIds[i];
3624 std::string& newTableName = names[i].second;
3625 
3627 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
3628 std::vector<std::string>{newTableName, std::to_string(tableId)});
3629 }
3630 } catch (std::exception& e) {
3631 sqliteConnector_.query("ROLLBACK TRANSACTION");
3632 throw;
3633 }
3634 
3635 // reset the table descriptors, give Calcite a kick
3636 for (size_t i = 0; i < names.size(); i++) {
3637 std::string& curTableName = names[i].first;
3638 std::string& newTableName = names[i].second;
3639 
3640 TableDescriptorMap::iterator tableDescIt =
3641 tableDescriptorMap_.find(to_upper(curTableName));
3642 CHECK(tableDescIt != tableDescriptorMap_.end());
3643 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3644 
3645 // Get table descriptor to change it
3646 TableDescriptor* changeTd = tableDescIt->second;
3647 changeTd->tableName = newTableName;
3648 tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
3649 tableDescriptorMap_[to_upper(newTableName)] = changeTd;
3650 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
3651 }
3652 }
3653 
3654 // Collect an 'overlay' mapping of the tableNames->tableId
3655 // to account for possible chained renames
3656 // (for swap: a->b, b->c, c->d, d->a)
3657 
3659  std::map<std::string, int>& cachedTableMap,
3660  std::string& curTableName) {
3661  auto iter = cachedTableMap.find(curTableName);
3662  if ((iter != cachedTableMap.end())) {
3663  // get the cached tableId
3664  // and use that to lookup the TableDescriptor
3665  int tableId = (*iter).second;
3666  if (tableId == -1)
3667  return NULL;
3668  else
3669  return cat->getMetadataForTable(tableId);
3670  }
3671 
3672  // else ... lookup in standard location
3673  return cat->getMetadataForTable(curTableName);
3674 }
3675 
// Records one rename step in the name -> tableId overlay used for chained renames.
// The current name is tombstoned with -1 and the new name is bound to tableId.
// The tombstone is written first, so when both names are equal the entry ends up
// holding tableId.
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kRenamedAwayMarker = -1;
  cachedTableMap.insert_or_assign(curTableName, kRenamedAwayMarker);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
3686 
// Renames a batch of tables; `names` holds <current name, new name> pairs that are
// applied in order, so chains and swaps (a->b, b->c, ...) are resolved through a
// name -> tableId overlay while the ids are gathered.
// Phases:
//   1. Setup: resolve every current name to a tableId (CHECK-fails if one does not
//      resolve) and collect the unique ids.
//   2. Lock: one entry is added to tableLocks per unique tableId.
//   3. Rename: inside BEGIN/END TRANSACTION, expand each logical table into its
//      physical tables and hand the full list to renamePhysicalTable().
//   4. SysCatalog: call renameDbObject() on every grantee returned for the table.
// NOTE(review): listing lines 3735, 3739-3740, 3748 and 3809 are absent from this
// extract (including the declaration of tableLocks and the lock construction).
3687 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
3688  // tableId of all tables being renamed
3689  // ... in matching order to 'names'
3690  std::vector<int> tableIds;
3691 
3692  // (sorted & unique) list of tables ids for locking
3693  // (with names index of src in case of error)
3694  // <tableId, strIndex>
3695  // std::map is by definition/implementation sorted
3696  // std::map current usage below tests to avoid over-write
3697  std::map<int, size_t> uniqueOrderedTableIds;
3698 
3699  // mapping of modified tables names -> tableId
3700  std::map<std::string, int> cachedTableMap;
3701 
3702  // -------- Setup --------
3703 
3704  // gather tableIds pre-execute; build maps
3705  for (size_t i = 0; i < names.size(); i++) {
3706  std::string& curTableName = names[i].first;
3707  std::string& newTableName = names[i].second;
3708 
3709  // make sure the table being renamed exists,
3710  // or will exist when executed in 'name' order
3711  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
3712  CHECK(td);
3713 
3714  tableIds.push_back(td->tableId);
3715  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
3716  // don't overwrite as it should map to the first names index 'i'
3717  uniqueOrderedTableIds[td->tableId] = i;
3718  }
    // record this step so later pairs see the post-rename name
3719  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
3720  }
3721 
3722  CHECK_EQ(tableIds.size(), names.size());
3723 
3724  // The outer Stmt created a write lock before calling the catalog rename table
3725  // -> TODO: might want to sort out which really should set the lock :
3726  // the comment in the outer scope indicates it should be in here
3727  // but it's not clear if the access done there *requires* it out there
3728  //
3729  // Lock tables pre-execute (may/will be in different order than rename occurs)
3730  // const auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
3731  // *legacylockmgr::LockMgr<mapd_shared_mutex, bool>::getMutex(
3732  // legacylockmgr::ExecutorOuterLock, true));
3733 
3734  // acquire the locks for all tables being renamed
3736  for (auto& idPair : uniqueOrderedTableIds) {
    // idPair.second is the index in 'names' where this tableId first appeared
3737  std::string& tableName = names[idPair.second].first;
3738  tableLocks.emplace_back(
3741  *this, tableName, false)));
3742  }
3743 
3744  // -------- Rename --------
3745 
3746  {
3747  cat_write_lock write_lock(this);
3749 
3750  sqliteConnector_.query("BEGIN TRANSACTION");
3751 
3752  // collect all (tables + physical tables) into a single list
3753  std::vector<std::pair<std::string, std::string>> allNames;
3754  std::vector<int> allTableIds;
3755 
3756  for (size_t i = 0; i < names.size(); i++) {
3757  int tableId = tableIds[i];
3758  std::string& curTableName = names[i].first;
3759  std::string& newTableName = names[i].second;
3760 
3761  // rename all corresponding physical tables if this is a logical table
3762  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
3763  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3764  const auto physicalTables = physicalTableIt->second;
3765  CHECK(!physicalTables.empty());
3766  for (size_t k = 0; k < physicalTables.size(); k++) {
3767  int32_t physical_tb_id = physicalTables[k];
3768  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
3769  CHECK(phys_td);
      // physical table names are generated from the new logical name and a 1-based index
3770  std::string newPhysTableName =
3771  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
3772  allNames.push_back(
3773  std::pair<std::string, std::string>(phys_td->tableName, newPhysTableName));
3774  allTableIds.push_back(phys_td->tableId);
3775  }
3776  }
    // the physical tables (if any) are queued before the logical table itself
3777  allNames.push_back(std::pair<std::string, std::string>(curTableName, newTableName));
3778  allTableIds.push_back(tableId);
3779  }
3780  // rename all tables in one shot
3781  renamePhysicalTable(allNames, allTableIds);
3782 
3783  sqliteConnector_.query("END TRANSACTION");
3784  // cat write/sqlite locks are released when they go out scope
3785  }
3786  {
3787  // now update the SysCatalog
3788  for (size_t i = 0; i < names.size(); i++) {
3789  int tableId = tableIds[i];
3790  std::string& newTableName = names[i].second;
3791  {
3792  // update table name in direct and effective priv map
3793  DBObjectKey key;
3794  key.dbId = currentDB_.dbId;
3795  key.objectId = tableId;
3796  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
3797 
3798  DBObject object(newTableName, TableDBObjectType);
3799  object.setObjectKey(key);
3800 
3801  auto objdescs = SysCatalog::instance().getMetadataForObject(
3802  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
3803  for (auto obj : objdescs) {
3804  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
      // grantees that cannot be resolved are skipped
3805  if (grnt) {
3806  grnt->renameDbObject(object);
3807  }
3808  }
3810  }
3811  }
3812  }
3813 
3814  // -------- Cleanup --------
3815 
3816  // table locks are released when 'tableLocks' goes out of scope
3817 }
3818 
3820  const ColumnDescriptor* cd,
3821  const string& newColumnName) {
3822  cat_write_lock write_lock(this);
3824  sqliteConnector_.query("BEGIN TRANSACTION");
3825  try {
3826  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3827  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3828  CHECK(cdx);
3829  std::string new_column_name = cdx->columnName;
3830  new_column_name.replace(0, cd->columnName.size(), newColumnName);
3832  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
3833  std::vector<std::string>{new_column_name,
3834  std::to_string(td->tableId),
3835  std::to_string(cdx->columnId)});
3836  }
3837  } catch (std::exception& e) {
3838  sqliteConnector_.query("ROLLBACK TRANSACTION");
3839  throw;
3840  }
3841  sqliteConnector_.query("END TRANSACTION");
3842  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3843  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
3844  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
3845  CHECK(cdx);
3846  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
3847  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
3848  CHECK(columnDescIt != columnDescriptorMap_.end());
3849  ColumnDescriptor* changeCd = columnDescIt->second;
3850  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
3851  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
3852  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
3853  changeCd;
3854  }
3855  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
3856 }
3857 
3859  cat_write_lock write_lock(this);
3861  sqliteConnector_.query("BEGIN TRANSACTION");
3862  try {
3863  // TODO(andrew): this should be an upsert
3865  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
3866  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3867  if (sqliteConnector_.getNumRows() > 0) {
3869  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
3870  "update_time = "
3871  "datetime('now') where name = ? "
3872  "and userid = ?",
3873  std::vector<std::string>{vd.dashboardState,
3874  vd.imageHash,
3875  vd.dashboardMetadata,
3876  vd.dashboardName,
3877  std::to_string(vd.userId)});
3878  } else {
3880  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
3881  "update_time, "
3882  "userid) "
3883  "VALUES "
3884  "(?,?,?,?, "
3885  "datetime('now'), ?)",
3886  std::vector<std::string>{vd.dashboardName,
3887  vd.dashboardState,
3888  vd.imageHash,
3889  vd.dashboardMetadata,
3890  std::to_string(vd.userId)});
3891  }
3892  } catch (std::exception& e) {
3893  sqliteConnector_.query("ROLLBACK TRANSACTION");
3894  throw;
3895  }
3896  sqliteConnector_.query("END TRANSACTION");
3897 
3898  // now get the auto generated dashboardId
3899  try {
3901  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
3902  "WHERE name = ? and userid = ?",
3903  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
3904  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
3905  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
3906  } catch (std::exception& e) {
3907  throw;
3908  }
3912  sqlite_lock.unlock();
3913  write_lock.unlock();
3914  // NOTE(wamsi): Transactionally unsafe
3917  return vd.dashboardId;
3918 }
3919 
3921  cat_write_lock write_lock(this);
3923 
3925  sqliteConnector_.query("BEGIN TRANSACTION");
3926  try {
3928  "SELECT id FROM mapd_dashboards WHERE id = ?",
3929  std::vector<std::string>{std::to_string(vd.dashboardId)});
3930  if (sqliteConnector_.getNumRows() > 0) {
3932  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
3933  "?, userid = ?, update_time = datetime('now') where id = ? ",
3934  std::vector<std::string>{vd.dashboardName,
3935  vd.dashboardState,
3936  vd.imageHash,
3937  vd.dashboardMetadata,
3938  std::to_string(vd.userId),
3940  } else {
3941  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
3942  << " does not exist in db";
3943  throw runtime_error("Error replacing dashboard id " +
3944  std::to_string(vd.dashboardId) + " does not exist in db");
3945  }
3946  } catch (std::exception& e) {
3947  sqliteConnector_.query("ROLLBACK TRANSACTION");
3948  throw;
3949  }
3950  sqliteConnector_.query("END TRANSACTION");
3951 
3952  bool found{false};
3953  for (auto descp : dashboardDescriptorMap_) {
3954  auto dash = descp.second.get();
3955  if (dash->dashboardId == vd.dashboardId) {
3956  found = true;
3957  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
3958  dash->dashboardName);
3959  if (viewDescIt ==
3960  dashboardDescriptorMap_.end()) { // check to make sure view exists
3961  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
3962  << " dashboard " << dash->dashboardName << " does not exist in map";
3963  throw runtime_error("No metadata for dashboard for user " +
3964  std::to_string(dash->userId) + " dashboard " +
3965  dash->dashboardName + " does not exist in map");
3966  }
3967  dashboardDescriptorMap_.erase(viewDescIt);
3968  break;
3969  }
3970  }
3971  if (!found) {
3972  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
3973  << " does not exist in map";
3974  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
3975  " does not exist in map");
3976  }
3977 
3978  // now reload the object
3980  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
3981  "mapd_dashboards "
3982  "WHERE id = ?",
3983  std::vector<std::string>{std::to_string(vd.dashboardId)});
3984  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
3988  sqlite_lock.unlock();
3989  write_lock.unlock();
3990  // NOTE(wamsi): Transactionally unsafe
3993 }
3994 
3995 std::string Catalog::calculateSHA1(const std::string& data) {
3996  boost::uuids::detail::sha1 sha1;
3997  unsigned int digest[5];
3998  sha1.process_bytes(data.c_str(), data.length());
3999  sha1.get_digest(digest);
4000  std::stringstream ss;
4001  for (size_t i = 0; i < 5; i++) {
4002  ss << std::hex << digest[i];
4003  }
4004  return ss.str();
4005 }
4006 
// Inserts or refreshes a row in mapd_links for (ld.link, ld.userId) inside a single
// transaction, then reads back linkid and update_time into `ld`, registers the link
// through addLinkToMap() and returns ld.link.
// If a row already exists only its update_time is refreshed; otherwise a new row is
// inserted with view_state and view_metadata taken from `ld`.
// On any exception the transaction is rolled back and the exception is rethrown.
// NOTE(review): listing lines 4009, 4012, 4014, 4018, 4023 and 4030 are absent from
// this extract, so the statement heads (including how ld.link is derived before the
// ".substr(0, 8)") are not shown. `min_length` is not referenced on any visible line.
4007 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4008  cat_write_lock write_lock(this);
4010  sqliteConnector_.query("BEGIN TRANSACTION");
4011  try {
4013  .substr(0, 8);
4015  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4016  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4017  if (sqliteConnector_.getNumRows() > 0) {
    // link already stored for this user: only touch its timestamp
4019  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4020  "link = ?",
4021  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4022  } else {
4024  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4025  "update_time) VALUES (?,?,?,?, datetime('now'))",
4026  std::vector<std::string>{
4027  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4028  }
4029  // now get the auto generated dashid
  // NOTE(review): this lookup filters by link only, while the statements above also
  // filter by userid; confirm that is intended.
4031  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4032  "WHERE link = ?",
4033  ld.link);
4034  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4035  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4036  } catch (std::exception& e) {
4037  sqliteConnector_.query("ROLLBACK TRANSACTION");
4038  throw;
4039  }
4040  sqliteConnector_.query("END TRANSACTION");
4041  addLinkToMap(ld);
4042  return ld.link;
4043 }
4044 
4046  const TableDescriptor* td) const {
4047  cat_read_lock read_lock(this);
4048 
4049  const auto column_descriptors =
4050  getAllColumnMetadataForTable(td->tableId, false, true, true);
4051 
4052  const ColumnDescriptor* shard_cd{nullptr};
4053  int i = 1;
4054  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4055  ++cd_itr, ++i) {
4056  if (i == td->shardedColumnId) {
4057  shard_cd = *cd_itr;
4058  }
4059  }
4060  return shard_cd;
4061 }
4062 
4063 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4064  const TableDescriptor* logical_table_desc,
4065  bool populate_fragmenter) const {
4066  cat_read_lock read_lock(this);
4067  const auto physicalTableIt =
4068  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4069  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4070  return {logical_table_desc};
4071  }
4072 
4073  const auto physicalTablesIds = physicalTableIt->second;
4074  CHECK(!physicalTablesIds.empty());
4075  std::vector<const TableDescriptor*> physicalTables;
4076  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4077  physicalTables.push_back(
4078  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4079  }
4080 
4081  return physicalTables;
4082 }
4083 
4084 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4085  cat_read_lock read_lock(this);
4086 
4087  std::map<int, const ColumnDescriptor*> mapping;
4088 
4089  const auto tables = getAllTableMetadata();
4090  for (const auto td : tables) {
4091  if (td->shard >= 0) {
4092  // skip shards, they're not standalone tables
4093  continue;
4094  }
4095 
4096  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4097  const auto& ti = cd->columnType;
4098  if (ti.is_string()) {
4099  if (ti.get_compression() == kENCODING_DICT) {
4100  // if foreign reference, get referenced tab.col
4101  const auto dict_id = ti.get_comp_param();
4102 
4103  // ignore temp (negative) dictionaries
4104  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4105  mapping[dict_id] = cd;
4106  }
4107  }
4108  }
4109  }
4110  }
4111 
4112  return mapping;
4113 }
4114 
4116  const UserMetadata& user_metadata,
4117  const GetTablesType get_tables_type) const {
4118  if (td->shard >= 0) {
4119  // skip shards, they're not standalone tables
4120  return false;
4121  }
4122  switch (get_tables_type) {
4123  case GET_PHYSICAL_TABLES: {
4124  if (td->isView) {
4125  return false;
4126  }
4127  break;
4128  }
4129  case GET_VIEWS: {
4130  if (!td->isView) {
4131  return false;
4132  }
4133  break;
4134  }
4135  default:
4136  break;
4137  }
4139  dbObject.loadKey(*this);
4140  std::vector<DBObject> privObjects = {dbObject};
4141  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4142  // skip table, as there are no privileges to access it
4143  return false;
4144  }
4145  return true;
4146 }
4147 
4148 std::vector<std::string> Catalog::getTableNamesForUser(
4149  const UserMetadata& user_metadata,
4150  const GetTablesType get_tables_type) const {
4151  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4152  cat_read_lock read_lock(this);
4153 
4154  std::vector<std::string> table_names;
4155  const auto tables = getAllTableMetadata();
4156  for (const auto td : tables) {
4157  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4158  table_names.push_back(td->tableName);
4159  }
4160  }
4161  return table_names;
4162 }
4163 
4164 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4165  const UserMetadata& user_metadata,
4166  const GetTablesType get_tables_type,
4167  const std::string& filter_table_name) const {
4168  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4169  cat_read_lock read_lock(this);
4170 
4171  std::vector<TableMetadata> tables_metadata;
4172  const auto tables = getAllTableMetadata();
4173  for (const auto td : tables) {
4174  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4175  if (!filter_table_name.empty()) {
4176  if (td->tableName != filter_table_name) {
4177  continue;
4178  }
4179  }
4180  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4181  // descriptor outside catalog lock
4182  tables_metadata.emplace_back(table_metadata);
4183  }
4184  }
4185  return tables_metadata;
4186 }
4187 
4188 int Catalog::getLogicalTableId(const int physicalTableId) const {
4189  cat_read_lock read_lock(this);
4190  for (const auto& l : logicalToPhysicalTableMapById_) {
4191  if (l.second.end() != std::find_if(l.second.begin(),
4192  l.second.end(),
4193  [&](decltype(*l.second.begin()) tid) -> bool {
4194  return physicalTableId == tid;
4195  })) {
4196  return l.first;
4197  }
4198  }
4199  return physicalTableId;
4200 }
4201 
4202 void Catalog::checkpoint(const int logicalTableId) const {
4203  const auto td = getMetadataForTable(logicalTableId);
4204  const auto shards = getPhysicalTablesDescriptors(td);
4205  for (const auto shard : shards) {
4206  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4207  }
4208 }
4209 
4210 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4211  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4212  try {
4213  checkpoint(logical_table_id);
4214  } catch (...) {
4215  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4216  throw;
4217  }
4218 }
4219 
4221  cat_write_lock write_lock(this);
4222  // Physically erase all tables and dictionaries from disc and memory
4223  const auto tables = getAllTableMetadata();
4224  for (const auto table : tables) {
4226  }
4227  // Physically erase database metadata
4228  boost::filesystem::remove(basePath_ + "/mapd_catalogs/" + currentDB_.dbName);
4229  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4230 }
4231 
4233  const int tableId = td->tableId;
4234  // must destroy fragmenter before deleteChunks is called.
4235  if (td->fragmenter != nullptr) {
4236  auto tableDescIt = tableDescriptorMapById_.find(tableId);
4237  CHECK(tableDescIt != tableDescriptorMapById_.end());
4238  {
4239  INJECT_TIMER(deleting_fragmenter);
4240  tableDescIt->second->fragmenter = nullptr;
4241  }
4242  }
4243  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4244  {
4245  INJECT_TIMER(deleteChunksWithPrefix);
4246  // assuming deleteChunksWithPrefix is atomic
4247  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4248  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4249  }
4250  if (!td->isView) {
4251  INJECT_TIMER(Remove_Table);
4252  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4253  }
4254  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4255  {
4256  INJECT_TIMER(removeTableFromMap_);
4257  removeTableFromMap(td->tableName, tableId);
4258  }
4259 }
4260 
4261 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4262  const int32_t& shardNumber) {
4263  std::string physicalTableName =
4264  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4265  return (physicalTableName);
4266 }
4267 
4270  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4271  "omnisci_foreign_servers");
4272  auto num_rows = sqliteConnector_.getNumRows();
4273  for (size_t row = 0; row < num_rows; row++) {
4274  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4275  sqliteConnector_.getData<int>(row, 0),
4276  sqliteConnector_.getData<std::string>(row, 1),
4277  sqliteConnector_.getData<std::string>(row, 2),
4278  sqliteConnector_.getData<std::string>(row, 3),
4279  sqliteConnector_.getData<std::int32_t>(row, 4),
4280  sqliteConnector_.getData<std::int32_t>(row, 5));
4281  foreignServerMap_[foreign_server->name] = foreign_server;
4282  foreignServerMapById_[foreign_server->id] = foreign_server;
4283  }
4284 }
4285 
4288  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4289  "omnisci_foreign_tables");
4290  auto num_rows = sqliteConnector_.getNumRows();
4291  for (size_t r = 0; r < num_rows; r++) {
4292  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4293  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4294  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4295  const auto last_refresh_time = sqliteConnector_.getData<int>(r, 3);
4296  const auto next_refresh_time = sqliteConnector_.getData<int>(r, 4);
4297 
4298  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4299  auto foreign_table =
4300  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4301  CHECK(foreign_table);
4302  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4303  CHECK(foreign_table->foreign_server);
4304  foreign_table->populateOptionsMap(options);
4305  foreign_table->last_refresh_time = last_refresh_time;
4306  foreign_table->next_refresh_time = next_refresh_time;
4307  }
4308 }
4309 
// Sets one column of the omnisci_foreign_servers row whose name is `server_name`.
// The row id is looked up by name first; if no row is found a std::runtime_error is
// thrown. `value` and the row id are bound as statement parameters.
// `property` is concatenated directly into the SQL text as the column name, so it
// must be a trusted column identifier, never user-supplied text.
// NOTE(review): listing lines 4313, 4314 and 4321 are absent from this extract, so
// the statement heads (and any locks taken there) are not shown.
4310 void Catalog::setForeignServerProperty(const std::string& server_name,
4311  const std::string& property,
4312  const std::string& value) {
4315  "SELECT id from omnisci_foreign_servers where name = ?",
4316  std::vector<std::string>{server_name});
4317  auto num_rows = sqliteConnector_.getNumRows();
4318  if (num_rows > 0) {
  // more than one row for a server name is treated as a fatal error
4319  CHECK_EQ(size_t(1), num_rows);
4320  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4322  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4323  std::vector<std::string>{value, std::to_string(server_id)});
4324  } else {
4325  throw std::runtime_error{"Can not change property \"" + property +
4326  "\" for foreign server." + " Foreign server \"" +
4327  server_name + "\" is not found."};
4328  }
4329 }
4330 
4335 
4336  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4337  "omnisci_local_csv",
4339  options,
4341  local_csv_server->validate();
4342  createForeignServerNoLocks(std::move(local_csv_server), true);
4343 
4344  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4345  "omnisci_local_parquet",
4347  options,
4349  local_parquet_server->validate();
4350  createForeignServerNoLocks(std::move(local_parquet_server), true);
4351 }
4352 
4353 // prepare a fresh file reload on next table access
4354 void Catalog::setForReload(const int32_t tableId) {
4355  cat_read_lock read_lock(this);
4356  const auto td = getMetadataForTable(tableId);
4357  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4358  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4359  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4360  }
4361 }
4362 
4363 // get a table's data dirs
4364 std::vector<std::string> Catalog::getTableDataDirectories(
4365  const TableDescriptor* td) const {
4366  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4367  std::vector<std::string> file_paths;
4368  for (auto shard : getPhysicalTablesDescriptors(td)) {
4369  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4370  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4371  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4372  file_paths.push_back(file_path.filename().string());
4373  }
4374  return file_paths;
4375 }
4376 
4377 // get a column's dict dir basename
4379  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4381  cd->columnType.get_comp_param() > 0) {
4382  const auto dictId = cd->columnType.get_comp_param();
4383  const DictRef dictRef(currentDB_.dbId, dictId);
4384  const auto dit = dictDescriptorMapByRef_.find(dictRef);
4385  CHECK(dit != dictDescriptorMapByRef_.end());
4386  CHECK(dit->second);
4387  boost::filesystem::path file_path(dit->second->dictFolderPath);
4388  return file_path.filename().string();
4389  }
4390  return std::string();
4391 }
4392 
4393 // get a table's dict dirs
4394 std::vector<std::string> Catalog::getTableDictDirectories(
4395  const TableDescriptor* td) const {
4396  std::vector<std::string> file_paths;
4397  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4398  auto file_base = getColumnDictDirectory(cd);
4399  if (!file_base.empty() &&
4400  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4401  file_paths.push_back(file_base);
4402  }
4403  }
4404  return file_paths;
4405 }
4406 
4407 // returns table schema in a string
4408 // NOTE(sy): Might be able to replace dumpSchema() later with
4409 // dumpCreateTable() after a deeper review of the TableArchiver code.
// The statement is emitted with the placeholder "@T" in place of the table name,
// both after CREATE TABLE and inside SHARED DICTIONARY references.
// Layout of the result: column definitions, then SHARED DICTIONARY clauses, then an
// optional SHARD KEY clause, then the WITH (...) options.
// System and virtual columns are left out.
// NOTE(review): listing lines 4502 and 4505 are absent from this extract, so the
// first half of the MAX_ROLLBACK_EPOCHS condition and the value appended to that
// option are not shown.
4410 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
4411  cat_read_lock read_lock(this);
4412 
4413  std::ostringstream os;
4414  os << "CREATE TABLE @T (";
4415  // gather column defines
4416  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
  // separator: empty before the first emitted column, ", " afterwards
4417  std::string comma;
4418  std::vector<std::string> shared_dicts;
  // dictionary name -> first column seen using that dictionary
4419  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4420  for (const auto cd : cds) {
4421  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4422  const auto& ti = cd->columnType;
4423  os << comma << cd->columnName;
4424  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4425  if (ti.get_type() == SQLTypes::kCHAR) {
4426  os << " "
4427  << "TEXT";
4428  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4429  os << " "
4430  << "TEXT[]";
4431  } else {
4432  os << " " << ti.get_type_name();
4433  }
4434  os << (ti.get_notnull() ? " NOT NULL" : "");
4435  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4436  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4437  if (ti.get_compression() == kENCODING_DICT) {
4438  // if foreign reference, get referenced tab.col
4439  const auto dict_id = ti.get_comp_param();
4440  const DictRef dict_ref(currentDB_.dbId, dict_id);
4441  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4442  CHECK(dict_it != dictDescriptorMapByRef_.end());
4443  const auto dict_name = dict_it->second->dictName;
4444  // when migrating a table, any foreign dict ref will be dropped
4445  // and the first cd of a dict will become root of the dict
4446  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
4447  dict_root_cds[dict_name] = cd;
      // the encoding width is printed as size * 8
4448  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4449  } else {
4450  const auto dict_root_cd = dict_root_cds[dict_name];
4451  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
4452  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
4453  // "... shouldn't specify an encoding, it borrows from the referenced
4454  // column"
4455  }
4456  } else {
4457  os << " ENCODING NONE";
4458  }
4459  } else if (ti.is_date_in_days() ||
4460  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
    // a comp_param of 0 is printed as 32
4461  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4462  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4463  } else if (ti.is_geometry()) {
4464  if (ti.get_compression() == kENCODING_GEOINT) {
4465  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4466  << ")";
4467  } else {
4468  os << " ENCODING NONE";
4469  }
4470  }
4471  comma = ", ";
4472  }
4473  }
4474  // gather SHARED DICTIONARYs
4475  if (shared_dicts.size()) {
4476  os << ", " << boost::algorithm::join(shared_dicts, ", ");
4477  }
4478  // gather WITH options ...
4479  std::vector<std::string> with_options;
4480  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4481  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4482  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4483  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4484  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
4485  : "VACUUM='IMMEDIATE'");
4486  if (!td->partitions.empty()) {
4487  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4488  }
4489  if (td->nShards > 0) {
4490  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
4491  CHECK(shard_cd);
4492  os << ", SHARD KEY(" << shard_cd->columnName << ")";
  // the emitted shard count is nShards multiplied by max(g_leaf_count, 1)
4493  with_options.push_back(
4494  "SHARD_COUNT=" +
4495  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
4496  }
4497  if (td->sortedColumnId > 0) {
4498  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
4499  CHECK(sort_cd);
4500  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
4501  }
4503  td->maxRollbackEpochs != -1) {
4504  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4506  }
4507  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
4508  return os.str();
4509 }
4510 
4511 #include "Parser/ReservedKeywords.h"
4512 
//! returns true if the string contains at least one character that std::isspace
//! classifies as whitespace
inline bool contains_spaces(std::string_view str) {
  const auto is_whitespace = [](const unsigned char ch) { return std::isspace(ch); };
  return std::any_of(str.begin(), str.end(), is_whitespace);
}
4519 
4522  std::string_view str,
4523  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
4524  return str.find_first_of(chars) != std::string_view::npos;
4525 }
4526 
4528 inline bool is_reserved_sql_keyword(std::string_view str) {
4529  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
4530 }
4531 
4532 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
4534  bool multiline_formatting,
4535  bool dump_defaults) const {
4536  cat_read_lock read_lock(this);
4537 
4538  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
4539  std::ostringstream os;
4540 
4541  if (foreign_table) {
4542  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
4543  } else if (!td->isView) {
4544  os << "CREATE ";
4546  os << "TEMPORARY ";
4547  }
4548  os << "TABLE " + td->tableName + " (";
4549  } else {
4550  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
4551  return os.str();
4552  }
4553  // scan column defines
4554  std::vector<std::string> additional_info;
4555  std::set<std::string> shared_dict_column_names;
4556 
4557  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
4558 
4559  // gather column defines
4560  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
4561  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
4562  bool first = true;
4563  for (const auto cd : cds) {
4564  if (!(cd->isSystemCol || cd->isVirtualCol)) {
4565  const auto& ti = cd->columnType;
4566  if (!first) {
4567  os << ",";
4568  if (!multiline_formatting) {
4569  os << " ";
4570  }
4571  } else {
4572  first = false;
4573  }
4574  if (multiline_formatting) {
4575  os << "\n ";
4576  }
4577  // column name
4578  os << quoteIfRequired(cd->columnName);
4579  // CHAR is peculiar... better dump it as TEXT(32) like \d does
4580  if (ti.get_type() == SQLTypes::kCHAR) {
4581  os << " "
4582  << "TEXT";
4583  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
4584  os << " "
4585  << "TEXT[]";
4586  } else {
4587  os << " " << ti.get_type_name();
4588  }
4589  os << (ti.get_notnull() ? " NOT NULL" : "");
4590  if (shared_dict_column_names.find(cd->columnName) ==
4591  shared_dict_column_names.end()) {
4592  // avoids "Exception: Column ... shouldn't specify an encoding, it borrows it
4593  // from the referenced column"
4594  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
4595  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
4596  if (ti.get_compression() == kENCODING_DICT) {
4597  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
4598  } else {
4599  os << " ENCODING NONE";
4600  }
4601  } else if (ti.is_date_in_days() ||
4602  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
4603  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
4604  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
4605  } else if (ti.is_geometry()) {
4606  if (ti.get_compression() == kENCODING_GEOINT) {
4607  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
4608  << ")";
4609  } else {
4610  os << " ENCODING NONE";
4611  }
4612  }
4613  }
4614  }
4615  }
4616  // gather SHARED DICTIONARYs
4617  if (additional_info.size()) {
4618  std::string comma;
4619  if (!multiline_formatting) {
4620  comma = ", ";
4621  } else {
4622  comma = ",\n ";
4623  }
4624  os << comma;
4625  os << boost::algorithm::join(additional_info, comma);
4626  }
4627  os << ")";
4628 
4629  std::vector<std::string> with_options;
4630  if (foreign_table) {
4631  if (multiline_formatting) {
4632  os << "\n";
4633  } else {
4634  os << " ";
4635  }
4636  os << "SERVER " << foreign_table->foreign_server->name;
4637 
4638  // gather WITH options ...
4639  for (const auto& [option, value] : foreign_table->options) {
4640  with_options.emplace_back(option + "='" + value + "'");
4641  }
4642  }
4643 
4644  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
4645  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
4646  }
4647  if (!foreign_table && (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE)) {
4648  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
4649  }
4650  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
4651  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
4652  }
4653  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
4654  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
4655  }
4656  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
4657  td->maxRollbackEpochs != -1) {
4658  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
4660  }
4661  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
4662  with_options.push_back(td->hasDeletedCol ? "VACUUM='DELAYED'" : "VACUUM='IMMEDIATE'");
4663  }
4664  if (!foreign_table && !td->partitions.empty()) {
4665  with_options.push_back("PARTITIONS='" + td->partitions + "'");
4666  }