OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableArchiver.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <boost/filesystem.hpp>
21 #include <boost/process.hpp>
22 #include <boost/range/combine.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 #include <boost/version.hpp>
26 #include <cerrno>
27 #include <cstdio>
28 #include <cstring>
29 #include <exception>
30 #include <list>
31 #include <memory>
32 #include <regex>
33 #include <set>
34 #include <sstream>
35 #include <system_error>
36 
40 #include "LockMgr/LockMgr.h"
41 #include "Logger/Logger.h"
42 #include "Parser/ParserNode.h"
43 #include "Shared/File.h"
44 #include "Shared/StringTransform.h"
46 #include "Shared/measure.h"
47 #include "Shared/scope.h"
48 #include "Shared/thread_count.h"
49 
50 extern bool g_cluster;
52 
53 constexpr static char const* table_schema_filename = "_table.sql";
54 constexpr static char const* table_oldinfo_filename = "_table.oldinfo";
55 constexpr static char const* table_epoch_filename = "_table.epoch";
56 
57 #if BOOST_VERSION < 107300
58 namespace std {
59 
60 template <typename T, typename U>
61 struct tuple_size<boost::tuples::cons<T, U>>
62  : boost::tuples::length<boost::tuples::cons<T, U>> {};
63 template <size_t I, typename T, typename U>
64 struct tuple_element<I, boost::tuples::cons<T, U>>
65  : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
66 
67 } // namespace std
68 #endif
69 
70 namespace {
71 
// Deleter used with std::unique_ptr<FILE, decltype(simple_file_closer)>: closes the
// stdio stream it is handed. std::unique_ptr never invokes its deleter on a null
// pointer, so no null check is needed here; the result of fclose is discarded.
inline auto simple_file_closer = [](FILE* stream) -> void {
  static_cast<void>(std::fclose(stream));
};
73 
74 inline std::string abs_path(const File_Namespace::GlobalFileMgr* global_file_mgr) {
75  return boost::filesystem::canonical(global_file_mgr->getBasePath()).string();
76 }
77 
78 inline std::string run(const std::string& cmd, const std::string& chdir = "") {
79  VLOG(3) << "running cmd: " << cmd;
80  int rcode;
81  std::error_code ec;
82  std::string output, errors;
83  const auto time_ms = measure<>::execution([&]() {
84  using namespace boost::process;
85  ipstream stdout, stderr;
86  if (!chdir.empty()) {
87  rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
88  } else {
89  rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
90  }
91  std::ostringstream ss_output, ss_errors;
92  stdout >> ss_output.rdbuf();
93  stderr >> ss_errors.rdbuf();
94  output = ss_output.str();
95  errors = ss_errors.str();
96  });
97  if (rcode || ec) {
98  LOG(ERROR) << "failed cmd: " << cmd;
99  LOG(ERROR) << "exit code: " << rcode;
100  LOG(ERROR) << "error code: " << ec.value() << " - " << ec.message();
101  LOG(ERROR) << "stdout: " << output;
102  LOG(ERROR) << "stderr: " << errors;
103 #if defined(__APPLE__)
104  // osx bsdtar options "--use-compress-program" and "--fast-read" together
105  // run into pipe write error after tar extracts the first occurrence of a
106  // file and closes the read end while the decompression program still writes
107  // to the pipe. bsdtar doesn't handle this situation well like gnu tar does.
108  if (1 == rcode && cmd.find("--fast-read") &&
109  (errors.find("cannot write decoded block") != std::string::npos ||
110  errors.find("Broken pipe") != std::string::npos)) {
111  // ignore this error, or lose speed advantage of "--fast-read" on osx.
112  LOG(ERROR) << "tar error ignored on osx for --fast-read";
113  } else
114 #endif
115  // circumvent tar warning on reading file that is "changed as we read it".
116  // this warning results from reading a table file under concurrent inserts
117  if (1 == rcode && errors.find("changed as we read") != std::string::npos) {
118  LOG(ERROR) << "tar error ignored under concurrent inserts";
119  } else {
120  int error_code;
121  std::string error_message;
122  if (ec) {
123  error_code = ec.value();
124  error_message = ec.message();
125  } else {
126  error_code = rcode;
127  // Show a more concise message for permission errors instead of the default
128  // verbose message. Error logs will still contain all details.
129  if (to_lower(errors).find("permission denied") != std::string::npos) {
130  error_message = "Insufficient file read/write permission.";
131  } else {
132  error_message = errors;
133  }
134  }
135  throw std::runtime_error(
136  "An error occurred while executing an internal command. Error code: " +
137  std::to_string(error_code) + ", message: " + error_message);
138  }
139  } else {
140  VLOG(3) << "finished cmd: " << cmd;
141  VLOG(3) << "time: " << time_ms << " ms";
142  VLOG(3) << "stdout: " << output;
143  }
144  return output;
145 }
146 
147 inline std::string simple_file_cat(const std::string& archive_path,
148  const std::string& file_name,
149  const std::string& compression) {
152 #if defined(__APPLE__)
153  constexpr static auto opt_occurrence = "--fast-read";
154 #else
155  constexpr static auto opt_occurrence = "--occurrence=1";
156 #endif
157  boost::filesystem::path temp_dir =
158  boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
159  boost::filesystem::create_directories(temp_dir);
160  run("tar " + compression + " -xvf " + get_quoted_string(archive_path) + " " +
161  opt_occurrence + " " + file_name,
162  temp_dir.string());
163  const auto output = run("cat " + (temp_dir / file_name).string());
164  boost::filesystem::remove_all(temp_dir);
165  return output;
166 }
167 
168 inline std::string get_table_schema(const std::string& archive_path,
169  const std::string& table,
170  const std::string& compression) {
171  const auto schema_str =
172  simple_file_cat(archive_path, table_schema_filename, compression);
173  std::regex regex("@T");
174  return std::regex_replace(schema_str, regex, table);
175 }
176 
// If a table was altered, there may be a mapping from old column ids to new ones;
// these values need to be replaced in the page headers.
180  const boost::filesystem::path& path,
181  const std::unordered_map<int, int>& column_ids_map,
182  const int32_t table_epoch) {
183  const std::string file_path = path.string();
184  const std::string file_name = path.filename().string();
185  std::vector<std::string> tokens;
186  boost::split(tokens, file_name, boost::is_any_of("."));
187 
188  // ref. FileMgr::init for hint of data file name layout
189  if (tokens.size() <= 2 || !(DATA_FILE_EXT == "." + tokens[2] || tokens[2] == "mapd")) {
190  // We are only interested in files in the form <id>.<page_size>.<DATA_FILE_EXT>
191  return;
192  }
193 
194  const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
195  const auto file_size = boost::filesystem::file_size(file_path);
196  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
197  std::fopen(file_path.c_str(), "r+"), simple_file_closer);
198  if (!fp) {
199  throw std::runtime_error("Failed to open " + file_path +
200  " for update: " + std::strerror(errno));
201  }
202  // TODO(Misiu): Rather than reference an exernal layout we should de-duplicate this
203  // page-reading code in a single location. This will also reduce the need for comments
204  // below.
205  // ref. FileInfo::openExistingFile for hint of chunk header layout
206  for (size_t page = 0; page < file_size / page_size; ++page) {
207  int32_t header_info[8];
208  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
209  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
210  file_path + " for read: " + std::strerror(errno));
211  }
212  if (1 != fread(header_info, sizeof header_info, 1, fp.get())) {
213  throw std::runtime_error("Failed to read " + file_path + ": " +
214  std::strerror(errno));
215  }
216  if (const auto header_size = header_info[0]; header_size > 0) {
217  // header_info[1] is the page's db_id; but can also be used as an "is deleted"
218  // indicator if negative.
219  auto& contingent = header_info[1];
220  // header_info[2] is the page's table_id; but can also used to store the page's
221  // epoch since the FileMgr stores table_id information separately.
222  auto& epoch = header_info[2];
223  auto& col_id = header_info[3];
225  table_epoch, epoch, contingent)) {
226  continue;
227  }
228  auto column_map_it = column_ids_map.find(col_id);
229  CHECK(column_map_it != column_ids_map.end()) << "could not find " << col_id;
230  // If a header contains a column id that is remapped to new location
231  // then write that change to the file.
232  if (const auto dest_col_id = column_map_it->second; col_id != dest_col_id) {
233  col_id = dest_col_id;
234  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
235  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
236  file_path + " for write: " + std::strerror(errno));
237  }
238  if (1 != fwrite(header_info, sizeof header_info, 1, fp.get())) {
239  throw std::runtime_error("Failed to write " + file_path + ": " +
240  std::strerror(errno));
241  }
242  }
243  }
244  }
245 }
246 
// Adjust column ids in chunk keys in a table's data files under temp_data_dir,
// including the files of all shards of the table. This can be slow for big files, but
// it should scale better than refragmenting. Table alteration should be rare for OLAP.
250 void adjust_altered_table_files(const int32_t table_epoch,
251  const std::string& temp_data_dir,
252  const std::unordered_map<int, int>& column_ids_map) {
253  boost::filesystem::path base_path(temp_data_dir);
254  boost::filesystem::recursive_directory_iterator end_it;
256  for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
257  ++fit) {
258  if (!boost::filesystem::is_symlink(fit->path()) &&
259  boost::filesystem::is_regular_file(fit->status())) {
260  thread_controller.startThread(
261  rewrite_column_ids_in_page_headers, fit->path(), column_ids_map, table_epoch);
262  thread_controller.checkThreadsStatus();
263  }
264  }
265  thread_controller.finish();
266 }
267 
268 void delete_old_symlinks(const std::string& table_data_dir) {
269  std::vector<boost::filesystem::path> symlinks;
270  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
271  it++) {
272  if (boost::filesystem::is_symlink(it->path())) {
273  symlinks.emplace_back(it->path());
274  }
275  }
276  for (const auto& symlink : symlinks) {
277  boost::filesystem::remove_all(symlink);
278  }
279 }
280 
281 void add_data_file_symlinks(const std::string& table_data_dir) {
282  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
283  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
284  it++) {
285  const auto path = boost::filesystem::canonical(it->path());
286  if (path.extension().string() == DATA_FILE_EXT) {
287  auto old_path = path;
288  old_path.replace_extension(File_Namespace::kLegacyDataFileExtension);
289  // Add a symlink to data file, if one does not exist.
290  if (!boost::filesystem::exists(old_path)) {
291  old_to_new_paths[old_path] = path;
292  }
293  }
294  }
295  for (const auto& [old_path, new_path] : old_to_new_paths) {
296  boost::filesystem::create_symlink(new_path.filename(), old_path);
297  }
298 }
299 
301  const std::string& temp_data_dir,
302  const std::vector<std::string>& target_paths,
303  const std::string& name_prefix) {
304  boost::filesystem::path base_path(temp_data_dir);
305  boost::filesystem::directory_iterator end_it;
306  int target_path_index = 0;
307  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
308  if (!boost::filesystem::is_regular_file(fit->status())) {
309  const std::string file_path = fit->path().string();
310  const std::string file_name = fit->path().filename().string();
311  if (boost::istarts_with(file_name, name_prefix)) {
312  const std::string target_path =
313  abs_path(global_file_mgr) + "/" + target_paths[target_path_index++];
314  if (std::rename(file_path.c_str(), target_path.c_str())) {
315  throw std::runtime_error("Failed to rename file " + file_path + " to " +
316  target_path + ": " + std::strerror(errno));
317  }
318  // Delete any old/invalid symlinks contained in table dump.
319  delete_old_symlinks(target_path);
321  // For post-rebrand table dumps, symlinks need to be added here, since file mgr
322  // migration would already have been executed for the dumped table.
323  add_data_file_symlinks(target_path);
324  }
325  }
326  }
327 }
328 
329 } // namespace
330 
332  const std::string& archive_path,
333  const std::string& compression) {
334  if (td->is_system_table) {
335  throw std::runtime_error("Dumping a system table is not supported.");
336  }
339  if (g_cluster) {
340  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
341  }
342  if (boost::filesystem::exists(archive_path)) {
343  throw std::runtime_error("Archive " + archive_path + " already exists.");
344  }
346  throw std::runtime_error("Dumping view or temporary table is not supported.");
347  }
348  // create a unique uuid for this table dump
349  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
350 
351  // collect paths of files to archive
352  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
353  std::vector<std::string> file_paths;
354  auto file_writer = [&file_paths, uuid](const std::string& file_name,
355  const std::string& file_type,
356  const std::string& file_data) {
357  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
358  std::fopen(file_name.c_str(), "w"), simple_file_closer);
359  if (!fp) {
360  throw std::runtime_error("Failed to create " + file_type + " file '" + file_name +
361  "': " + std::strerror(errno));
362  }
363  if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
364  throw std::runtime_error("Failed to write " + file_type + " file '" + file_name +
365  "': " + std::strerror(errno));
366  }
367  file_paths.push_back(uuid / std::filesystem::path(file_name).filename());
368  };
369 
370  const auto file_mgr_dir = std::filesystem::path(abs_path(global_file_mgr));
371  const auto uuid_dir = file_mgr_dir / uuid;
372 
373  if (!std::filesystem::create_directory(uuid_dir)) {
374  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
375  "' while dumping table.");
376  }
377 
378  ScopeGuard cleanup_guard = [&] {
379  if (std::filesystem::exists(uuid_dir)) {
380  std::filesystem::remove_all(uuid_dir);
381  }
382  };
383 
384  const auto table_name = td->tableName;
385  {
386  // - gen schema file
387  const auto schema_str = cat_->dumpSchema(td);
388  file_writer(uuid_dir / table_schema_filename, "table schema", schema_str);
389  // - gen column-old-info file
390  const auto cds = cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
391  std::vector<std::string> column_oldinfo;
392  std::transform(cds.begin(),
393  cds.end(),
394  std::back_inserter(column_oldinfo),
395  [&](const auto cd) -> std::string {
396  return cd->columnName + ":" + std::to_string(cd->columnId) + ":" +
398  });
399  const auto column_oldinfo_str = boost::algorithm::join(column_oldinfo, " ");
400  file_writer(uuid_dir / table_oldinfo_filename, "table old info", column_oldinfo_str);
401  // - gen table epoch
402  const auto epoch = cat_->getTableEpoch(cat_->getCurrentDB().dbId, td->tableId);
403  file_writer(uuid_dir / table_epoch_filename, "table epoch", std::to_string(epoch));
404  // - collect table data file paths ...
405  const auto data_file_dirs = cat_->getTableDataDirectories(td);
406  file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
407  // - collect table dict file paths ...
408  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
409  file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
410  // tar takes time. release cat lock to yield the cat to concurrent CREATE statements.
411  }
412  // run tar to archive the files ... this may take a while !!
413  run("tar " + compression + " --transform=s|" + uuid +
414  std::filesystem::path::preferred_separator + "|| -cvf " +
415  get_quoted_string(archive_path) + " " + boost::algorithm::join(file_paths, " "),
416  file_mgr_dir);
417 }
418 
419 // Restore data and dict files of a table from a tgz archive.
421  const TableDescriptor* td,
422  const std::string& archive_path,
423  const std::string& compression) {
426  if (g_cluster) {
427  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
428  }
429  if (!boost::filesystem::exists(archive_path)) {
430  throw std::runtime_error("Archive " + archive_path + " does not exist.");
431  }
433  throw std::runtime_error("Restoring view or temporary table is not supported.");
434  }
435  // Obtain table schema read lock to prevent modification of the schema during
436  // restoration
437  const auto table_read_lock =
439  // prevent concurrent inserts into table during restoration
440  const auto insert_data_lock =
442 
443  // untar takes time. no grab of cat lock to yield to concurrent CREATE stmts.
444  const auto global_file_mgr = cat_->getDataMgr().getGlobalFileMgr();
445 
446  // create a unique uuid for this table restore
447  std::string uuid = boost::uuids::to_string(boost::uuids::random_generator()());
448 
449  const auto uuid_dir = std::filesystem::path(abs_path(global_file_mgr)) / uuid;
450 
451  if (!std::filesystem::create_directory(uuid_dir)) {
452  throw std::runtime_error("Failed to create work directory '" + uuid_dir.string() +
453  "' while restoring table.");
454  }
455 
456  ScopeGuard cleanup_guard = [&] {
457  if (std::filesystem::exists(uuid_dir)) {
458  std::filesystem::remove_all(uuid_dir);
459  }
460  };
461 
462  // dirs where src files are untarred and dst files are backed up
463  constexpr static const auto temp_data_basename = "_data";
464  constexpr static const auto temp_back_basename = "_back";
465  const auto temp_data_dir = uuid_dir / temp_data_basename;
466  const auto temp_back_dir = uuid_dir / temp_back_basename;
467 
468  // extract & parse schema
469  const auto schema_str = get_table_schema(archive_path, td->tableName, compression);
470  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
471  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
472  CHECK(create_table_stmt);
473 
474  // verify compatibility between source and destination schemas
475  TableDescriptor src_td;
476  std::list<ColumnDescriptor> src_columns;
477  std::vector<Parser::SharedDictionaryDef> shared_dict_defs;
478  create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
479  // - sanity check table-level compatibility
480  if (src_td.hasDeletedCol != td->hasDeletedCol) {
481  // TODO: allow the case, in which src data enables vacuum while
482  // dst doesn't, by simply discarding src $deleted column data.
483  throw std::runtime_error("Incompatible table VACCUM option");
484  }
485  if (src_td.nShards != td->nShards) {
486  // TODO: allow different shard numbers if they have a "GCD",
487  // by splitting/merging src data files before drop into dst.
488  throw std::runtime_error("Unmatched number of table shards");
489  }
490  // - sanity check column-level compatibility (based on column names)
491  const auto dst_columns =
492  cat_->getAllColumnMetadataForTable(td->tableId, false, false, false);
493  if (dst_columns.size() != src_columns.size()) {
494  throw std::runtime_error("Unmatched number of table columns");
495  }
496  for (const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
497  if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
498  src_cd.columnType.get_compression_name() !=
499  dst_cd->columnType.get_compression_name()) {
500  throw std::runtime_error("Incompatible types on column " + src_cd.columnName);
501  }
502  }
503  // extract src table column ids (ALL columns incl. system/virtual/phy geo cols)
504  const auto all_src_oldinfo_str =
505  simple_file_cat(archive_path, table_oldinfo_filename, compression);
506  std::vector<std::string> src_oldinfo_strs;
507  boost::algorithm::split(src_oldinfo_strs,
508  all_src_oldinfo_str,
509  boost::is_any_of(" "),
510  boost::token_compress_on);
511  auto all_dst_columns =
512  cat_->getAllColumnMetadataForTable(td->tableId, true, true, true);
513  if (src_oldinfo_strs.size() != all_dst_columns.size()) {
514  throw std::runtime_error("Source table has a unmatched number of columns: " +
515  std::to_string(src_oldinfo_strs.size()) + " vs " +
516  std::to_string(all_dst_columns.size()));
517  }
518  // build a map of src column ids and dst column ids, just in case src table has been
519  // ALTERed before and chunk keys of src table needs to be adjusted accordingly.
520  // note: this map is used only for the case of migrating a table and not for restoring
521  // a table. When restoring a table, the two tables must have the same column ids.
522  //
523  // also build a map of src dict paths and dst dict paths for relocating src dicts
524  std::unordered_map<int, int> column_ids_map;
525  std::unordered_map<std::string, std::string> dict_paths_map;
526  // sort inputs of transform in lexical order of column names for correct mappings
527  std::list<std::vector<std::string>> src_oldinfo_tokens;
529  src_oldinfo_strs.begin(),
530  src_oldinfo_strs.end(),
531  std::back_inserter(src_oldinfo_tokens),
532  [](const auto& src_oldinfo_str) -> auto {
533  std::vector<std::string> tokens;
535  tokens, src_oldinfo_str, boost::is_any_of(":"), boost::token_compress_on);
536  return tokens;
537  });
538  src_oldinfo_tokens.sort(
539  [](const auto& lhs, const auto& rhs) { return lhs[0].compare(rhs[0]) < 0; });
540  all_dst_columns.sort(
541  [](auto a, auto b) { return a->columnName.compare(b->columnName) < 0; });
542  // transform inputs into the maps
543  std::transform(src_oldinfo_tokens.begin(),
544  src_oldinfo_tokens.end(),
545  all_dst_columns.begin(),
546  std::inserter(column_ids_map, column_ids_map.end()),
547  [&](const auto& tokens, const auto& cd) -> std::pair<int, int> {
548  VLOG(3) << boost::algorithm::join(tokens, ":") << " ==> "
549  << cd->columnName << ":" << cd->columnId;
550  dict_paths_map[tokens[2]] = cat_->getColumnDictDirectory(cd);
551  return {boost::lexical_cast<int>(tokens[1]), cd->columnId};
552  });
553  bool was_table_altered = false;
554  std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](auto& it) {
555  was_table_altered = was_table_altered || it.first != it.second;
556  });
557  VLOG(3) << "was_table_altered = " << was_table_altered;
558  // extract all data files to a temp dir. will swap with dst table dir after all set,
559  // otherwise will corrupt table in case any bad thing happens in the middle.
560  run("rm -rf " + temp_data_dir.string());
561  run("mkdir -p " + temp_data_dir.string());
562  run("tar " + compression + " -xvf " + get_quoted_string(archive_path), temp_data_dir);
563 
564  // if table was ever altered after it was created, update column ids in chunk headers.
565  if (was_table_altered) {
566  const auto epoch = boost::lexical_cast<int32_t>(
567  simple_file_cat(archive_path, table_epoch_filename, compression));
568  const auto time_ms = measure<>::execution(
569  [&]() { adjust_altered_table_files(epoch, temp_data_dir, column_ids_map); });
570  VLOG(3) << "adjust_altered_table_files: " << time_ms << " ms";
571  }
572  // finally,,, swap table data/dict dirs!
573  const auto data_file_dirs = cat_->getTableDataDirectories(td);
574  const auto dict_file_dirs = cat_->getTableDictDirectories(td);
575  // move current target dirs, if exists, to backup dir
576  std::vector<std::string> both_file_dirs;
577  std::merge(data_file_dirs.begin(),
578  data_file_dirs.end(),
579  dict_file_dirs.begin(),
580  dict_file_dirs.end(),
581  std::back_inserter(both_file_dirs));
582  bool backup_completed = false;
583  try {
584  run("rm -rf " + temp_back_dir.string());
585  run("mkdir -p " + temp_back_dir.string());
586  for (const auto& dir : both_file_dirs) {
587  const auto dir_full_path = abs_path(global_file_mgr) + "/" + dir;
588  if (boost::filesystem::is_directory(dir_full_path)) {
589  run("mv " + dir_full_path + " " + temp_back_dir.string());
590  }
591  }
592  backup_completed = true;
593  // Move table directories from temp dir to main data directory.
594  rename_table_directories(global_file_mgr, temp_data_dir, data_file_dirs, "table_");
595  // Move dictionaries from temp dir to main dir.
596  for (const auto& dit : dict_paths_map) {
597  if (!dit.first.empty() && !dit.second.empty()) {
598  const auto src_dict_path = temp_data_dir.string() + "/" + dit.first;
599  const auto dst_dict_path = abs_path(global_file_mgr) + "/" + dit.second;
600  run("mv " + src_dict_path + " " + dst_dict_path);
601  }
602  }
603  // throw if sanity test forces a rollback
605  throw std::runtime_error("lol!");
606  }
607  } catch (...) {
608  // once backup is completed, whatever in abs_path(global_file_mgr) is the "src"
609  // dirs that are to be rolled back and discarded
610  if (backup_completed) {
611  run("rm -rf " + boost::algorithm::join(both_file_dirs, " "),
612  abs_path(global_file_mgr));
613  }
614  // complete rollback by recovering original "dst" table dirs from backup dir
615  boost::filesystem::path base_path(temp_back_dir);
616  boost::filesystem::directory_iterator end_it;
617  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
618  run("mv " + fit->path().string() + " .", abs_path(global_file_mgr));
619  }
620  throw;
621  }
622  // set for reloading table from the restored/migrated files
623  const auto epoch = simple_file_cat(archive_path, table_epoch_filename, compression);
625  cat_->getCurrentDB().dbId, td->tableId, boost::lexical_cast<int>(epoch));
626 }
627 
628 // Migrate a table, which doesn't exist in current db, from a tar ball to the db.
629 // This actually creates the table and restores data/dict files from the tar ball.
631  const std::string& table_name,
632  const std::string& archive_path,
633  const std::string& compression) {
634  // replace table name and drop foreign dict references
635  const auto schema_str = get_table_schema(archive_path, table_name, compression);
636  std::unique_ptr<Parser::Stmt> stmt = Parser::create_stmt_for_query(schema_str, session);
637  const auto create_table_stmt = dynamic_cast<Parser::CreateTableStmt*>(stmt.get());
638  CHECK(create_table_stmt);
639  create_table_stmt->execute(session, false /*read-only*/);
640 
641  try {
642  restoreTable(
643  session, cat_->getMetadataForTable(table_name), archive_path, compression);
644  } catch (...) {
645  const auto schema_str = "DROP TABLE IF EXISTS " + table_name + ";";
646  std::unique_ptr<Parser::Stmt> stmt =
647  Parser::create_stmt_for_query(schema_str, session);
648  const auto drop_table_stmt = dynamic_cast<Parser::DropTableStmt*>(stmt.get());
649  CHECK(drop_table_stmt);
650  drop_table_stmt->execute(session, false /*read-only*/);
651 
652  throw;
653  }
654 }
std::string to_lower(const std::string &str)
std::string get_table_schema(const std::string &archive_path, const std::string &table, const std::string &compression)
void delete_old_symlinks(const std::string &table_data_dir)
std::string getBasePath() const
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
std::string getColumnDictDirectory(const ColumnDescriptor *cd, bool file_name_only=true) const
Definition: Catalog.cpp:5031
static constexpr char const * table_schema_filename
std::string tableName
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
void rewrite_column_ids_in_page_headers(const boost::filesystem::path &path, const std::unordered_map< int, int > &column_ids_map, const int32_t table_epoch)
std::string abs_path(const File_Namespace::GlobalFileMgr *global_file_mgr)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:249
#define LOG(tag)
Definition: Logger.h:283
void startThread(FuncType &&func, Args &&...args)
std::string join(T const &container, std::string const &delim)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
#define DATA_FILE_EXT
Definition: File.h:25
static constexpr char const * table_oldinfo_filename
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3368
std::vector< std::string > getTableDataDirectories(const TableDescriptor *td) const
Definition: Catalog.cpp:5017
void restoreTable(const Catalog_Namespace::SessionInfo &session, const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
constexpr double a
Definition: Utm.h:32
Classes representing a parse tree.
std::string get_quoted_string(const std::string &filename, char quote, char escape)
Quote a string while escaping any existing quotes in the string.
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:248
void rename_table_directories(const File_Namespace::GlobalFileMgr *global_file_mgr, const std::string &temp_data_dir, const std::vector< std::string > &target_paths, const std::string &name_prefix)
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:320
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:606
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:785
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:3436
std::vector< std::string > getTableDictDirectories(const TableDescriptor *td) const
Definition: Catalog.cpp:5052
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:238
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:271
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2254
Data_Namespace::MemoryLevel persistenceLevel
void adjust_altered_table_files(const int32_t table_epoch, const std::string &temp_data_dir, const std::unordered_map< int, int > &column_ids_map)
static constexpr char const * table_epoch_filename
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
Definition: FileMgr.cpp:1125
#define CHECK(condition)
Definition: Logger.h:289
bool g_cluster
void add_data_file_symlinks(const std::string &table_data_dir)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static bool run
std::string simple_file_cat(const std::string &archive_path, const std::string &file_name, const std::string &compression)
std::string dumpSchema(const TableDescriptor *td) const
Definition: Catalog.cpp:5083
int cpu_threads()
Definition: thread_count.h:25
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void dumpTable(const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
constexpr auto kLegacyDataFileExtension
Definition: File.h:36
std::unique_ptr< Parser::Stmt > create_stmt_for_query(const std::string &queryStr, const Catalog_Namespace::SessionInfo &session_info)
A selection of helper methods for File I/O.
#define VLOG(n)
Definition: Logger.h:383
bool g_test_rollback_dump_restore
Catalog_Namespace::Catalog * cat_
Definition: TableArchiver.h:43