OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MigrationMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <exception>
21 #include <filesystem>
22 #include <sstream>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "Logger/Logger.h"
28 #include "QueryEngine/Execute.h"
30 #include "Shared/SysDefinitions.h"
31 #include "Shared/sqltypes.h"
32 
33 #include "MapDRelease.h"
34 
35 extern bool g_multi_instance;
36 
37 namespace migrations {
38 
39 void MigrationMgr::takeMigrationLock(const std::string& base_path) {
40 // TODO: support lock on Windows
41 #ifndef _WIN32
42  // Only used for --multi-instance clusters.
43  if (!g_multi_instance) {
44  migration_enabled_ = true;
45  return;
46  }
47 
48  // If we already have the migration lock then do nothing.
49  if (migration_mutex_) {
50  return;
51  }
52 
53  // Initialize the migration mutex. Will be locked until process exit.
54  migration_mutex_ = std::make_unique<heavyai::DistributedSharedMutex>(
55  std::filesystem::path(base_path) / shared::kLockfilesDirectoryName /
56  "migration.lockfile");
57 
58  // Take an exclusive lock if we can. If we get the exclusive lock, then later it will be
59  // relaxed to a shared lock, after we run migrations.
61  if (!g_multi_instance && !migration_enabled_) {
62  throw std::runtime_error(
63  "another HeavyDB server instance is already using data directory: " + base_path);
64  }
65 
66  // If we didn't get the exclusive lock, we'll wait for a shared lock instead, and we
67  // won't run migrations.
68  if (!migration_enabled_) {
69  migration_mutex_->lock_shared();
70  }
71 #else
72  migration_enabled_ = true;
73 #endif // _WIN32
74 }
75 
77 // TODO: support lock on Windows
78 #ifndef _WIN32
79  // Only used for --multi-instance clusters.
80  if (!g_multi_instance) {
81  return;
82  }
83 
84  // If we ran migrations, now relax the exclusive lock to a shared lock.
86  migration_mutex_->convert_lock_shared();
87  }
88 #endif // _WIN32
89 }
90 
92  const Catalog_Namespace::TableDescriptorMapById& table_descriptors_by_id,
93  const int database_id,
95  SqliteConnector& sqlite) {
96  std::vector<int> tables_migrated = {};
97  std::unordered_map<int, std::vector<std::string>> tables_to_migrate;
98  sqlite.query("BEGIN TRANSACTION");
99  try {
100  sqlite.query(
101  "select name from sqlite_master WHERE type='table' AND "
102  "name='mapd_version_history'");
103  if (sqlite.getNumRows() == 0) {
104  sqlite.query(
105  "CREATE TABLE mapd_version_history(version integer, migration_history text "
106  "unique)");
107  sqlite.query(
108  "CREATE TABLE mapd_date_in_days_column_migration_tmp(table_id integer primary "
109  "key)");
110  } else {
111  sqlite.query(
112  "select * from mapd_version_history where migration_history = "
113  "'date_in_days_column'");
114  if (sqlite.getNumRows() != 0) {
115  // no need for further execution
116  sqlite.query("END TRANSACTION");
117  return;
118  }
119  LOG(INFO) << "Checking for date columns requiring metadata migration.";
120  sqlite.query(
121  "select name from sqlite_master where type='table' AND "
122  "name='mapd_date_in_days_column_migration_tmp'");
123  if (sqlite.getNumRows() != 0) {
124  sqlite.query("select table_id from mapd_date_in_days_column_migration_tmp");
125  if (sqlite.getNumRows() != 0) {
126  for (size_t i = 0; i < sqlite.getNumRows(); i++) {
127  tables_migrated.push_back(sqlite.getData<int>(i, 0));
128  }
129  }
130  } else {
131  sqlite.query(
132  "CREATE TABLE mapd_date_in_days_column_migration_tmp(table_id integer "
133  "primary key)");
134  }
135  }
136  sqlite.query_with_text_params(
137  "SELECT tables.tableid, tables.name, columns.name FROM mapd_tables tables, "
138  "mapd_columns columns where tables.tableid = columns.tableid AND "
139  "columns.coltype = ?1 AND columns.compression = ?2",
140  std::vector<std::string>{
141  std::to_string(static_cast<int>(SQLTypes::kDATE)),
143  if (sqlite.getNumRows() != 0) {
144  for (size_t i = 0; i < sqlite.getNumRows(); i++) {
145  tables_to_migrate[sqlite.getData<int>(i, 0)] = {
146  sqlite.getData<std::string>(i, 1), sqlite.getData<std::string>(i, 2)};
147  }
148  }
149  } catch (const std::exception& e) {
150  LOG(ERROR) << "Failed to complete migration on date in days column metadata: "
151  << e.what();
152  sqlite.query("ROLLBACK");
153  throw;
154  }
155  sqlite.query("END TRANSACTION");
156 
157  for (auto& id_names : tables_to_migrate) {
158  if (std::find(tables_migrated.begin(), tables_migrated.end(), id_names.first) ==
159  tables_migrated.end()) {
160  sqlite.query("BEGIN TRANSACTION");
161  try {
162  LOG(INFO) << "Table: " << id_names.second[0]
163  << " may suffer from issues with DATE column: " << id_names.second[1]
164  << ". Running an OPTIMIZE command to solve any issues with metadata.";
165 
166  // TODO(adb): Could have the TableOptimizer get the Executor and avoid including
167  // Execute.h
169  auto table_desc_itr = table_descriptors_by_id.find(id_names.first);
170  if (table_desc_itr == table_descriptors_by_id.end()) {
171  throw std::runtime_error("Table descriptor does not exist for table " +
172  id_names.second[0] + " does not exist.");
173  }
174  auto td = table_desc_itr->second;
175  TableOptimizer optimizer(td, executor.get(), *cat);
176  optimizer.recomputeMetadata();
177 
178  sqlite.query_with_text_params(
179  "INSERT INTO mapd_date_in_days_column_migration_tmp VALUES(?)",
180  std::vector<std::string>{std::to_string(id_names.first)});
181  } catch (const std::exception& e) {
182  LOG(ERROR) << "Failed to complete metadata migration on date in days column: "
183  << e.what();
184  sqlite.query("ROLLBACK");
185  throw;
186  }
187  sqlite.query("COMMIT");
188  }
189  }
190 
191  sqlite.query("BEGIN TRANSACTION");
192  try {
193  sqlite.query("DROP TABLE mapd_date_in_days_column_migration_tmp");
194  sqlite.query_with_text_params(
195  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
196  std::vector<std::string>{std::to_string(MAPD_VERSION), "date_in_days_column"});
197  } catch (const std::exception& e) {
198  LOG(ERROR) << "Failed to complete migraion on date in days column: " << e.what();
199  sqlite.query("ROLLBACK");
200  throw;
201  }
202  sqlite.query("END TRANSACTION");
203  LOG(INFO) << "Successfully migrated all date in days column metadata.";
204 }
205 
207  const Catalog_Namespace::TableDescriptorMapById& table_descriptors_by_id,
209  // for this catalog...
210  CHECK(cat);
211  auto& catalog = *cat;
212 
213  // skip info schema catalog
214  if (catalog.isInfoSchemaDb()) {
215  return true;
216  }
217 
218  // report catalog
219  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Processing catalog '"
220  << catalog.name() << "'";
221 
222  // HeavyConnect cache
223  auto* heavyconnect_cache =
224  catalog.getDataMgr().getPersistentStorageMgr()->getDiskCache();
225 
226  bool all_tables_migrated = true;
227 
228  // all tables...
229  for (auto td_itr = table_descriptors_by_id.begin();
230  td_itr != table_descriptors_by_id.end();
231  td_itr++) {
232  // the table...
233  auto const* td = td_itr->second;
234  CHECK(td);
235 
236  // skip views and temps
237  if (td->isView) {
238  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Skipping view '"
239  << td->tableName << "'";
240  continue;
241  }
242  if (table_is_temporary(td)) {
243  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Skipping temporary table '"
244  << td->tableName << "'";
245  continue;
246  }
247 
248  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Examining table '"
249  << td->tableName << "'";
250 
251  // find render group columns
252  auto logical_cds =
253  catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
254  // prepare to capture names
255  std::vector<std::string> columns_to_drop;
256  // iterate all columns
257  for (auto cd_itr = logical_cds.begin(); cd_itr != logical_cds.end(); cd_itr++) {
258  auto const* cd = *cd_itr;
259  CHECK(cd);
260  // poly or multipoly column?
261  auto const cd_type = cd->columnType.get_type();
262  if (cd_type == kPOLYGON || cd_type == kMULTIPOLYGON) {
263  // next logical column
264  auto const next_itr = std::next(cd_itr);
265  if (next_itr == logical_cds.end()) {
266  // no next column, perhaps table already migrated
267  break;
268  }
269  // the next column
270  auto const* next_cd = *next_itr;
271  CHECK(next_cd);
272  // expected name?
273  auto const next_name = next_cd->columnName;
274  auto const expected_name = cd->columnName + "_render_group";
275  if (next_name == expected_name) {
276  // expected type?
277  auto const next_type = next_cd->columnType.get_type();
278  if (next_type == kINT) {
279  // expected ID increment
280  auto const next_id = next_cd->columnId;
281  auto const expected_next_id =
282  cd->columnId + cd->columnType.get_physical_cols() + 1;
283  if (next_id == expected_next_id) {
284  // report
285  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Found render "
286  "group column '"
287  << next_name << "'";
288  // capture name
289  columns_to_drop.emplace_back(next_name);
290  // restart with the column after the one we identified
291  cd_itr++;
292  } else {
293  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Expected render "
294  "group column '"
295  << next_name << "' has wrong ID (" << next_id << "/"
296  << expected_next_id << "), skipping...";
297  }
298  } else {
299  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Expected render "
300  "group column '"
301  << next_name << "' has wrong type (" << to_string(next_type)
302  << "), skipping...";
303  }
304  }
305  }
306  }
307 
308  // any to drop?
309  if (columns_to_drop.size() == 0) {
310  LOG(INFO)
311  << "MigrationMgr: dropRenderGroupColumns: No render group columns found";
312  continue;
313  }
314 
315  // drop the columns
316  catalog.getSqliteConnector().query("BEGIN TRANSACTION");
317  try {
318  std::vector<int> column_ids;
319  for (auto const& column : columns_to_drop) {
320  auto const* cd = catalog.getMetadataForColumn(td->tableId, column);
321  CHECK(cd);
322  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Removing render "
323  "group column '"
324  << cd->columnName << "'";
325  catalog.dropColumn(*td, *cd);
326  column_ids.push_back(cd->columnId);
327  }
328  if (!td->isForeignTable()) {
329  for (auto const* physical_td : catalog.getPhysicalTablesDescriptors(td)) {
330  CHECK(physical_td);
331  // getMetadataForTable() may not have been called on this table, so
332  // the table may not yet have a fragmenter (which that function lazy
333  // creates by default) so call it manually here to force creation of
334  // the fragmenter so we can use it to actually drop the columns
335  if (physical_td->fragmenter == nullptr) {
336  catalog.getMetadataForTable(physical_td->tableId, true);
337  CHECK(physical_td->fragmenter);
338  }
339  physical_td->fragmenter->dropColumns(column_ids);
340  }
341  }
342  catalog.rollLegacy(true);
343  if (!td->isForeignTable()) {
344  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
345  catalog.resetTableEpochFloor(td->tableId);
346  catalog.checkpoint(td->tableId);
347  }
348  }
349  catalog.getSqliteConnector().query("END TRANSACTION");
350  } catch (std::exception& e) {
351  if (!td->isForeignTable()) {
352  catalog.setForReload(td->tableId);
353  }
354  catalog.rollLegacy(false);
355  catalog.getSqliteConnector().query("ROLLBACK TRANSACTION");
356  LOG(ERROR) << "MigrationMgr: dropRenderGroupColumns: Failed to drop render group "
357  "columns for Table '"
358  << td->tableName << "' in Database '" << catalog.name() << "' ("
359  << e.what() << ")";
360 
361  // trigger overall failure
362  all_tables_migrated = false;
363 
364  // don't do anything else for this table
365  continue;
366  }
367 
368  // flush any HeavyConnect foreign table cache
369  if (heavyconnect_cache && td->isForeignTable()) {
370  for (auto const* physical_td : catalog.getPhysicalTablesDescriptors(td)) {
371  CHECK(physical_td);
372  LOG(INFO) << "MigrationMgr: dropRenderGroupColumns: Flushing HeavyConnect "
373  "cache for table '"
374  << physical_td->tableName << "'";
375  heavyconnect_cache->clearForTablePrefix(
376  {catalog.getCurrentDB().dbId, physical_td->tableId});
377  }
378  }
379  }
380 
381  return all_tables_migrated;
382 }
383 
namespace {

// Raise the error reported when `link_path` is a symlink whose target is not
// `expected_target`.
[[noreturn]] void throw_unexpected_symlink(const std::filesystem::path& link_path,
                                           const std::filesystem::path& expected_target) {
  std::ostringstream message;
  message << "Rebrand migration: Encountered an unexpected symlink at path: "
          << link_path << ". Symlink does not reference file: " << expected_target;
  throw std::runtime_error(message.str());
}

/**
 * Move a legacy file or directory to its new location and leave a relative symlink
 * (named old_path, pointing at new_path's filename) behind.
 *
 * Idempotent: when old_path is already a symlink to new_path's filename and new_path
 * exists, nothing changes. If neither path exists, nothing happens.
 *
 * NOTE: std::filesystem::exists() follows symlinks, so a dangling symlink at old_path
 * is treated as "absent" by the checks below.
 *
 * @return true if a rename or symlink creation took place.
 * @throws std::runtime_error when both paths are real (non-symlink) entries, when
 *         old_path is a symlink to something else, or when the resulting state is not
 *         the expected symlink.
 */
bool rename_and_symlink_path(const std::filesystem::path& old_path,
                             const std::filesystem::path& new_path) {
  namespace fs = std::filesystem;
  const fs::path link_target = new_path.filename();
  bool changed = false;

  // Step 1: if a real legacy entry is present, move it to the new name. An existing
  // symlink is only validated here.
  if (fs::exists(old_path)) {
    if (!fs::is_symlink(old_path)) {
      if (fs::exists(new_path)) {
        std::ostringstream message;
        message << "Rebrand migration: Encountered existing non-symlink files at the "
                   "legacy path: "
                << old_path << " and new path: " << new_path;
        throw std::runtime_error(message.str());
      }
      fs::rename(old_path, new_path);
      std::cout << "Rebrand migration: Renamed " << old_path << " to " << new_path
                << std::endl;
      changed = true;
    } else {
      if (fs::read_symlink(old_path) != link_target) {
        throw_unexpected_symlink(old_path, link_target);
      }
      if (!fs::exists(new_path)) {
        std::ostringstream message;
        message << "Rebrand migration: Encountered symlink at legacy path: " << old_path
                << " but no corresponding file at new path: " << new_path;
        throw std::runtime_error(message.str());
      }
    }
  }

  // Step 2: make sure the legacy name is a symlink to the new entry.
  if (!fs::exists(old_path)) {
    if (fs::exists(new_path)) {
      fs::create_symlink(link_target, old_path);
      std::cout << "Rebrand migration: Added symlink from " << old_path << " to "
                << link_target << std::endl;
      changed = true;
    }
    return changed;
  }
  if (!fs::is_symlink(old_path)) {
    std::ostringstream message;
    message << "Rebrand migration: An unexpected error occurred. A symlink should have "
               "been created at "
            << old_path;
    throw std::runtime_error(message.str());
  }
  if (fs::read_symlink(old_path) != link_target) {
    throw_unexpected_symlink(old_path, link_target);
  }
  return changed;
}

/**
 * Convenience wrapper around rename_and_symlink_path for a file that lives in
 * canonical(base_path)[/dir_name].
 *
 * @param base_path storage root; resolved with std::filesystem::canonical (so it must
 *        exist).
 * @param dir_name optional subdirectory of base_path; empty means base_path itself.
 * @param old_file_name legacy file name; must not be empty.
 * @param new_file_name new file name; must not be empty.
 * @return true if a rename or symlink creation took place.
 * @throws std::runtime_error if either file name is empty.
 */
bool rename_and_symlink_file(const std::filesystem::path& base_path,
                             const std::string& dir_name,
                             const std::string& old_file_name,
                             const std::string& new_file_name) {
  std::filesystem::path parent_dir = std::filesystem::canonical(base_path);
  if (!dir_name.empty()) {
    parent_dir /= dir_name;
  }

  if (old_file_name.empty()) {
    throw std::runtime_error(
        "Unexpected error in rename_and_symlink_file: old_file_name is empty");
  }
  if (new_file_name.empty()) {
    throw std::runtime_error(
        "Unexpected error in rename_and_symlink_file: new_file_name is empty");
  }

  return rename_and_symlink_path(parent_dir / old_file_name, parent_dir / new_file_name);
}
}  // namespace
466 
467 void MigrationMgr::executeRebrandMigration(const std::string& base_path) {
468  bool migration_occurred{false};
469 
470  // clang-format off
471  const std::map<std::string, std::string> old_to_new_dir_names {
472  {"mapd_catalogs", shared::kCatalogDirectoryName},
473  {"mapd_data", shared::kDataDirectoryName},
474  {"mapd_log", shared::kDefaultLogDirName},
475  {"mapd_export", shared::kDefaultExportDirName},
476  {"mapd_import", shared::kDefaultImportDirName},
477  {"omnisci_key_store", shared::kDefaultKeyStoreDirName}
478  };
479  // clang-format on
480 
481  const auto storage_base_path = std::filesystem::canonical(base_path);
482  // Rename legacy directories (if they exist), and create symlinks from legacy directory
483  // names to the new names (if they don't already exist).
484  for (const auto& [old_dir_name, new_dir_name] : old_to_new_dir_names) {
485  auto old_path = storage_base_path / old_dir_name;
486  auto new_path = storage_base_path / new_dir_name;
487  if (rename_and_symlink_path(old_path, new_path)) {
488  migration_occurred = true;
489  }
490  }
491 
492  // Rename legacy files and create symlinks to them.
493  const auto license_updated = rename_and_symlink_file(
494  storage_base_path, "", "omnisci.license", shared::kDefaultLicenseFileName);
495  const auto key_updated = rename_and_symlink_file(storage_base_path,
497  "omnisci.pem",
499  const auto sys_catalog_updated = rename_and_symlink_file(storage_base_path,
501  "omnisci_system_catalog",
503  if (license_updated || key_updated || sys_catalog_updated) {
504  migration_occurred = true;
505  }
506 
507  // Delete the disk cache directory and legacy files that will no longer be used.
508  const std::array<std::filesystem::path, 9> files_to_delete{
509  storage_base_path / "omnisci_disk_cache",
510  storage_base_path / "omnisci_server_pid.lck",
511  storage_base_path / "mapd_server_pid.lck",
512  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.FATAL",
513  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.ERROR",
514  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.WARNING",
515  storage_base_path / shared::kDefaultLogDirName / "omnisci_server.INFO",
516  storage_base_path / shared::kDefaultLogDirName / "omnisci_web_server.ALL",
517  storage_base_path / shared::kDefaultLogDirName / "omnisci_web_server.ACCESS"};
518 
519  for (const auto& file_path : files_to_delete) {
520  if (std::filesystem::exists(file_path)) {
521  std::filesystem::remove_all(file_path);
522  std::cout << "Rebrand migration: Deleted file " << file_path << std::endl;
523  migration_occurred = true;
524  }
525  }
526  if (migration_occurred) {
527  std::cout << "Rebrand migration completed" << std::endl;
528  }
529 }
530 } // namespace migrations
const std::string kDataDirectoryName
bool g_multi_instance
Definition: heavyai_locks.h:22
std::string cat(Ts &&...args)
T getData(const int row, const int col)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
static bool dropRenderGroupColumns(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, Catalog_Namespace::Catalog *cat)
virtual void query_with_text_params(std::string const &query_only)
#define LOG(tag)
Definition: Logger.h:285
static void relaxMigrationLock()
const std::string kDefaultLogDirName
const std::string kSystemCatalogName
virtual void query(const std::string &queryString)
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
Constants for Builtin SQL Types supported by HEAVY.AI.
const std::string kDefaultExportDirName
std::string to_string(char const *&&v)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:509
bool rename_and_symlink_path(const std::filesystem::path &old_path, const std::filesystem::path &new_path)
const std::string kDefaultImportDirName
std::map< int, TableDescriptor * > TableDescriptorMapById
Definition: Types.h:35
static bool migration_enabled_
Definition: MigrationMgr.h:58
static const int32_t MAPD_VERSION
Definition: release.h:32
const std::string kDefaultKeyFileName
const std::string kDefaultKeyStoreDirName
Definition: sqltypes.h:80
static void executeRebrandMigration(const std::string &base_path)
static void takeMigrationLock(const std::string &base_path)
bool table_is_temporary(const TableDescriptor *const td)
bool rename_and_symlink_file(const std::filesystem::path &base_path, const std::string &dir_name, const std::string &old_file_name, const std::string &new_file_name)
const std::string kCatalogDirectoryName
const std::string kDefaultLicenseFileName
#define CHECK(condition)
Definition: Logger.h:291
static void migrateDateInDaysMetadata(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, const int database_id, Catalog_Namespace::Catalog *cat, SqliteConnector &sqlite)
static std::unique_ptr< heavyai::DistributedSharedMutex > migration_mutex_
Definition: MigrationMgr.h:57
const std::string kLockfilesDirectoryName
Definition: sqltypes.h:72
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
virtual size_t getNumRows() const
void recomputeMetadata() const
Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk met...