OmniSciDB  c1a53651b2
TableArchiver.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "TableArchiver/TableArchiver.h"
18 
19 #include <algorithm>
20 #include <boost/filesystem.hpp>
21 #include <boost/process.hpp>
22 #include <boost/range/combine.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 #include <boost/version.hpp>
26 #include <cerrno>
27 #include <cstdio>
28 #include <cstring>
29 #include <exception>
30 #include <list>
31 #include <memory>
32 #include <regex>
33 #include <set>
34 #include <sstream>
35 #include <system_error>
36 
37 #include "Archive/S3Archive.h"
41 #include "LockMgr/LockMgr.h"
42 #include "Logger/Logger.h"
43 #include "Parser/ParserNode.h"
44 #include "Shared/File.h"
45 #include "Shared/StringTransform.h"
46 #include "Shared/SysDefinitions.h"
48 #include "Shared/file_path_util.h"
49 #include "Shared/measure.h"
50 #include "Shared/scope.h"
51 #include "Shared/thread_count.h"
52 
53 extern bool g_cluster;
54 extern std::string g_base_path;
55 bool g_test_rollback_dump_restore{false};
56 
57 constexpr static char const* table_schema_filename = "_table.sql";
58 constexpr static char const* table_oldinfo_filename = "_table.oldinfo";
59 constexpr static char const* table_epoch_filename = "_table.epoch";
60 
61 #if BOOST_VERSION < 107300
62 namespace std {
63 
64 template <typename T, typename U>
65 struct tuple_size<boost::tuples::cons<T, U>>
66  : boost::tuples::length<boost::tuples::cons<T, U>> {};
67 template <size_t I, typename T, typename U>
68 struct tuple_element<I, boost::tuples::cons<T, U>>
69  : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
70 
71 } // namespace std
72 #endif
73 
74 namespace {
75 
76 inline auto simple_file_closer = [](FILE* f) { std::fclose(f); };
77 
78 inline std::string abs_path(const File_Namespace::GlobalFileMgr* global_file_mgr) {
79  return boost::filesystem::canonical(global_file_mgr->getBasePath()).string();
80 }
81 
82 inline std::string run(const std::string& cmd, const std::string& chdir = "") {
83  VLOG(3) << "running cmd: " << cmd;
84  int rcode;
85  std::error_code ec;
86  std::string output, errors;
87  const auto time_ms = measure<>::execution([&]() {
88  using namespace boost::process;
89  ipstream stdout, stderr;
90  if (!chdir.empty()) {
91  rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
92  } else {
93  rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
94  }
95  std::ostringstream ss_output, ss_errors;
96  stdout >> ss_output.rdbuf();
97  stderr >> ss_errors.rdbuf();
98  output = ss_output.str();
99  errors = ss_errors.str();
100  });
101  if (rcode || ec) {
102  LOG(ERROR) << "failed cmd: " << cmd;
103  LOG(ERROR) << "exit code: " << rcode;
104  LOG(ERROR) << "error code: " << ec.value() << " - " << ec.message();
105  LOG(ERROR) << "stdout: " << output;
106  LOG(ERROR) << "stderr: " << errors;
107 #if defined(__APPLE__)
108  // osx bsdtar options "--use-compress-program" and "--fast-read" together
109  // run into pipe write error after tar extracts the first occurrence of a
110  // file and closes the read end while the decompression program still writes
111  // to the pipe. bsdtar doesn't handle this situation as gracefully as gnu tar does.
112  if (1 == rcode && cmd.find("--fast-read") != std::string::npos &&
113  (errors.find("cannot write decoded block") != std::string::npos ||
114  errors.find("Broken pipe") != std::string::npos)) {
115  // ignore this error, or lose speed advantage of "--fast-read" on osx.
116  LOG(ERROR) << "tar error ignored on osx for --fast-read";
117  } else
118 #endif
119  // circumvent tar warning on reading file that is "changed as we read it".
120  // this warning results from reading a table file under concurrent inserts
121  if (1 == rcode && errors.find("changed as we read") != std::string::npos) {
122  LOG(ERROR) << "tar error ignored under concurrent inserts";
123  } else {
124  int error_code;
125  std::string error_message;
126  if (ec) {
127  error_code = ec.value();
128  error_message = ec.message();
129  } else {
130  error_code = rcode;
131  // Show a more concise message for permission errors instead of the default
132  // verbose message. Error logs will still contain all details.
133  if (to_lower(errors).find("permission denied") != std::string::npos) {
134  error_message = "Insufficient file read/write permission.";
135  } else {
136  error_message = errors;
137  }
138  }
139  throw std::runtime_error(
140  "An error occurred while executing an internal command. Error code: " +
141  std::to_string(error_code) + ", message: " + error_message);
142  }
143  } else {
144  VLOG(3) << "finished cmd: " << cmd;
145  VLOG(3) << "time: " << time_ms << " ms";
146  VLOG(3) << "stdout: " << output;
147  }
148  return output;
149 }
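// Illustrative sketch (not part of the original file): the capture pattern run() relies
// on, shown standalone with Boost.Process. The command string is a hypothetical example
// and error handling is reduced to a single log line.
#include <boost/process.hpp>
#include <iostream>
#include <sstream>
#include <string>
#include <system_error>

inline int example_run_capture(const std::string& cmd) {
  namespace bp = boost::process;
  bp::ipstream out_stream, err_stream;
  std::error_code ec;
  // Launch the child, redirecting both output streams into in-process pipes.
  const int rcode =
      bp::system(cmd, bp::std_out > out_stream, bp::std_err > err_stream, ec);
  // Drain the pipes after the child exits, as run() does above.
  std::ostringstream out, err;
  out_stream >> out.rdbuf();
  err_stream >> err.rdbuf();
  if (rcode || ec) {
    std::cerr << "command failed (" << rcode << "): " << err.str() << '\n';
  }
  return rcode;
}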
150 
151 inline std::string simple_file_cat(const std::string& archive_path,
152  const std::string& file_name,
153  const std::string& compression) {
156 #if defined(__APPLE__)
157  constexpr static auto opt_occurrence = "--fast-read";
158 #else
159  constexpr static auto opt_occurrence = "--occurrence=1";
160 #endif
161  boost::filesystem::path temp_dir =
162  boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
163  boost::filesystem::create_directories(temp_dir);
164  run("tar " + compression + " -xvf " + get_quoted_string(archive_path) + " " +
165  opt_occurrence + " " + file_name,
166  temp_dir.string());
167  const auto output = run("cat " + (temp_dir / file_name).string());
168  boost::filesystem::remove_all(temp_dir);
169  return output;
170 }
171 
172 inline std::string get_table_schema(const std::string& archive_path,
173  const std::string& table,
174  const std::string& compression) {
175  const auto schema_str =
176  simple_file_cat(archive_path, table_schema_filename, compression);
177  std::regex regex("@T");
178  return std::regex_replace(schema_str, regex, table);
179 }
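// Illustrative sketch (not part of the original file): a dumped schema stores the table
// name as the "@T" placeholder, which get_table_schema() substitutes as shown here. The
// schema text and target table name are hypothetical.
#include <regex>
#include <string>

inline std::string example_schema_substitution() {
  const std::string dumped_schema =
      "CREATE TABLE @T (id INTEGER, carrier TEXT ENCODING DICT(32));";
  return std::regex_replace(dumped_schema, std::regex("@T"), "flights_2024");
  // yields: CREATE TABLE flights_2024 (id INTEGER, carrier TEXT ENCODING DICT(32));
}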
180 
181 // If a table was altered, there may be a mapping from old column ids to new ones;
182 // these values need to be replaced in the page headers.
183 void rewrite_column_ids_in_page_headers(
184  const boost::filesystem::path& path,
185  const std::unordered_map<int, int>& column_ids_map,
186  const int32_t table_epoch) {
187  const std::string file_path = path.string();
188  const std::string file_name = path.filename().string();
189  std::vector<std::string> tokens;
190  boost::split(tokens, file_name, boost::is_any_of("."));
191 
192  // ref. FileMgr::init for hint of data file name layout
193  if (tokens.size() <= 2 || !(DATA_FILE_EXT == "." + tokens[2] || tokens[2] == "mapd")) {
194  // We are only interested in files in the form <id>.<page_size>.<DATA_FILE_EXT>
195  return;
196  }
197 
198  const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
199  const auto file_size = boost::filesystem::file_size(file_path);
200  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
201  std::fopen(file_path.c_str(), "r+"), simple_file_closer);
202  if (!fp) {
203  throw std::runtime_error("Failed to open " + file_path +
204  " for update: " + std::strerror(errno));
205  }
206  // TODO(Misiu): Rather than reference an external layout we should de-duplicate this
207  // page-reading code in a single location. This will also reduce the need for comments
208  // below.
209  // ref. FileInfo::openExistingFile for hint of chunk header layout
210  for (size_t page = 0; page < file_size / page_size; ++page) {
211  int32_t header_info[8];
212  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
213  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
214  file_path + " for read: " + std::strerror(errno));
215  }
216  if (1 != fread(header_info, sizeof header_info, 1, fp.get())) {
217  throw std::runtime_error("Failed to read " + file_path + ": " +
218  std::strerror(errno));
219  }
220  if (const auto header_size = header_info[0]; header_size > 0) {
221  // header_info[1] is the page's db_id; but can also be used as an "is deleted"
222  // indicator if negative.
223  auto& contingent = header_info[1];
224  // header_info[2] is the page's table_id; but can also be used to store the page's
225  // epoch since the FileMgr stores table_id information separately.
226  auto& epoch = header_info[2];
227  auto& col_id = header_info[3];
228  if (File_Namespace::is_page_deleted_with_checkpoint(
229  table_epoch, epoch, contingent)) {
230  continue;
231  }
232  auto column_map_it = column_ids_map.find(col_id);
233  CHECK(column_map_it != column_ids_map.end()) << "could not find " << col_id;
234  // If a header contains a column id that is remapped to new location
235  // then write that change to the file.
236  if (const auto dest_col_id = column_map_it->second; col_id != dest_col_id) {
237  col_id = dest_col_id;
238  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
239  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
240  file_path + " for write: " + std::strerror(errno));
241  }
242  if (1 != fwrite(header_info, sizeof header_info, 1, fp.get())) {
243  throw std::runtime_error("Failed to write " + file_path + ": " +
244  std::strerror(errno));
245  }
246  }
247  }
248  }
249 }
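// Illustrative sketch (not part of the original file): how the first 32 bytes of each
// page are interpreted by the loop above. The struct and field names are hypothetical;
// the authoritative layout lives in FileInfo/FileMgr.
#include <cstdint>

struct PageHeaderSketch {
  int32_t header_size;  // header_info[0]; 0 means the page slot carries no chunk data
  int32_t contingent;   // header_info[1]; db_id, doubling as an "is deleted" marker if negative
  int32_t epoch;        // header_info[2]; page epoch stored in the table_id slot
  int32_t column_id;    // header_info[3]; the value rewritten via column_ids_map
  int32_t rest[4];      // remaining chunk-key/page metadata, left untouched above
};
static_assert(sizeof(PageHeaderSketch) == sizeof(int32_t[8]),
              "matches the header_info[8] buffer read per page");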
250 
251 // Adjust column ids in chunk keys in a table's data files under a temp_data_dir,
252 // including files of all shards of the table. Can be slow for big files, but should
253 // scale better than refragmentizing. Table altering should be rare for OLAP.
254 void adjust_altered_table_files(const int32_t table_epoch,
255  const std::string& temp_data_dir,
256  const std::unordered_map<int, int>& column_ids_map) {
257  boost::filesystem::path base_path(temp_data_dir);
258  boost::filesystem::recursive_directory_iterator end_it;
259  ThreadController_NS::SimpleThreadController<void> thread_controller(cpu_threads());
260  for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
261  ++fit) {
262  if (!boost::filesystem::is_symlink(fit->path()) &&
263  boost::filesystem::is_regular_file(fit->status())) {
264  thread_controller.startThread(
265  rewrite_column_ids_in_page_headers, fit->path(), column_ids_map, table_epoch);
266  thread_controller.checkThreadsStatus();
267  }
268  }
269  thread_controller.finish();
270 }
271 
272 void delete_old_symlinks(const std::string& table_data_dir) {
273  std::vector<boost::filesystem::path> symlinks;
274  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
275  it++) {
276  if (boost::filesystem::is_symlink(it->path())) {
277  symlinks.emplace_back(it->path());
278  }
279  }
280  for (const auto& symlink : symlinks) {
281  boost::filesystem::remove_all(symlink);
282  }
283 }
284 
285 void add_data_file_symlinks(const std::string& table_data_dir) {
286  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
287  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
288  it++) {
289  const auto path = boost::filesystem::canonical(it->path());
290  if (path.extension().string() == DATA_FILE_EXT) {
291  auto old_path = path;
292  old_path.replace_extension(File_Namespace::kLegacyDataFileExtension);
293  // Add a symlink to data file, if one does not exist.
294  if (!boost::filesystem::exists(old_path)) {
295  old_to_new_paths[old_path] = path;
296  }
297  }
298  }
299  for (const auto& [old_path, new_path] : old_to_new_paths) {
300  boost::filesystem::create_symlink(new_path.filename(), old_path);
301  }
302 }
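// Illustrative sketch (not part of the original file): what the loop above produces for
// a single post-rebrand data file. The file name is a hypothetical example; the legacy
// extension is assumed to be the pre-rebrand ".mapd" suffix.
#include <boost/filesystem.hpp>

inline void example_legacy_symlink(const boost::filesystem::path& data_file) {
  // e.g. data_file = ".../table_1_0/1.2097152.data"
  auto legacy_name = data_file;
  legacy_name.replace_extension(".mapd");
  if (!boost::filesystem::exists(legacy_name)) {
    // Link target is the bare filename, so the symlink stays valid if the dir moves.
    boost::filesystem::create_symlink(data_file.filename(), legacy_name);
  }
}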
303 
304 void rename_table_directories(const File_Namespace::GlobalFileMgr* global_file_mgr,
305  const std::string& temp_data_dir,
306  const std::vector<std::string>& target_paths,
307  const std::string& name_prefix) {
308  boost::filesystem::path base_path(temp_data_dir);
309  boost::filesystem::directory_iterator end_it;
310  int target_path_index = 0;
311  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
312  if (!boost::filesystem::is_regular_file(fit->status())) {
313  const std::string file_path = fit->path().string();
314  const std::string file_name = fit->path().filename().string();
315  if (boost::istarts_with(file_name, name_prefix)) {
316  const std::string target_path =
317  abs_path(global_file_mgr) + "/" + target_paths[target_path_index++];
318  if (std::rename(file_path.c_str(), target_path.c_str())) {
319  throw std::runtime_error("Failed to rename file " + file_path + " to " +
320  target_path + ": " + std::strerror(errno));
321  }
322  // Delete any old/invalid symlinks contained in table dump.
323  delete_old_symlinks(target_path);
324  File_Namespace::FileMgr::renameAndSymlinkLegacyFiles(target_path);
325  // For post-rebrand table dumps, symlinks need to be added here, since file mgr
326  // migration would already have been executed for the dumped table.
327  add_data_file_symlinks(target_path);
328  }
329  }
330  }
331 }
332 
333 } // namespace
334 
335 void TableArchiver::dumpTable(const TableDescriptor* td,
336  const std::string& archive_path,
337  const std::string& compression) {
338  if (td->is_system_table) {
339  throw std::runtime_error("Dumping a system table is not supported.");
340  }
341  ddl_utils::validate_allowed_file_path(archive_path,
342  ddl_utils::DataTransferType::EXPORT);
343  if (g_cluster) {
344  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
345  }
346  if (boost::filesystem::exists(archive_path)) {
347  throw std::runtime_error("Archive " + archive_path + " already exists.");
348  }
349  if (td->isView || td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
350  throw std::runtime_error("Dumping view or temporary table is not supported.");
351  }
352  // create a unique uuid for this table dump
353  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
354 
355  // collect paths of files to archive
356  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
357  std::vector<std::string> file_paths;
358  auto file_writer = [&file_paths, uuid](const std::string& file_name,
359  const std::string& file_type,
360  const std::string& file_data) {
361  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
362  std::fopen(file_name.c_str(), "w"), simple_file_closer);
363  if (!fp) {
364  throw std::runtime_error("Failed to create " + file_type + " file '" + file_name +
365  "': " + std::strerror(errno));
366  }
367  if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
368  throw std::runtime_error("Failed to write " + file_type + " file '" + file_name +
369  "': " + std::strerror(errno));
370  }
371  file_paths.push_back(uuid / std::filesystem::path(file_name).filename());
372  };
373 
374  const auto file_mgr_dir = std::filesystem::path(abs_path(global_file_mgr));
375  const auto uuid_dir = file_mgr_dir / uuid;
376 
377  if (!std::filesystem::create_directory(uuid_dir)) {
378  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
379  "' while dumping table.");
380  }
381 
382  ScopeGuard cleanup_guard = [&] {
383  if (std::filesystem::exists(uuid_dir)) {
384  std::filesystem::remove_all(uuid_dir);
385  }
386  };
387 
388  const auto table_name = td->tableName;
389  {
390  // - gen schema file
391  const auto schema_str = cat_->dumpSchema(td);
392  file_writer(uuid_dir / table_schema_filename, "table schema", schema_str);
393  // - gen column-old-info file
394  const auto cds = cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
395  std::vector<std::string> column_oldinfo;
396  std::transform(cds.begin(),
397  cds.end(),
398  std::back_inserter(column_oldinfo),
399  [&](const auto cd) -> std::string {
400  return cd->columnName + ":" + std::to_string(cd->columnId) + ":" +
401  cat_->getColumnDictDirectory(cd);
402  });
403  const auto column_oldinfo_str = boost::algorithm::join(column_oldinfo, " ");
404  file_writer(uuid_dir / table_oldinfo_filename, "table old info", column_oldinfo_str);
405  // - gen table epoch
406  const auto epoch = cat_->getTableEpoch(cat_->getCurrentDB().dbId, td->tableId);
407  file_writer(uuid_dir / table_epoch_filename, "table epoch", std::to_string(epoch));
408  // - collect table data file paths ...
409  const auto data_file_dirs = cat_->getTableDataDirectories(td);
410  file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
411  // - collect table dict file paths ...
412  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
413  file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
414  // tar takes time. release cat lock to yield the cat to concurrent CREATE statements.
415  }
416  // run tar to archive the files ... this may take a while !!
417  run("tar " + compression + " --transform=s|" + uuid +
418  std::filesystem::path::preferred_separator + "|| -cvf " +
419  get_quoted_string(archive_path) + " " + boost::algorithm::join(file_paths, " "),
420  file_mgr_dir);
421 }
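// Illustrative usage sketch (not part of the original file): driving a dump directly
// from C++. The TableArchiver constructor is assumed to take a Catalog pointer (it
// stores cat_); the table name, archive path, and compression flag are hypothetical.
inline void example_dump(Catalog_Namespace::Catalog& catalog) {
  TableArchiver archiver(&catalog);  // constructor signature assumed, see TableArchiver.h
  const TableDescriptor* td = catalog.getMetadataForTable("flights");
  archiver.dumpTable(td, "/backups/flights.tgz", "--use-compress-program=gzip");
}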
422 
423 // Restore data and dict files of a table from a tgz archive.
424 void TableArchiver::restoreTable(const Catalog_Namespace::SessionInfo& session,
425  const TableDescriptor* td,
426  const std::string& archive_path,
427  const std::string& compression) {
428  ddl_utils::validate_allowed_file_path(archive_path,
429  ddl_utils::DataTransferType::IMPORT);
430  if (g_cluster) {
431  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
432  }
433  if (!boost::filesystem::exists(archive_path)) {
434  throw std::runtime_error("Archive " + archive_path + " does not exist.");
435  }
436  if (td->isView || td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
437  throw std::runtime_error("Restoring view or temporary table is not supported.");
438  }
439  // Obtain table schema read lock to prevent modification of the schema during
440  // restoration
441  const auto table_read_lock =
442  lockmgr::TableSchemaLockMgr::getReadLockForTable(*cat_, td->tableName);
443  // prevent concurrent inserts into table during restoration
444  const auto insert_data_lock =
445  lockmgr::InsertDataLockMgr::getWriteLockForTable(*cat_, td->tableName);
446 
447  // untar takes time. no grab of cat lock to yield to concurrent CREATE stmts.
448  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
449 
450  // create a unique uuid for this table restore
451  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
452 
453  const auto uuid_dir = std::filesystem::path(abs_path(global_file_mgr)) / uuid;
454 
455  if (!std::filesystem::create_directory(uuid_dir)) {
456  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
457  "' while restoring table.");
458  }
459 
460  ScopeGuard cleanup_guard = [&] {
461  if (std::filesystem::exists(uuid_dir)) {
462  std::filesystem::remove_all(uuid_dir);
463  }
464  };
465 
466  // dirs where src files are untarred and dst files are backed up
467  constexpr static const auto temp_data_basename = "_data";
468  constexpr static const auto temp_back_basename = "_back";
469  const auto temp_data_dir = uuid_dir / temp_data_basename;
470  const auto temp_back_dir = uuid_dir / temp_back_basename;
471 
472  // extract & parse schema
473  const auto schema_str = get_table_schema(archive_path, td->tableName, compression);
474  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
475  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
476  CHECK(create_table_stmt);
477 
478  // verify compatibility between source and destination schemas
479  TableDescriptor src_td;
480  std::list<ColumnDescriptor> src_columns;
481  std::vector<Parser::SharedDictionaryDef> shared_dict_defs;
482  create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
483  // - sanity check table-level compatibility
484  if (src_td.hasDeletedCol != td->hasDeletedCol) {
485  // TODO: allow the case, in which src data enables vacuum while
486  // dst doesn't, by simply discarding src $deleted column data.
487  throw std::runtime_error("Incompatible table VACUUM option");
488  }
489  if (src_td.nShards != td->nShards) {
490  // TODO: allow different shard numbers if they have a "GCD",
491  // by splitting/merging src data files before drop into dst.
492  throw std::runtime_error("Unmatched number of table shards");
493  }
494  // - sanity check column-level compatibility (based on column names)
495  const auto dst_columns =
496  cat_->getAllColumnMetadataForTable(td->tableId, false, false, false);
497  if (dst_columns.size() != src_columns.size()) {
498  throw std::runtime_error("Unmatched number of table columns");
499  }
500  for (const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
501  if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
502  src_cd.columnType.get_compression_name() !=
503  dst_cd->columnType.get_compression_name()) {
504  throw std::runtime_error("Incompatible types on column " + src_cd.columnName);
505  }
506  }
507  // extract src table column ids (ALL columns incl. system/virtual/phy geo cols)
508  const auto all_src_oldinfo_str =
509  simple_file_cat(archive_path, table_oldinfo_filename, compression);
510  std::vector<std::string> src_oldinfo_strs;
511  boost::algorithm::split(src_oldinfo_strs,
512  all_src_oldinfo_str,
513  boost::is_any_of(" "),
514  boost::token_compress_on);
515  auto all_dst_columns =
516  cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
517  if (src_oldinfo_strs.size() != all_dst_columns.size()) {
518  throw std::runtime_error("Source table has an unmatched number of columns: " +
519  std::to_string(src_oldinfo_strs.size()) + " vs " +
520  std::to_string(all_dst_columns.size()));
521  }
522  // build a map of src column ids and dst column ids, just in case src table has been
523  // ALTERed before and chunk keys of src table need to be adjusted accordingly.
524  // note: this map is used only for the case of migrating a table and not for restoring
525  // a table. When restoring a table, the two tables must have the same column ids.
526  //
527  // also build a map of src dict paths and dst dict paths for relocating src dicts
528  std::unordered_map<int, int> column_ids_map;
529  std::unordered_map<std::string, std::string> dict_paths_map;
530  // sort inputs of transform in lexical order of column names for correct mappings
531  std::list<std::vector<std::string>> src_oldinfo_tokens;
532  std::transform(
533  src_oldinfo_strs.begin(),
534  src_oldinfo_strs.end(),
535  std::back_inserter(src_oldinfo_tokens),
536  [](const auto& src_oldinfo_str) -> auto {
537  std::vector<std::string> tokens;
538  boost::algorithm::split(
539  tokens, src_oldinfo_str, boost::is_any_of(":"), boost::token_compress_on);
540  return tokens;
541  });
542  src_oldinfo_tokens.sort(
543  [](const auto& lhs, const auto& rhs) { return lhs[0].compare(rhs[0]) < 0; });
544  all_dst_columns.sort(
545  [](auto a, auto b) { return a->columnName.compare(b->columnName) < 0; });
546  // transform inputs into the maps
547  std::transform(src_oldinfo_tokens.begin(),
548  src_oldinfo_tokens.end(),
549  all_dst_columns.begin(),
550  std::inserter(column_ids_map, column_ids_map.end()),
551  [&](const auto& tokens, const auto& cd) -> std::pair<int, int> {
552  VLOG(3) << boost::algorithm::join(tokens, ":") << " ==> "
553  << cd->columnName << ":" << cd->columnId;
554  dict_paths_map[tokens[2]] = cat_->getColumnDictDirectory(cd);
555  return {boost::lexical_cast<int>(tokens[1]), cd->columnId};
556  });
557  bool was_table_altered = false;
558  std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](auto& it) {
559  was_table_altered = was_table_altered || it.first != it.second;
560  });
561  VLOG(3) << "was_table_altered = " << was_table_altered;
562  // extract all data files to a temp dir. swap with the dst table dir only after all is
563  // set; otherwise the table could be corrupted if anything goes wrong in the middle.
564  run("rm -rf " + temp_data_dir.string());
565  run("mkdir -p " + temp_data_dir.string());
566  run("tar " + compression + " -xvf " + get_quoted_string(archive_path), temp_data_dir);
567 
568  // if table was ever altered after it was created, update column ids in chunk headers.
569  if (was_table_altered) {
570  const auto epoch = boost::lexical_cast<int32_t>(
571  simple_file_cat(archive_path, table_epoch_filename, compression));
572  const auto time_ms = measure<>::execution(
573  [&]() { adjust_altered_table_files(epoch, temp_data_dir, column_ids_map); });
574  VLOG(3) << "adjust_altered_table_files: " << time_ms << " ms";
575  }
576  // finally, swap table data/dict dirs!
577  const auto data_file_dirs = cat_->getTableDataDirectories(td);
578  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
579  // move current target dirs, if they exist, to the backup dir
580  std::vector<std::string> both_file_dirs;
581  std::merge(data_file_dirs.begin(),
582  data_file_dirs.end(),
583  dict_file_dirs.begin(),
584  dict_file_dirs.end(),
585  std::back_inserter(both_file_dirs));
586  bool backup_completed = false;
587  try {
588  run("rm -rf " + temp_back_dir.string());
589  run("mkdir -p " + temp_back_dir.string());
590  for (const auto& dir : both_file_dirs) {
591  const auto dir_full_path = abs_path(global_file_mgr) + "/" + dir;
592  if (boost::filesystem::is_directory(dir_full_path)) {
593  run("mv " + dir_full_path + " " + temp_back_dir.string());
594  }
595  }
596  backup_completed = true;
597  // Move table directories from temp dir to main data directory.
598  rename_table_directories(global_file_mgr, temp_data_dir, data_file_dirs, "table_");
599  // Move dictionaries from temp dir to main dir.
600  for (const auto& dit : dict_paths_map) {
601  if (!dit.first.empty() && !dit.second.empty()) {
602  const auto src_dict_path = temp_data_dir.string() + "/" + dit.first;
603  const auto dst_dict_path = abs_path(global_file_mgr) + "/" + dit.second;
604  run("mv " + src_dict_path + " " + dst_dict_path);
605  }
606  }
607  // throw if sanity test forces a rollback
608  if (g_test_rollback_dump_restore) {
609  throw std::runtime_error("lol!");
610  }
611  } catch (...) {
612  // once the backup is completed, whatever remains under abs_path(global_file_mgr) are
613  // the "src" dirs that are to be rolled back and discarded
614  if (backup_completed) {
615  run("rm -rf " + boost::algorithm::join(both_file_dirs, " "),
616  abs_path(global_file_mgr));
617  }
618  // complete rollback by recovering original "dst" table dirs from backup dir
619  boost::filesystem::path base_path(temp_back_dir);
620  boost::filesystem::directory_iterator end_it;
621  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
622  run("mv " + fit->path().string() + " .", abs_path(global_file_mgr));
623  }
624  throw;
625  }
626  // set for reloading table from the restored/migrated files
627  const auto epoch = simple_file_cat(archive_path, table_epoch_filename, compression);
628  cat_->setTableEpoch(
629  cat_->getCurrentDB().dbId, td->tableId, boost::lexical_cast<int>(epoch));
630 }
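// Illustrative sketch (not part of the original file): the pairing strategy used above
// to build column_ids_map. Both sides are sorted by column name and then zipped, so only
// matching names matter, not the original ordering. The inputs here are hypothetical.
#include <algorithm>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

inline std::unordered_map<int, int> example_column_id_map(
    std::vector<std::pair<std::string, int>> src,    // {column name, old id} from dump
    std::vector<std::pair<std::string, int>> dst) {  // {column name, new id} in catalog
  auto by_name = [](const auto& a, const auto& b) { return a.first < b.first; };
  std::sort(src.begin(), src.end(), by_name);
  std::sort(dst.begin(), dst.end(), by_name);
  std::unordered_map<int, int> column_ids_map;
  for (std::size_t i = 0; i < src.size() && i < dst.size(); ++i) {
    column_ids_map.emplace(src[i].second, dst[i].second);  // old id -> new id
  }
  return column_ids_map;
}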
631 
632 #ifdef HAVE_AWS_S3
633 namespace {
634 std::string get_restore_dir_path() {
635  auto uuid = boost::uuids::to_string(boost::uuids::random_generator()());
636  auto restore_dir_path = std::filesystem::canonical(g_base_path) /
637  shared::kDefaultImportDirName / ("s3-restore-" + uuid);
638  return restore_dir_path;
639 }
640 
641 std::string download_s3_file(const std::string& s3_archive_path,
642  const TableArchiverS3Options& s3_options,
643  const std::string& restore_dir_path) {
644  S3Archive s3_archive{s3_archive_path,
645  s3_options.s3_access_key,
646  s3_options.s3_secret_key,
647  s3_options.s3_session_token,
648  s3_options.s3_region,
649  s3_options.s3_endpoint,
650  false,
651  std::optional<std::string>{},
652  std::optional<std::string>{},
653  std::optional<std::string>{},
654  restore_dir_path};
655  s3_archive.init_for_read();
656  const auto& object_key = s3_archive.get_objkeys();
657  if (object_key.size() > 1) {
658  throw std::runtime_error(
659  "S3 URI references multiple files. Only one file can be restored at a time.");
660  }
661  CHECK_EQ(object_key.size(), size_t(1));
662  std::exception_ptr eptr;
663  return s3_archive.land(object_key[0], eptr, false, false, false);
664 }
665 } // namespace
666 #endif
667 
668 // Migrate a table, which doesn't exist in the current db, from a tarball to the db.
669 // This actually creates the table and restores data/dict files from the tarball.
670 void TableArchiver::restoreTable(const Catalog_Namespace::SessionInfo& session,
671  const std::string& table_name,
672  const std::string& archive_path,
673  const std::string& compression,
674  const TableArchiverS3Options& s3_options) {
675  auto local_archive_path = archive_path;
676 #ifdef HAVE_AWS_S3
677  const auto restore_dir_path = get_restore_dir_path();
678  ScopeGuard archive_cleanup_guard = [&archive_path, &restore_dir_path] {
679  if (shared::is_s3_uri(archive_path) && std::filesystem::exists(restore_dir_path)) {
680  std::filesystem::remove_all(restore_dir_path);
681  }
682  };
683  if (shared::is_s3_uri(archive_path)) {
684  local_archive_path = download_s3_file(archive_path, s3_options, restore_dir_path);
685  }
686 #endif
687 
688  // replace table name and drop foreign dict references
689  const auto schema_str = get_table_schema(local_archive_path, table_name, compression);
690  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
691  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
692  CHECK(create_table_stmt);
693  create_table_stmt->execute(session, false /*read-only*/);
694 
695  try {
696  restoreTable(
697  session, cat_->getMetadataForTable(table_name), local_archive_path, compression);
698  } catch (...) {
699  const auto schema_str = "DROP TABLE IF EXISTS " + table_name + ";";
700  std::unique_ptr<Parser::Stmt> stmt =
701  Parser::create_stmt_for_query(schema_str, session);
702  const auto drop_table_stmt = dynamic_cast<Parser::DropTableStmt*>(stmt.get());
703  CHECK(drop_table_stmt);
704  drop_table_stmt->execute(session, false /*read-only*/);
705 
706  throw;
707  }
708 }
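// Illustrative usage sketch (not part of the original file): restoring an archive into
// a table that does not yet exist in the current database. Session construction is
// omitted; the TableArchiver constructor and default-constructed TableArchiverS3Options
// are assumptions, and the table name, archive path, and compression flag are hypothetical.
inline void example_restore(const Catalog_Namespace::SessionInfo& session,
                            Catalog_Namespace::Catalog& catalog) {
  TableArchiver archiver(&catalog);  // constructor signature assumed, see TableArchiver.h
  archiver.restoreTable(session,
                        "flights",                       // table to create and restore
                        "/backups/flights.tgz",          // local path or S3 URI
                        "--use-compress-program=gzip",
                        TableArchiverS3Options{});       // credentials for S3 URIs
}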