OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DumpRestore.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <cerrno>
18 #include <cstdio>
19 #include <cstring>
20 #include <exception>
21 #include <list>
22 #include <memory>
23 #include <regex>
24 #include <set>
25 #include <sstream>
26 #include <system_error>
27 
28 #include <boost/algorithm/string/join.hpp>
29 #include <boost/algorithm/string/predicate.hpp>
30 #include <boost/filesystem.hpp>
31 #include <boost/process.hpp>
32 #include <boost/range/combine.hpp>
33 
34 #include "Catalog.h"
36 #include "LockMgr/LockMgr.h"
37 #include "LockMgr/TableLockMgr.h"
38 #include "Parser/ParseDDL.h"
39 #include "Parser/ParserNode.h"
40 #include "Parser/parser.h"
41 #include "RWLocks.h"
42 #include "Shared/File.h"
43 #include "Shared/measure.h"
44 #include "Shared/thread_count.h"
47 #include "SysCatalog.h"
48 
49 extern bool g_cluster;
51 
52 namespace std {
53 template <typename T, typename U>
54 struct tuple_size<boost::tuples::cons<T, U>>
55  : boost::tuples::length<boost::tuples::cons<T, U>> {};
56 template <size_t I, typename T, typename U>
57 struct tuple_element<I, boost::tuples::cons<T, U>>
58  : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
59 } // namespace std
60 
61 namespace Catalog_Namespace {
62 
63 using cat_read_lock = read_lock<Catalog>;
64 
// Base names of the metadata files stored in a table archive next to the
// table's data and dictionary directories.
static constexpr const char* table_schema_filename = "_table.sql";
static constexpr const char* table_oldinfo_filename = "_table.oldinfo";
static constexpr const char* table_epoch_filename = "_table.epoch";

// Deleter for std::unique_ptr<FILE, ...>: closes the stream with std::fclose.
inline auto simple_file_closer = [](FILE* file) { std::fclose(file); };
70 
71 inline std::string abs_path(const File_Namespace::GlobalFileMgr* global_file_mgr) {
72  return boost::filesystem::canonical(global_file_mgr->getBasePath()).string();
73 }
74 
75 inline std::string run(const std::string& cmd, const std::string& chdir = "") {
76  VLOG(3) << "running cmd: " << cmd;
77  int rcode;
78  std::error_code ec;
79  std::string output, errors;
80  const auto time_ms = measure<>::execution([&]() {
81  using namespace boost::process;
82  ipstream stdout, stderr;
83  if (!chdir.empty()) {
84  rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
85  } else {
86  rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
87  }
88  std::ostringstream ss_output, ss_errors;
89  stdout >> ss_output.rdbuf();
90  stderr >> ss_errors.rdbuf();
91  output = ss_output.str();
92  errors = ss_errors.str();
93  });
94  if (rcode || ec) {
95  LOG(ERROR) << "failed cmd: " << cmd;
96  LOG(ERROR) << "exit code: " << rcode;
97  LOG(ERROR) << "error code: " << ec.value() << " - " << ec.message();
98  LOG(ERROR) << "stdout: " << output;
99  LOG(ERROR) << "stderr: " << errors;
100 #if defined(__APPLE__)
101  // osx bsdtar options "--use-compress-program" and "--fast-read" together
102  // run into pipe write error after tar extracts the first occurrence of a
103  // file and closes the read end while the decompression program still writes
104  // to the pipe. bsdtar doesn't handle this situation well like gnu tar does.
105  if (1 == rcode && cmd.find("--fast-read") &&
106  (errors.find("cannot write decoded block") != std::string::npos ||
107  errors.find("Broken pipe") != std::string::npos)) {
108  // ignore this error, or lose speed advantage of "--fast-read" on osx.
109  LOG(ERROR) << "tar error ignored on osx for --fast-read";
110  } else
111 #endif
112  // circumvent tar warning on reading file that is "changed as we read it".
113  // this warning results from reading a table file under concurrent inserts
114  if (1 == rcode && errors.find("changed as we read") != std::string::npos) {
115  LOG(ERROR) << "tar error ignored under concurrent inserts";
116  } else {
117  throw std::runtime_error("Failed to run command: " + cmd +
118  "\nexit code: " + std::to_string(rcode) + "\nerrors:\n" +
119  (rcode ? errors : ec.message()));
120  }
121  } else {
122  VLOG(3) << "finished cmd: " << cmd;
123  VLOG(3) << "time: " << time_ms << " ms";
124  VLOG(3) << "stdout: " << output;
125  }
126  return output;
127 }
128 
129 inline std::string simple_file_cat(const std::string& archive_path,
130  const std::string& file_name,
131  const std::string& compression) {
132 #if defined(__APPLE__)
133  constexpr static auto opt_occurrence = " --fast-read ";
134 #else
135  constexpr static auto opt_occurrence = " --occurrence=1 ";
136 #endif
137  boost::filesystem::path temp_dir =
138  boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
139  boost::filesystem::create_directories(temp_dir);
140  run("tar " + compression + " -xvf \"" + archive_path + "\" " + opt_occurrence +
141  file_name,
142  temp_dir.string());
143  const auto output = run("cat " + (temp_dir / file_name).string());
144  boost::filesystem::remove_all(temp_dir);
145  return output;
146 }
147 
148 inline std::string get_table_schema(const std::string& archive_path,
149  const std::string& table,
150  const std::string& compression) {
151  const auto schema_str =
152  simple_file_cat(archive_path, table_schema_filename, compression);
153  std::regex regex("@T");
154  return std::regex_replace(schema_str, regex, table);
155 }
156 
157 // get a table's data dirs
158 std::vector<std::string> Catalog::getTableDataDirectories(
159  const TableDescriptor* td) const {
160  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
161  std::vector<std::string> file_paths;
162  for (auto shard : getPhysicalTablesDescriptors(td)) {
163  const auto file_mgr = global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId);
164  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
165  file_paths.push_back(file_path.filename().string());
166  }
167  return file_paths;
168 }
169 
170 // get a column's dict dir basename
171 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd) const {
172  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
174  cd->columnType.get_comp_param() > 0) {
175  const auto dictId = cd->columnType.get_comp_param();
176  const DictRef dictRef(currentDB_.dbId, dictId);
177  const auto dit = dictDescriptorMapByRef_.find(dictRef);
178  CHECK(dit != dictDescriptorMapByRef_.end());
179  CHECK(dit->second);
180  boost::filesystem::path file_path(dit->second->dictFolderPath);
181  return file_path.filename().string();
182  }
183  return std::string();
184 }
185 
186 // get a table's dict dirs
187 std::vector<std::string> Catalog::getTableDictDirectories(
188  const TableDescriptor* td) const {
189  std::vector<std::string> file_paths;
190  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
191  auto file_base = getColumnDictDirectory(cd);
192  if (!file_base.empty() &&
193  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
194  file_paths.push_back(file_base);
195  }
196  }
197  return file_paths;
198 }
199 
200 // dump a table's schema, data files and dict files to a tgz ball
202  const std::string& archive_path,
203  const std::string& compression) const {
204  if (g_cluster) {
205  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
206  }
207  if (boost::filesystem::exists(archive_path)) {
208  throw std::runtime_error("Archive " + archive_path + " already exists.");
209  }
211  throw std::runtime_error("Dumping view or temporary table is not supported.");
212  }
213  // collect paths of files to archive
214  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
215  std::vector<std::string> file_paths;
216  auto file_writer = [&file_paths, global_file_mgr](const std::string& file_name,
217  const std::string& file_type,
218  const std::string& file_data) {
219  const auto file_path = abs_path(global_file_mgr) + "/" + file_name;
220  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
221  std::fopen(file_path.c_str(), "w"), simple_file_closer);
222  if (!fp) {
223  throw std::runtime_error("Failed to create " + file_type + " file '" + file_path +
224  "': " + std::strerror(errno));
225  }
226  if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
227  throw std::runtime_error("Failed to write " + file_type + " file '" + file_path +
228  "': " + std::strerror(errno));
229  }
230  file_paths.push_back(file_name);
231  };
232  // grab table read lock for concurrent SELECT (but would block capped COPY or INSERT)
233  auto table_read_lock =
235 
236  // grab catalog read lock only. no need table read or checkpoint lock
237  // because want to allow concurrent inserts while this dump proceeds.
238  const auto table_name = td->tableName;
239  {
240  cat_read_lock read_lock(this);
241  // - gen schema file
242  const auto schema_str = dumpSchema(td);
243  file_writer(table_schema_filename, "table schema", schema_str);
244  // - gen column-old-info file
245  const auto cds = getAllColumnMetadataForTable(td->tableId, true, true, true);
246  std::vector<std::string> column_oldinfo;
247  std::transform(cds.begin(),
248  cds.end(),
249  std::back_inserter(column_oldinfo),
250  [&](const auto cd) -> std::string {
251  return cd->columnName + ":" + std::to_string(cd->columnId) + ":" +
253  });
254  const auto column_oldinfo_str = boost::algorithm::join(column_oldinfo, " ");
255  file_writer(table_oldinfo_filename, "table old info", column_oldinfo_str);
256  // - gen table epoch
257  const auto epoch = getTableEpoch(currentDB_.dbId, td->tableId);
258  file_writer(table_epoch_filename, "table epoch", std::to_string(epoch));
259  // - collect table data file paths ...
260  const auto data_file_dirs = getTableDataDirectories(td);
261  file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
262  // - collect table dict file paths ...
263  const auto dict_file_dirs = getTableDictDirectories(td);
264  file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
265  // tar takes time. release cat lock to yield the cat to concurrent CREATE statements.
266  }
267  // run tar to archive the files ... this may take a while !!
268  run("tar " + compression + " -cvf \"" + archive_path + "\" " +
269  boost::algorithm::join(file_paths, " "),
270  abs_path(global_file_mgr));
271 }
272 
273 // returns table schema in a string
274 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
275  std::ostringstream os;
276  os << "CREATE TABLE @T (";
277  // gather column defines
278  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
279  std::string comma;
280  std::vector<std::string> shared_dicts;
281  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
282  for (const auto cd : cds) {
283  if (!(cd->isSystemCol || cd->isVirtualCol)) {
284  const auto& ti = cd->columnType;
285  os << comma << cd->columnName;
286  // CHAR is perculiar... better dump it as TEXT(32) like \d does
287  if (ti.get_type() == SQLTypes::kCHAR) {
288  os << " "
289  << "TEXT";
290  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
291  os << " "
292  << "TEXT[]";
293  } else {
294  os << " " << ti.get_type_name();
295  }
296  os << (ti.get_notnull() ? " NOT NULL" : "");
297  if (ti.is_string()) {
298  if (ti.get_compression() == kENCODING_DICT) {
299  // if foreign reference, get referenced tab.col
300  const auto dict_id = ti.get_comp_param();
301  const DictRef dict_ref(currentDB_.dbId, dict_id);
302  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
303  CHECK(dict_it != dictDescriptorMapByRef_.end());
304  const auto dict_name = dict_it->second->dictName;
305  // when migrating a table, any foreign dict ref will be dropped
306  // and the first cd of a dict will become root of the dict
307  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
308  dict_root_cds[dict_name] = cd;
309  os << " ENCODING " << ti.get_compression_name() << "(" << (ti.get_size() * 8)
310  << ")";
311  } else {
312  const auto dict_root_cd = dict_root_cds[dict_name];
313  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
314  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
315  // "... shouldn't specify an encoding, it borrows from the referenced column"
316  }
317  } else {
318  os << " ENCODING NONE";
319  }
320  } else if (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size()) {
321  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
322  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
323  }
324  comma = ", ";
325  }
326  }
327  // gather SHARED DICTIONARYs
328  if (shared_dicts.size()) {
329  os << ", " << boost::algorithm::join(shared_dicts, ", ");
330  }
331  // gather WITH options ...
332  std::vector<std::string> with_options;
333  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
334  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
335  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
336  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
337  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
338  : "VACUUM='IMMEDIATE'");
339  if (!td->partitions.empty()) {
340  with_options.push_back("PARTITIONS='" + td->partitions + "'");
341  }
342  if (td->nShards > 0) {
343  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
344  CHECK(shard_cd);
345  os << ", SHARD KEY(" << shard_cd->columnName << ")";
346  with_options.push_back("SHARD_COUNT=" + std::to_string(td->nShards));
347  }
348  if (td->sortedColumnId > 0) {
349  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
350  CHECK(sort_cd);
351  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
352  }
353  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
354  return os.str();
355 }
356 
357 // Adjust column ids in chunk keys in a table's data files under a temp_data_dir,
358 // including files of all shards of the table. Can be slow for big files but should
359 // be far faster than refragmenting. Table altering should be rare for OLAP.
361  const std::string& temp_data_dir,
362  const std::unordered_map<int, int>& column_ids_map) const {
363  boost::filesystem::path base_path(temp_data_dir);
364  boost::filesystem::recursive_directory_iterator end_it;
366  for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
367  ++fit) {
368  if (boost::filesystem::is_regular_file(fit->status())) {
369  const std::string file_path = fit->path().string();
370  const std::string file_name = fit->path().filename().string();
371  std::vector<std::string> tokens;
372  boost::split(tokens, file_name, boost::is_any_of("."));
373  // ref. FileMgr::init for hint of data file name layout
374  if (tokens.size() > 2 && MAPD_FILE_EXT == "." + tokens[2]) {
375  thread_controller.startThread([file_name, file_path, tokens, &column_ids_map] {
376  const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
377  const auto file_size = boost::filesystem::file_size(file_path);
378  std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
379  std::fopen(file_path.c_str(), "r+"), simple_file_closer);
380  if (!fp) {
381  throw std::runtime_error("Failed to open " + file_path +
382  " for update: " + std::strerror(errno));
383  }
384  // ref. FileInfo::openExistingFile for hint of chunk header layout
385  for (size_t page = 0; page < file_size / page_size; ++page) {
386  int ints[8];
387  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
388  throw std::runtime_error("Failed to seek to page# " + std::to_string(page) +
389  file_path + " for read: " + std::strerror(errno));
390  }
391  if (1 != fread(ints, sizeof ints, 1, fp.get())) {
392  throw std::runtime_error("Failed to read " + file_path + ": " +
393  std::strerror(errno));
394  }
395  if (ints[0] > 0) { // header size
396  auto cit = column_ids_map.find(ints[3]);
397  CHECK(cit != column_ids_map.end());
398  if (ints[3] != cit->second) {
399  ints[3] = cit->second;
400  if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
401  throw std::runtime_error("Failed to seek to page# " +
402  std::to_string(page) + file_path +
403  " for write: " + std::strerror(errno));
404  }
405  if (1 != fwrite(ints, sizeof ints, 1, fp.get())) {
406  throw std::runtime_error("Failed to write " + file_path + ": " +
407  std::strerror(errno));
408  }
409  }
410  }
411  }
412  });
413  thread_controller.checkThreadsStatus();
414  }
415  }
416  }
417  thread_controller.finish();
418 }
419 
420 // Rename table data directories in temp_data_dir to those in target_paths.
421 // Note: It applies to table migration and not to table recovery.
422 void Catalog::renameTableDirectories(const std::string& temp_data_dir,
423  const std::vector<std::string>& target_paths,
424  const std::string& name_prefix) const {
425  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
426  boost::filesystem::path base_path(temp_data_dir);
427  boost::filesystem::directory_iterator end_it;
428  int target_path_index = 0;
429  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
430  if (!boost::filesystem::is_regular_file(fit->status())) {
431  const std::string file_path = fit->path().string();
432  const std::string file_name = fit->path().filename().string();
433  if (boost::istarts_with(file_name, name_prefix)) {
434  const std::string target_path =
435  abs_path(global_file_mgr) + "/" + target_paths[target_path_index++];
436  if (std::rename(file_path.c_str(), target_path.c_str())) {
437  throw std::runtime_error("Failed to rename file " + file_path + " to " +
438  target_path + ": " + std::strerror(errno));
439  }
440  }
441  }
442  }
443 }
444 
445 // Restore data and dict files of a table from a tgz ball.
446 void Catalog::restoreTable(const SessionInfo& session,
447  const TableDescriptor* td,
448  const std::string& archive_path,
449  const std::string& compression) {
450  if (g_cluster) {
451  throw std::runtime_error("DUMP/RESTORE is not supported yet on distributed setup.");
452  }
453  if (!boost::filesystem::exists(archive_path)) {
454  throw std::runtime_error("Archive " + archive_path + " does not exist.");
455  }
457  throw std::runtime_error("Restoring view or temporary table is not supported.");
458  }
459  // should get checkpoint lock to block any data injection which is meaningless
460  // once after the table is restored from data in the source table files.
461  auto checkpoint_lock =
462  Lock_Namespace::getTableLock<mapd_shared_mutex, mapd_unique_lock>(
464  // grab table read lock for concurrent SELECT
465  auto table_read_lock =
467  // untar takes time. no grab of cat lock to yield to concurrent CREATE stmts.
468  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
469  // dirs where src files are untarred and dst files are backed up
470  constexpr static const auto temp_data_basename = "_data";
471  constexpr static const auto temp_back_basename = "_back";
472  const auto temp_data_dir = abs_path(global_file_mgr) + "/" + temp_data_basename;
473  const auto temp_back_dir = abs_path(global_file_mgr) + "/" + temp_back_basename;
474  // clean up tmp dirs and files in any case
475  auto tmp_files_cleaner = [&](void*) {
476  run("rm -rf " + temp_data_dir + " " + temp_back_dir);
477  run("rm -f " + abs_path(global_file_mgr) + "/" + table_schema_filename);
478  run("rm -f " + abs_path(global_file_mgr) + "/" + table_oldinfo_filename);
479  run("rm -f " + abs_path(global_file_mgr) + "/" + table_epoch_filename);
480  };
481  std::unique_ptr<decltype(tmp_files_cleaner), decltype(tmp_files_cleaner)> tfc(
482  &tmp_files_cleaner, tmp_files_cleaner);
483  // extract & parse schema
484  const auto schema_str = get_table_schema(archive_path, td->tableName, compression);
485  const auto create_table_stmt =
486  Parser::parseDDL<Parser::CreateTableStmt>("table schema", schema_str);
487  // verify compatibility between source and destination schemas
488  TableDescriptor src_td;
489  std::list<ColumnDescriptor> src_columns;
490  std::vector<SharedDictionaryDef> shared_dict_defs;
491  create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
492  // - sanity check table-level compatibility
493  if (src_td.hasDeletedCol != td->hasDeletedCol) {
494  // TODO: allow the case, in which src data enables vacuum while
495  // dst doesn't, by simply discarding src $deleted column data.
496  throw std::runtime_error("Incompatible table VACCUM option");
497  }
498  if (src_td.nShards != td->nShards) {
499  // TODO: allow different shard numbers if they have a "GCD",
500  // by splitting/merging src data files before drop into dst.
501  throw std::runtime_error("Unmatched number of table shards");
502  }
503  // - sanity check column-level compatibility (based on column names)
504  const auto dst_columns = getAllColumnMetadataForTable(td->tableId, false, false, false);
505  if (dst_columns.size() != src_columns.size()) {
506  throw std::runtime_error("Unmatched number of table columns");
507  }
508  for (const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
509  if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
510  src_cd.columnType.get_compression_name() !=
511  dst_cd->columnType.get_compression_name()) {
512  throw std::runtime_error("Incompatible types on column " + src_cd.columnName);
513  }
514  }
515  // extract src table column ids (ALL columns incl. system/virtual/phy geo cols)
516  const auto all_src_oldinfo_str =
517  simple_file_cat(archive_path, table_oldinfo_filename, compression);
518  std::vector<std::string> src_oldinfo_strs;
519  boost::algorithm::split(src_oldinfo_strs,
520  all_src_oldinfo_str,
521  boost::is_any_of(" "),
522  boost::token_compress_on);
523  auto all_dst_columns = getAllColumnMetadataForTable(td->tableId, true, true, true);
524  if (src_oldinfo_strs.size() != all_dst_columns.size()) {
525  throw std::runtime_error("Source table has a unmatched number of columns: " +
526  std::to_string(src_oldinfo_strs.size()) + " vs " +
527  std::to_string(all_dst_columns.size()));
528  }
529  // build a map of src column ids and dst column ids, just in case src table has been
530  // ALTERed before and chunk keys of src table needs to be adjusted accordingly.
531  // note: this map is used only for the case of migrating a table and not for restoring
532  // a table. When restoring a table, the two tables must have the same column ids.
533  //
534  // also build a map of src dict paths and dst dict paths for relocating src dicts
535  std::unordered_map<int, int> column_ids_map;
536  std::unordered_map<std::string, std::string> dict_paths_map;
537  // sort inputs of transform in lexical order of column names for correct mappings
538  std::list<std::vector<std::string>> src_oldinfo_tokens;
539  std::transform(
540  src_oldinfo_strs.begin(),
541  src_oldinfo_strs.end(),
542  std::back_inserter(src_oldinfo_tokens),
543  [](const auto& src_oldinfo_str) -> auto {
544  std::vector<std::string> tokens;
546  tokens, src_oldinfo_str, boost::is_any_of(":"), boost::token_compress_on);
547  return tokens;
548  });
549  src_oldinfo_tokens.sort(
550  [](const auto& lhs, const auto& rhs) { return lhs[0].compare(rhs[0]) < 0; });
551  all_dst_columns.sort(
552  [](auto a, auto b) { return a->columnName.compare(b->columnName) < 0; });
553  // transform inputs into the maps
554  std::transform(src_oldinfo_tokens.begin(),
555  src_oldinfo_tokens.end(),
556  all_dst_columns.begin(),
557  std::inserter(column_ids_map, column_ids_map.end()),
558  [&](const auto& tokens, const auto& cd) -> std::pair<int, int> {
559  VLOG(3) << boost::algorithm::join(tokens, ":") << " ==> "
560  << cd->columnName << ":" << cd->columnId;
561  dict_paths_map[tokens[2]] = getColumnDictDirectory(cd);
562  return {boost::lexical_cast<int>(tokens[1]), cd->columnId};
563  });
564  bool was_table_altered = false;
565  std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](auto& it) {
566  was_table_altered = was_table_altered || it.first != it.second;
567  });
568  VLOG(3) << "was_table_altered = " << was_table_altered;
569  // extract all data files to a temp dir. will swap with dst table dir after all set,
570  // otherwise will corrupt table in case any bad thing happens in the middle.
571  run("rm -rf " + temp_data_dir);
572  run("mkdir -p " + temp_data_dir);
573  run("tar " + compression + " -xvf \"" + archive_path + "\"", temp_data_dir);
574  // if table was ever altered after it was created, update column ids in chunk headers.
575  if (was_table_altered) {
576  const auto time_ms = measure<>::execution(
577  [&]() { adjustAlteredTableFiles(temp_data_dir, column_ids_map); });
578  VLOG(3) << "adjustAlteredTableFiles: " << time_ms << " ms";
579  }
580  // finally,,, swap table data/dict dirs!
581  const auto data_file_dirs = getTableDataDirectories(td);
582  const auto dict_file_dirs = getTableDictDirectories(td);
583  // move current target dirs, if exists, to backup dir
584  std::vector<std::string> both_file_dirs;
585  std::merge(data_file_dirs.begin(),
586  data_file_dirs.end(),
587  dict_file_dirs.begin(),
588  dict_file_dirs.end(),
589  std::back_inserter(both_file_dirs));
590  bool backup_completed = false;
591  // protect table schema and quickly swap table files
592  cat_read_lock read_lock(this);
593  try {
594  run("rm -rf " + temp_back_dir);
595  run("mkdir -p " + temp_back_dir);
596  for (const auto& dir : both_file_dirs) {
597  const auto dir_full_path = abs_path(global_file_mgr) + "/" + dir;
598  if (boost::filesystem::is_directory(dir_full_path)) {
599  run("mv " + dir_full_path + " " + temp_back_dir);
600  }
601  }
602  backup_completed = true;
603  // accord src data dirs to dst
604  renameTableDirectories(temp_data_dir, data_file_dirs, "table_");
605  // accord src dict dirs to dst
606  for (const auto& dit : dict_paths_map) {
607  if (!dit.first.empty() && !dit.second.empty()) {
608  const auto src_dict_path = temp_data_dir + "/" + dit.first;
609  const auto dst_dict_path = abs_path(global_file_mgr) + "/" + dit.second;
610  run("mv " + src_dict_path + " " + dst_dict_path);
611  }
612  }
613  // throw if sanity test forces a rollback
615  throw std::runtime_error("lol!");
616  }
617  } catch (...) {
618  // once backup is completed, whatever in abs_path(global_file_mgr) is the "src"
619  // dirs that are to be rolled back and discarded
620  if (backup_completed) {
621  run("rm -rf " + boost::algorithm::join(both_file_dirs, " "),
622  abs_path(global_file_mgr));
623  }
624  // complete rollback by recovering original "dst" table dirs from backup dir
625  boost::filesystem::path base_path(temp_back_dir);
626  boost::filesystem::directory_iterator end_it;
627  for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
628  run("mv " + fit->path().string() + " .", abs_path(global_file_mgr));
629  }
630  throw;
631  }
632  // set for reloading table from the restored/migrated files
633  const auto epoch = simple_file_cat(archive_path, table_epoch_filename, compression);
634  setTableEpoch(currentDB_.dbId, td->tableId, boost::lexical_cast<int>(epoch));
635 }
636 
637 // Migrate a table, which doesn't exist in current db, from a tar ball to the db.
638 // This actually creates the table and restores data/dict files from the tar ball.
639 void Catalog::restoreTable(const SessionInfo& session,
640  const std::string& table_name,
641  const std::string& archive_path,
642  const std::string& compression) {
643  // replace table name and drop foreign dict references
644  const auto schema_str = get_table_schema(archive_path, table_name, compression);
645  Parser::parseDDL<Parser::CreateTableStmt>("table schema", schema_str)->execute(session);
646  try {
647  restoreTable(session, getMetadataForTable(table_name), archive_path, compression);
648  } catch (...) {
649  Parser::parseDDL<Parser::DropTableStmt>("statement",
650  "DROP TABLE IF EXISTS " + table_name + ";")
651  ->execute(session);
652  throw;
653  }
654 }
655 
656 } // namespace Catalog_Namespace
static ReadLock getReadLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: TableLockMgr.h:95
void renameTableDirectories(const std::string &temp_data_dir, const std::vector< std::string > &target_paths, const std::string &name_prefix) const
std::string partitions
std::string run(const std::string &cmd, const std::string &chdir="")
Definition: DumpRestore.cpp:75
std::string getBasePath() const
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:335
std::string tableName
bool g_cluster
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
#define LOG(tag)
Definition: Logger.h:185
std::string simple_file_cat(const std::string &archive_path, const std::string &file_name, const std::string &compression)
void startThread(FuncType &&func, Args &&...args)
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2897
DictDescriptorMapById dictDescriptorMapByRef_
Definition: Catalog.h:322
FileMgr * getFileMgr(const int db_id, const int tb_id)
std::string join(T const &container, std::string const &delim)
#define MAPD_FILE_EXT
Definition: File.h:26
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2125
std::vector< std::string > getTableDataDirectories(const TableDescriptor *td) const
static constexpr char const * table_schema_filename
Definition: DumpRestore.cpp:65
std::string to_string(char const *&&v)
std::string abs_path(const File_Namespace::GlobalFileMgr *global_file_mgr)
Definition: DumpRestore.cpp:71
std::string getColumnDictDirectory(const ColumnDescriptor *cd) const
This file contains the class specification and related data structures for Catalog.
void dumpTable(const TableDescriptor *td, const std::string &path, const std::string &compression) const
This file contains the class specification and related data structures for SysCatalog.
CHECK(cgen_state)
Classes representing a parse tree.
void adjustAlteredTableFiles(const std::string &temp_data_dir, const std::unordered_map< int, int > &all_column_ids_map) const
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
specifies the content in-memory of a row in the column metadata table
void restoreTable(const SessionInfo &session, const TableDescriptor *td, const std::string &file_path, const std::string &compression)
read_lock< Catalog > cat_read_lock
Definition: Catalog.cpp:90
static constexpr char const * table_oldinfo_filename
Definition: DumpRestore.cpp:66
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1581
static constexpr char const * table_epoch_filename
Definition: DumpRestore.cpp:67
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2155
std::vector< std::string > getTableDictDirectories(const TableDescriptor *td) const
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > split(const std::string &str, const std::string &delim)
split apart a string into a vector of substrings
Definition: sqltypes.h:44
std::string get_table_schema(const std::string &archive_path, const std::string &table, const std::string &compression)
bool is_string() const
Definition: sqltypes.h:477
bool g_test_rollback_dump_restore
Definition: DumpRestore.cpp:50
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
specifies the content in-memory of a row in the table metadata table
std::string dumpSchema(const TableDescriptor *td) const
int cpu_threads()
Definition: thread_count.h:25
A selection of helper methods for File I/O.
#define VLOG(n)
Definition: Logger.h:280
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:437
bool is_string_array() const
Definition: sqltypes.h:478