OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableArchiver.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <boost/filesystem.hpp>
21 #include <boost/process.hpp>
22 #include <boost/range/combine.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 #include <boost/version.hpp>
26 #include <cerrno>
27 #include <cstdio>
28 #include <cstring>
29 #include <exception>
30 #include <list>
31 #include <memory>
32 #include <regex>
33 #include <set>
34 #include <sstream>
35 #include <system_error>
36 
37 #include "Archive/S3Archive.h"
41 #include "LockMgr/LockMgr.h"
42 #include "Logger/Logger.h"
43 #include "Parser/ParserNode.h"
44 #include "Shared/File.h"
45 #include "Shared/StringTransform.h"
46 #include "Shared/SysDefinitions.h"
48 #include "Shared/file_path_util.h"
49 #include "Shared/measure.h"
50 #include "Shared/scope.h"
51 #include "Shared/thread_count.h"
52 
53 extern bool g_cluster;
54 extern std::string g_base_path;
56 
57 constexpr static int kDumpVersion = 1;
59 
60 constexpr static char const* table_schema_filename = "_table.sql";
61 constexpr static char const* table_oldinfo_filename = "_table.oldinfo";
62 constexpr static char const* table_epoch_filename = "_table.epoch";
63 constexpr static char const* table_dumpversion_filename = "_table.dumpversion";
64 
65 #if BOOST_VERSION < 107300
66 namespace std {
67 
68 template <typename T, typename U>
69 struct tuple_size<boost::tuples::cons<T, U>>
70  : boost::tuples::length<boost::tuples::cons<T, U>> {};
71 template <size_t I, typename T, typename U>
72 struct tuple_element<I, boost::tuples::cons<T, U>>
73  : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
74 
75 } // namespace std
76 #endif
77 
78 namespace {
79 
// Deleter for std::unique_ptr<FILE, ...>: closes the stream it is handed.
inline auto simple_file_closer = [](FILE* file_handle) { std::fclose(file_handle); };
81 
82 inline std::string abs_path(const File_Namespace::GlobalFileMgr* global_file_mgr) {
83  return boost::filesystem::canonical(global_file_mgr->getBasePath()).string();
84 }
85 
86 inline std::string run(const std::string& cmd,
87  const std::string& chdir = "",
88  const bool log_failure = true) {
89  VLOG(3) << "running cmd: " << cmd;
90  int rcode;
91  std::error_code ec;
92  std::string output, errors;
93  const auto time_ms = measure<>::execution([&]() {
94  using namespace boost::process;
95  ipstream stdout, stderr;
96  if (!chdir.empty()) {
97  rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
98  } else {
99  rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
100  }
101  std::ostringstream ss_output, ss_errors;
102  stdout >> ss_output.rdbuf();
103  stderr >> ss_errors.rdbuf();
104  output = ss_output.str();
105  errors = ss_errors.str();
106  });
107  if (rcode || ec) {
108  if (log_failure) {
109  LOG(ERROR) << "failed cmd: " << cmd;
110  LOG(ERROR) << "exit code: " << rcode;
111  LOG(ERROR) << "error code: " << ec.value() << " - " << ec.message();
112  LOG(ERROR) << "stdout: " << output;
113  LOG(ERROR) << "stderr: " << errors;
114  }
115 #if defined(__APPLE__)
116  // osx bsdtar options "--use-compress-program" and "--fast-read" together
117  // run into pipe write error after tar extracts the first occurrence of a
118  // file and closes the read end while the decompression program still writes
119  // to the pipe. bsdtar doesn't handle this situation well like gnu tar does.
120  if (1 == rcode && cmd.find("--fast-read") &&
121  (errors.find("cannot write decoded block") != std::string::npos ||
122  errors.find("Broken pipe") != std::string::npos)) {
123  // ignore this error, or lose speed advantage of "--fast-read" on osx.
124  LOG(ERROR) << "tar error ignored on osx for --fast-read";
125  } else
126 #endif
127  // circumvent tar warning on reading file that is "changed as we read it".
128  // this warning results from reading a table file under concurrent inserts
129  if (1 == rcode && errors.find("changed as we read") != std::string::npos) {
130  LOG(ERROR) << "tar error ignored under concurrent inserts";
131  } else {
132  int error_code;
133  std::string error_message;
134  if (ec) {
135  error_code = ec.value();
136  error_message = ec.message();
137  } else {
138  error_code = rcode;
139  // Show a more concise message for permission errors instead of the default
140  // verbose message. Error logs will still contain all details.
141  if (to_lower(errors).find("permission denied") != std::string::npos) {
142  error_message = "Insufficient file read/write permission.";
143  } else {
144  error_message = errors;
145  }
146  }
147  throw std::runtime_error(
148  "An error occurred while executing an internal command. Error code: " +
149  std::to_string(error_code) + ", message: " + error_message);
150  }
151  } else {
152  VLOG(3) << "finished cmd: " << cmd;
153  VLOG(3) << "time: " << time_ms << " ms";
154  VLOG(3) << "stdout: " << output;
155  }
156  return output;
157 }
158 
159 inline std::string simple_file_cat(const std::string& archive_path,
160  const std::string& file_name,
161  const std::string& compression,
162  const bool log_failure = true) {
165 #if defined(__APPLE__)
166  constexpr static auto opt_occurrence = "--fast-read";
167 #else
168  constexpr static auto opt_occurrence = "--occurrence=1";
169 #endif
170  boost::filesystem::path temp_dir =
171  boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
172  boost::filesystem::create_directories(temp_dir);
173  run("tar " + compression + " -xvf " + get_quoted_string(archive_path) + " " +
174  opt_occurrence + " " + file_name,
175  temp_dir.string(),
176  log_failure);
177  const auto output = run("cat " + (temp_dir / file_name).string());
178  boost::filesystem::remove_all(temp_dir);
179  return output;
180 }
181 
182 inline std::string get_table_schema(const std::string& archive_path,
183  const std::string& table,
184  const std::string& compression) {
185  const auto schema_str =
186  simple_file_cat(archive_path, table_schema_filename, compression);
187  std::regex regex("@T");
188  return std::regex_replace(schema_str, regex, table);
189 }
190 
191 // If a table was altered, there may be a mapping from old column ids to new ones;
192 // these values need to be replaced in the page headers.
194  const boost::filesystem::path& path,
195  const std::unordered_map<int, int>& column_ids_map,
196  const int32_t table_epoch,
197  const bool drop_not_update) {
198  const std::string file_path = path.string();
199  const std::string file_name = path.filename().string();
200  std::vector<std::string> tokens;
201  boost::split(tokens, file_name, boost::is_any_of("."));
202 
203  // ref. FileMgr::init for hint of data file name layout
204  if (tokens.size() <= 2 || !(DATA_FILE_EXT == "." + tokens[2] || tokens[2] == "mapd")) {
205  // We are only interested in files in the form <id>.<page_size>.<DATA_FILE_EXT>
206  return;
207  }
208 
209  const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
210  const auto file_size = boost::filesystem::file_size(file_path);
211  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
212  std::fopen(file_path.c_str(), "r+"), simple_file_closer);
213  if (!fp) {
214  throw std::runtime_error("Failed to open " + file_path +
215  " for update: " + std::strerror(errno));
216  }
217  // TODO(Misiu): Rather than reference an external layout we should de-duplicate this
218  // page-reading code in a single location. This will also reduce the need for comments
219  // below.
220  // ref. FileInfo::openExistingFile for hint of chunk header layout
221  for (size_t page = 0; page < file_size / page_size; ++page) {
222  int32_t header_info[8];
223  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
224  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
225  file_path + " for read: " + std::strerror(errno));
226  }
227  if (1 != fread(header_info, sizeof header_info, 1, fp.get())) {
228  throw std::runtime_error("Failed to read " + file_path + ": " +
229  std::strerror(errno));
230  }
231  if (const auto header_size = header_info[0]; header_size > 0) {
232  // header_info[1] is the page's db_id; but can also be used as an "is deleted"
233  // indicator if negative.
234  auto& contingent = header_info[1];
235  // header_info[2] is the page's table_id; but can also be used to store the page's
236  // epoch since the FileMgr stores table_id information separately.
237  auto& epoch = header_info[2];
238  auto& col_id = header_info[3];
240  table_epoch, epoch, contingent)) {
241  continue;
242  }
243  auto column_map_it = column_ids_map.find(col_id);
244  bool rewrite_header = false;
245  if (drop_not_update) {
246  // if the header contains a column ID that is a key of the map
247  // erase the entire header so that column is effectively dropped
248  // the value of the map is ignored, thus allowing us to use the
249  // same function for both operations
250  if (column_map_it != column_ids_map.end()) {
251  // clear the entire header
252  std::memset(header_info, 0, sizeof(header_info));
253  rewrite_header = true;
254  }
255  } else {
256  if (column_map_it == column_ids_map.end()) {
257  throw std::runtime_error("Page " + std::to_string(page) + " in " + file_path +
258  " has unexpected Column ID " + std::to_string(col_id) +
259  ". Dump may be corrupt.");
260  }
261  // If a header contains a column id that is remapped to new location
262  // then write that change to the file.
263  if (const auto dest_col_id = column_map_it->second; col_id != dest_col_id) {
264  col_id = dest_col_id;
265  rewrite_header = true;
266  }
267  }
268  if (rewrite_header) {
269  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
270  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
271  file_path + " for write: " + std::strerror(errno));
272  }
273  if (1 != fwrite(header_info, sizeof header_info, 1, fp.get())) {
274  throw std::runtime_error("Failed to write " + file_path + ": " +
275  std::strerror(errno));
276  }
277  }
278  }
279  }
280 }
281 
282 // Rewrite column ids in chunk keys in a table's data files under a temp_data_dir,
283 // including files of all shards of the table. Can be slow for big files but should
284 // scale faster than refragmentizing. Table altering should be rare for olap.
285 // Also used to erase page headers for columns that must be dropped completely.
287  const int32_t table_epoch,
288  const std::string& temp_data_dir,
289  const std::unordered_map<int, int>& column_ids_map,
290  const bool drop_not_update) {
291  boost::filesystem::path base_path(temp_data_dir);
292  boost::filesystem::recursive_directory_iterator end_it;
294  for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
295  ++fit) {
296  if (!boost::filesystem::is_symlink(fit->path()) &&
297  boost::filesystem::is_regular_file(fit->status())) {
299  fit->path(),
300  column_ids_map,
301  table_epoch,
302  drop_not_update);
303  thread_controller.checkThreadsStatus();
304  }
305  }
306  thread_controller.finish();
307 }
308 
309 void delete_old_symlinks(const std::string& table_data_dir) {
310  std::vector<boost::filesystem::path> symlinks;
311  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
312  it++) {
313  if (boost::filesystem::is_symlink(it->path())) {
314  symlinks.emplace_back(it->path());
315  }
316  }
317  for (const auto& symlink : symlinks) {
318  boost::filesystem::remove_all(symlink);
319  }
320 }
321 
322 void add_data_file_symlinks(const std::string& table_data_dir) {
323  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
324  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
325  it++) {
326  const auto path = boost::filesystem::canonical(it->path());
327  if (path.extension().string() == DATA_FILE_EXT) {
328  auto old_path = path;
329  old_path.replace_extension(File_Namespace::kLegacyDataFileExtension);
330  // Add a symlink to data file, if one does not exist.
331  if (!boost::filesystem::exists(old_path)) {
332  old_to_new_paths[old_path] = path;
333  }
334  }
335  }
336  for (const auto& [old_path, new_path] : old_to_new_paths) {
337  boost::filesystem::create_symlink(new_path.filename(), old_path);
338  }
339 }
340 
342  const std::string& temp_data_dir,
343  const std::vector<std::string>& target_paths,
344  const std::string& name_prefix) {
345  boost::filesystem::path base_path(temp_data_dir);
346  boost::filesystem::directory_iterator end_it;
347  int target_path_index = 0;
348  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
349  if (!boost::filesystem::is_regular_file(fit->status())) {
350  const std::string file_path = fit->path().string();
351  const std::string file_name = fit->path().filename().string();
352  if (boost::istarts_with(file_name, name_prefix)) {
353  const std::string target_path =
354  abs_path(global_file_mgr) + "/" + target_paths[target_path_index++];
355  if (std::rename(file_path.c_str(), target_path.c_str())) {
356  throw std::runtime_error("Failed to rename file " + file_path + " to " +
357  target_path + ": " + std::strerror(errno));
358  }
359  // Delete any old/invalid symlinks contained in table dump.
360  delete_old_symlinks(target_path);
362  // For post-rebrand table dumps, symlinks need to be added here, since file mgr
363  // migration would already have been executed for the dumped table.
364  add_data_file_symlinks(target_path);
365  }
366  }
367  }
368 }
369 
370 std::unordered_map<int, int> find_render_group_columns(
371  const std::list<ColumnDescriptor>& src_columns,
372  std::vector<std::string>& src_oldinfo_strs,
373  const std::string& archive_path) {
374  // scan for poly or mpoly columns and collect their names
375  std::vector<std::string> poly_column_names;
376  for (auto const& src_column : src_columns) {
377  auto const sqltype = src_column.columnType.get_type();
378  if (sqltype == kPOLYGON || sqltype == kMULTIPOLYGON) {
379  poly_column_names.push_back(src_column.columnName);
380  }
381  }
382 
383  // remove any matching render group columns from the source list
384  // and capture their IDs in the keys of a map (value is ignored)
385  std::unordered_map<int, int> column_ids_to_drop;
386  auto last_itr = std::remove_if(
387  src_oldinfo_strs.begin(),
388  src_oldinfo_strs.end(),
389  [&](const std::string& v) -> bool {
390  // tokenize
391  std::vector<std::string> tokens;
393  tokens, v, boost::is_any_of(":"), boost::token_compress_on);
394  // extract name and ID
395  if (tokens.size() < 2) {
396  throw std::runtime_error(
397  "Dump " + archive_path +
398  " has invalid oldinfo file contents. Dump may be corrupt.");
399  }
400  auto const& column_name = tokens[0];
401  auto const column_id = std::stoi(tokens[1]);
402  for (auto const& poly_column_name : poly_column_names) {
403  // is it a render group column?
404  auto const render_group_column_name = poly_column_name + "_render_group";
405  if (column_name == render_group_column_name) {
406  LOG(INFO) << "RESTORE TABLE dropping render group column '"
407  << render_group_column_name << "' from dump " << archive_path;
408  // add to "set"
409  column_ids_to_drop[column_id] = -1;
410  return true;
411  }
412  }
413  return false;
414  });
415  src_oldinfo_strs.erase(last_itr, src_oldinfo_strs.end());
416 
417  return column_ids_to_drop;
418 }
419 
421  const std::unordered_map<int, int>& render_group_column_ids,
422  const std::string& archive_path,
423  const std::string& temp_data_dir,
424  const std::string& compression) {
425  // rewrite page files to drop the columns with IDs that are the keys of the map
426  if (render_group_column_ids.size()) {
427  const auto epoch = boost::lexical_cast<int32_t>(
428  simple_file_cat(archive_path, table_epoch_filename, compression));
429  const auto time_ms = measure<>::execution([&]() {
431  epoch, temp_data_dir, render_group_column_ids, true /* drop */);
432  });
433  VLOG(3) << "drop render group columns: " << time_ms << " ms";
434  }
435 }
436 
437 } // namespace
438 
440  const std::string& archive_path,
441  const std::string& compression) {
442  if (td->is_system_table) {
443  throw std::runtime_error("Dumping a system table is not supported.");
444  }
445  if (td->isForeignTable()) {
446  throw std::runtime_error("Dumping a foreign table is not supported.");
447  }
450  if (g_cluster) {
451  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
452  }
453  if (boost::filesystem::exists(archive_path)) {
454  throw std::runtime_error("Archive " + archive_path + " already exists.");
455  }
457  throw std::runtime_error("Dumping view or temporary table is not supported.");
458  }
459  // create a unique uuid for this table dump
460  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
461 
462  // collect paths of files to archive
463  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
464  std::vector<std::string> file_paths;
465  auto file_writer = [&file_paths, uuid](const std::string& file_name,
466  const std::string& file_type,
467  const std::string& file_data) {
468  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
469  std::fopen(file_name.c_str(), "w"), simple_file_closer);
470  if (!fp) {
471  throw std::runtime_error("Failed to create " + file_type + " file '" + file_name +
472  "': " + std::strerror(errno));
473  }
474  if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
475  throw std::runtime_error("Failed to write " + file_type + " file '" + file_name +
476  "': " + std::strerror(errno));
477  }
478  file_paths.push_back(uuid / std::filesystem::path(file_name).filename());
479  };
480 
481  const auto file_mgr_dir = std::filesystem::path(abs_path(global_file_mgr));
482  const auto uuid_dir = file_mgr_dir / uuid;
483 
484  if (!std::filesystem::create_directory(uuid_dir)) {
485  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
486  "' while dumping table.");
487  }
488 
489  ScopeGuard cleanup_guard = [&] {
490  if (std::filesystem::exists(uuid_dir)) {
491  std::filesystem::remove_all(uuid_dir);
492  }
493  };
494 
495  const auto table_name = td->tableName;
496  {
497  // - gen dumpversion file
498  const auto dumpversion_str = std::to_string(kDumpVersion);
499  file_writer(
500  uuid_dir / table_dumpversion_filename, "table dumpversion", dumpversion_str);
501  // - gen schema file
502  const auto schema_str = cat_->dumpSchema(td);
503  file_writer(uuid_dir / table_schema_filename, "table schema", schema_str);
504  // - gen column-old-info file
505  const auto cds = cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
506  std::vector<std::string> column_oldinfo;
507  std::transform(cds.begin(),
508  cds.end(),
509  std::back_inserter(column_oldinfo),
510  [&](const auto cd) -> std::string {
511  return cd->columnName + ":" + std::to_string(cd->columnId) + ":" +
513  });
514  const auto column_oldinfo_str = boost::algorithm::join(column_oldinfo, " ");
515  file_writer(uuid_dir / table_oldinfo_filename, "table old info", column_oldinfo_str);
516  // - gen table epoch
517  const auto epoch = cat_->getTableEpoch(cat_->getCurrentDB().dbId, td->tableId);
518  file_writer(uuid_dir / table_epoch_filename, "table epoch", std::to_string(epoch));
519  // - collect table data file paths ...
520  const auto data_file_dirs = cat_->getTableDataDirectories(td);
521  file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
522  // - collect table dict file paths ...
523  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
524  file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
525  // tar takes time. release cat lock to yield the cat to concurrent CREATE statements.
526  }
527  // run tar to archive the files ... this may take a while !!
528  run("tar " + compression + " --transform=s|" + uuid +
529  std::filesystem::path::preferred_separator + "|| -cvf " +
530  get_quoted_string(archive_path) + " " + boost::algorithm::join(file_paths, " "),
531  file_mgr_dir);
532 }
533 
534 // Restore data and dict files of a table from a tgz archive.
536  const TableDescriptor* td,
537  const std::string& archive_path,
538  const std::string& compression) {
541  if (g_cluster) {
542  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
543  }
544  if (!boost::filesystem::exists(archive_path)) {
545  throw std::runtime_error("Archive " + archive_path + " does not exist.");
546  }
548  throw std::runtime_error("Restoring view or temporary table is not supported.");
549  }
550  // Obtain table schema read lock to prevent modification of the schema during
551  // restoration
552  const auto table_read_lock =
554  // prevent concurrent inserts into table during restoration
555  const auto insert_data_lock =
557 
558  // untar takes time. no grab of cat lock to yield to concurrent CREATE stmts.
559  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
560 
561  // create a unique uuid for this table restore
562  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
563 
564  const auto uuid_dir = std::filesystem::path(abs_path(global_file_mgr)) / uuid;
565 
566  if (!std::filesystem::create_directory(uuid_dir)) {
567  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
568  "' while restoring table.");
569  }
570 
571  ScopeGuard cleanup_guard = [&] {
572  if (std::filesystem::exists(uuid_dir)) {
573  std::filesystem::remove_all(uuid_dir);
574  }
575  };
576 
577  // dirs where src files are untarred and dst files are backed up
578  constexpr static const auto temp_data_basename = "_data";
579  constexpr static const auto temp_back_basename = "_back";
580  const auto temp_data_dir = uuid_dir / temp_data_basename;
581  const auto temp_back_dir = uuid_dir / temp_back_basename;
582 
583  // extract & parse schema
584  const auto schema_str = get_table_schema(archive_path, td->tableName, compression);
585  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
586  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
587  CHECK(create_table_stmt);
588 
589  // verify compatibility between source and destination schemas
590  TableDescriptor src_td;
591  std::list<ColumnDescriptor> src_columns;
592  std::vector<Parser::SharedDictionaryDef> shared_dict_defs;
593  create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
594  // - sanity check table-level compatibility
595  if (src_td.hasDeletedCol != td->hasDeletedCol) {
596  // TODO: allow the case, in which src data enables vacuum while
597  // dst doesn't, by simply discarding src $deleted column data.
598  throw std::runtime_error("Incompatible table VACCUM option");
599  }
600  if (src_td.nShards != td->nShards) {
601  // TODO: allow different shard numbers if they have a "GCD",
602  // by splitting/merging src data files before drop into dst.
603  throw std::runtime_error("Unmatched number of table shards");
604  }
605  // - sanity check column-level compatibility (based on column names)
606  const auto dst_columns =
607  cat_->getAllColumnMetadataForTable(td->tableId, false, false, false);
608  if (dst_columns.size() != src_columns.size()) {
609  throw std::runtime_error("Unmatched number of table columns");
610  }
611  for (const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
612  if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
613  src_cd.columnType.get_compression_name() !=
614  dst_cd->columnType.get_compression_name()) {
615  throw std::runtime_error("Incompatible types on column " + src_cd.columnName);
616  }
617  }
618  // extract src table column ids (ALL columns incl. system/virtual/phy geo cols)
619  const auto all_src_oldinfo_str =
620  simple_file_cat(archive_path, table_oldinfo_filename, compression);
621  std::vector<std::string> src_oldinfo_strs;
622  boost::algorithm::split(src_oldinfo_strs,
623  all_src_oldinfo_str,
624  boost::is_any_of(" "),
625  boost::token_compress_on);
626 
627  // fetch dump version
628  int dump_version = -1;
629  try {
630  // attempt to read file, do not log if fail to read
631  auto const dump_version_str =
632  simple_file_cat(archive_path, table_dumpversion_filename, compression, false);
633  dump_version = std::stoi(dump_version_str);
634  } catch (std::runtime_error& e) {
635  // no dump version file found
636  dump_version = 0;
637  }
638  LOG(INFO) << "Dump Version: " << dump_version;
639 
640  // version-specific behavior
641  const bool do_drop_render_group_columns =
643 
644  // remove any render group columns from the source columns so that the list of
645  // source columns matches the already-created table, and the removed ones will
646  // not have an entry in column_ids_map, and hence will not have their data
647  // mapped later (effectively dropping them), and return their IDs for when
648  // they are actually dropped later
649  std::unordered_map<int, int> render_group_column_ids;
650  if (do_drop_render_group_columns) {
651  render_group_column_ids =
652  find_render_group_columns(src_columns, src_oldinfo_strs, archive_path);
653  }
654 
655  // compare with the destination columns
656  auto all_dst_columns =
657  cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
658  if (src_oldinfo_strs.size() != all_dst_columns.size()) {
659  throw std::runtime_error("Source table has a unmatched number of columns: " +
660  std::to_string(src_oldinfo_strs.size()) + " vs " +
661  std::to_string(all_dst_columns.size()));
662  }
663  // build a map of src column ids and dst column ids, just in case src table has been
664  // ALTERed before and chunk keys of src table needs to be adjusted accordingly.
665  // note: this map is used only for the case of migrating a table and not for restoring
666  // a table. When restoring a table, the two tables must have the same column ids.
667  //
668  // also build a map of src dict paths and dst dict paths for relocating src dicts
669  std::unordered_map<int, int> column_ids_map;
670  std::unordered_map<std::string, std::string> dict_paths_map;
671  // sort inputs of transform in lexical order of column names for correct mappings
672  std::list<std::vector<std::string>> src_oldinfo_tokens;
674  src_oldinfo_strs.begin(),
675  src_oldinfo_strs.end(),
676  std::back_inserter(src_oldinfo_tokens),
677  [](const auto& src_oldinfo_str) -> auto{
678  std::vector<std::string> tokens;
680  tokens, src_oldinfo_str, boost::is_any_of(":"), boost::token_compress_on);
681  return tokens;
682  });
683  src_oldinfo_tokens.sort(
684  [](const auto& lhs, const auto& rhs) { return lhs[0].compare(rhs[0]) < 0; });
685  all_dst_columns.sort(
686  [](auto a, auto b) { return a->columnName.compare(b->columnName) < 0; });
687  // transform inputs into the maps
688  std::transform(src_oldinfo_tokens.begin(),
689  src_oldinfo_tokens.end(),
690  all_dst_columns.begin(),
691  std::inserter(column_ids_map, column_ids_map.end()),
692  [&](const auto& tokens, const auto& cd) -> std::pair<int, int> {
693  VLOG(3) << boost::algorithm::join(tokens, ":") << " ==> "
694  << cd->columnName << ":" << cd->columnId;
695  dict_paths_map[tokens[2]] = cat_->getColumnDictDirectory(cd);
696  return {boost::lexical_cast<int>(tokens[1]), cd->columnId};
697  });
698  bool was_table_altered = false;
699  std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](auto& it) {
700  was_table_altered = was_table_altered || it.first != it.second;
701  });
702  VLOG(3) << "was_table_altered = " << was_table_altered;
703 
704  // extract all data files to a temp dir. will swap with dst table dir after all set,
705  // otherwise will corrupt table in case any bad thing happens in the middle.
706  run("rm -rf " + temp_data_dir.string());
707  run("mkdir -p " + temp_data_dir.string());
708  run("tar " + compression + " -xvf " + get_quoted_string(archive_path), temp_data_dir);
709 
710  // drop the render group columns here
711  if (do_drop_render_group_columns) {
713  render_group_column_ids, archive_path, temp_data_dir, compression);
714  }
715 
716  // if table was ever altered after it was created, update column ids in chunk headers.
717  if (was_table_altered) {
718  const auto epoch = boost::lexical_cast<int32_t>(
719  simple_file_cat(archive_path, table_epoch_filename, compression));
720  const auto time_ms = measure<>::execution([&]() {
722  epoch, temp_data_dir, column_ids_map, false /* update */);
723  });
724  VLOG(3) << "update_column_ids_table_files: " << time_ms << " ms";
725  }
726 
727  // finally,,, swap table data/dict dirs!
728  const auto data_file_dirs = cat_->getTableDataDirectories(td);
729  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
730  // move current target dirs, if exists, to backup dir
731  std::vector<std::string> both_file_dirs;
732  std::merge(data_file_dirs.begin(),
733  data_file_dirs.end(),
734  dict_file_dirs.begin(),
735  dict_file_dirs.end(),
736  std::back_inserter(both_file_dirs));
737  bool backup_completed = false;
738  try {
739  run("rm -rf " + temp_back_dir.string());
740  run("mkdir -p " + temp_back_dir.string());
741  for (const auto& dir : both_file_dirs) {
742  const auto dir_full_path = abs_path(global_file_mgr) + "/" + dir;
743  if (boost::filesystem::is_directory(dir_full_path)) {
744  run("mv " + dir_full_path + " " + temp_back_dir.string());
745  }
746  }
747  backup_completed = true;
748  // Move table directories from temp dir to main data directory.
749  rename_table_directories(global_file_mgr, temp_data_dir, data_file_dirs, "table_");
750  // Move dictionaries from temp dir to main dir.
751  for (const auto& dit : dict_paths_map) {
752  if (!dit.first.empty() && !dit.second.empty()) {
753  const auto src_dict_path = temp_data_dir.string() + "/" + dit.first;
754  const auto dst_dict_path = abs_path(global_file_mgr) + "/" + dit.second;
755  run("mv " + src_dict_path + " " + dst_dict_path);
756  }
757  }
758  // throw if sanity test forces a rollback
760  throw std::runtime_error("lol!");
761  }
762  } catch (...) {
763  // once backup is completed, whatever in abs_path(global_file_mgr) is the "src"
764  // dirs that are to be rolled back and discarded
765  if (backup_completed) {
766  run("rm -rf " + boost::algorithm::join(both_file_dirs, " "),
767  abs_path(global_file_mgr));
768  }
769  // complete rollback by recovering original "dst" table dirs from backup dir
770  boost::filesystem::path base_path(temp_back_dir);
771  boost::filesystem::directory_iterator end_it;
772  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
773  run("mv " + fit->path().string() + " .", abs_path(global_file_mgr));
774  }
775  throw;
776  }
777  // set for reloading table from the restored/migrated files
778  const auto epoch = simple_file_cat(archive_path, table_epoch_filename, compression);
780  cat_->getCurrentDB().dbId, td->tableId, boost::lexical_cast<int>(epoch));
781 }
782 
783 #ifdef HAVE_AWS_S3
784 namespace {
785 std::string get_restore_dir_path() {
786  auto uuid = boost::uuids::to_string(boost::uuids::random_generator()());
787  auto restore_dir_path = std::filesystem::canonical(g_base_path) /
788  shared::kDefaultImportDirName / ("s3-restore-" + uuid);
789  return restore_dir_path;
790 }
791 
792 std::string download_s3_file(const std::string& s3_archive_path,
793  const TableArchiverS3Options& s3_options,
794  const std::string& restore_dir_path) {
795  S3Archive s3_archive{s3_archive_path,
796  s3_options.s3_access_key,
797  s3_options.s3_secret_key,
798  s3_options.s3_session_token,
799  s3_options.s3_region,
800  s3_options.s3_endpoint,
801  false,
802  std::optional<std::string>{},
803  std::optional<std::string>{},
804  std::optional<std::string>{},
805  restore_dir_path};
806  s3_archive.init_for_read();
807  const auto& object_key = s3_archive.get_objkeys();
808  if (object_key.size() > 1) {
809  throw std::runtime_error(
810  "S3 URI references multiple files. Only one file can be restored at a time.");
811  }
812  CHECK_EQ(object_key.size(), size_t(1));
813  std::exception_ptr eptr;
814  return s3_archive.land(object_key[0], eptr, false, false, false);
815 }
816 } // namespace
817 #endif
818 
819 // Migrate a table, which doesn't exist in current db, from a tar ball to the db.
820 // This actually creates the table and restores data/dict files from the tar ball.
822  const std::string& table_name,
823  const std::string& archive_path,
824  const std::string& compression,
825  const TableArchiverS3Options& s3_options) {
826  auto local_archive_path = archive_path;
827 #ifdef HAVE_AWS_S3
828  const auto restore_dir_path = get_restore_dir_path();
829  ScopeGuard archive_cleanup_guard = [&archive_path, &restore_dir_path] {
830  if (shared::is_s3_uri(archive_path) && std::filesystem::exists(restore_dir_path)) {
831  std::filesystem::remove_all(restore_dir_path);
832  }
833  };
834  if (shared::is_s3_uri(archive_path)) {
835  local_archive_path = download_s3_file(archive_path, s3_options, restore_dir_path);
836  }
837 #endif
838 
839  // replace table name and drop foreign dict references
840  const auto schema_str = get_table_schema(local_archive_path, table_name, compression);
841  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
842  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
843  CHECK(create_table_stmt);
844  create_table_stmt->execute(session, false /*read-only*/);
845 
846  try {
847  restoreTable(
848  session, cat_->getMetadataForTable(table_name), local_archive_path, compression);
849  } catch (...) {
850  const auto schema_str = "DROP TABLE IF EXISTS " + table_name + ";";
851  std::unique_ptr<Parser::Stmt> stmt =
852  Parser::create_stmt_for_query(schema_str, session);
853  const auto drop_table_stmt = dynamic_cast<Parser::DropTableStmt*>(stmt.get());
854  CHECK(drop_table_stmt);
855  drop_table_stmt->execute(session, false /*read-only*/);
856 
857  throw;
858  }
859 }
std::string s3_secret_key
Definition: TableArchiver.h:26
std::string to_lower(const std::string &str)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::string get_table_schema(const std::string &archive_path, const std::string &table, const std::string &compression)
void delete_old_symlinks(const std::string &table_data_dir)
std::string getBasePath() const
std::string getColumnDictDirectory(const ColumnDescriptor *cd, bool file_name_only=true) const
Definition: Catalog.cpp:5388
static constexpr char const * table_schema_filename
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::string tableName
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::string abs_path(const File_Namespace::GlobalFileMgr *global_file_mgr)
bool is_s3_uri(const std::string &file_path)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:266
std::unordered_map< int, int > find_render_group_columns(const std::list< ColumnDescriptor > &src_columns, std::vector< std::string > &src_oldinfo_strs, const std::string &archive_path)
#define LOG(tag)
Definition: Logger.h:285
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
void startThread(FuncType &&func, Args &&...args)
std::string join(T const &container, std::string const &delim)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
#define DATA_FILE_EXT
Definition: File.h:25
static constexpr char const * table_oldinfo_filename
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3579
void drop_render_group_columns(const std::unordered_map< int, int > &render_group_column_ids, const std::string &archive_path, const std::string &temp_data_dir, const std::string &compression)
std::vector< std::string > getTableDataDirectories(const TableDescriptor *td) const
Definition: Catalog.cpp:5374
bool isForeignTable() const
std::string to_string(char const *&&v)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
constexpr double a
Definition: Utm.h:32
const std::string kDefaultImportDirName
Classes representing a parse tree.
std::string get_quoted_string(const std::string &filename, char quote, char escape)
Quote a string while escaping any existing quotes in the string.
static constexpr int kDumpVersion_remove_render_group_columns
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
std::string g_base_path
Definition: SysCatalog.cpp:62
void rename_table_directories(const File_Namespace::GlobalFileMgr *global_file_mgr, const std::string &temp_data_dir, const std::vector< std::string > &target_paths, const std::string &name_prefix)
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:320
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:649
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:822
void update_or_drop_column_ids_in_page_headers(const boost::filesystem::path &path, const std::unordered_map< int, int > &column_ids_map, const int32_t table_epoch, const bool drop_not_update)
std::string simple_file_cat(const std::string &archive_path, const std::string &file_name, const std::string &compression, const bool log_failure=true)
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:3647
std::vector< std::string > getTableDictDirectories(const TableDescriptor *td) const
Definition: Catalog.cpp:5409
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:238
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:259
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2172
Data_Namespace::MemoryLevel persistenceLevel
std::string s3_session_token
Definition: TableArchiver.h:27
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
static constexpr char const * table_epoch_filename
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
Definition: FileMgr.cpp:1136
def error_code
Definition: report.py:244
#define CHECK(condition)
Definition: Logger.h:291
bool g_cluster
static constexpr int kDumpVersion
void add_data_file_symlinks(const std::string &table_data_dir)
void update_or_drop_column_ids_in_table_files(const int32_t table_epoch, const std::string &temp_data_dir, const std::unordered_map< int, int > &column_ids_map, const bool drop_not_update)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::string s3_access_key
Definition: TableArchiver.h:25
static bool run
static constexpr char const * table_dumpversion_filename
std::string dumpSchema(const TableDescriptor *td) const
Definition: Catalog.cpp:5440
void restoreTable(const Catalog_Namespace::SessionInfo &session, const std::string &table_name, const std::string &archive_path, const std::string &compression, const TableArchiverS3Options &s3_options)
int cpu_threads()
Definition: thread_count.h:25
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void dumpTable(const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
constexpr auto kLegacyDataFileExtension
Definition: File.h:36
std::unique_ptr< Parser::Stmt > create_stmt_for_query(const std::string &queryStr, const Catalog_Namespace::SessionInfo &session_info)
A selection of helper methods for File I/O.
#define VLOG(n)
Definition: Logger.h:388
bool g_test_rollback_dump_restore
Catalog_Namespace::Catalog * cat_
Definition: TableArchiver.h:52