20 #include <boost/filesystem.hpp>
21 #include <boost/process.hpp>
22 #include <boost/range/combine.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
25 #include <boost/version.hpp>
35 #include <system_error>
61 #if BOOST_VERSION < 107300
64 template <
typename T,
typename U>
65 struct tuple_size<boost::tuples::cons<T, U>>
66 : boost::tuples::length<boost::tuples::cons<T, U>> {};
67 template <
size_t I,
typename T,
typename U>
68 struct tuple_element<I, boost::tuples::cons<T, U>>
69 : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
// Yields the canonical form of the global file manager's base path as a string.
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the body of abs_path(global_file_mgr) — confirm.
79 return boost::filesystem::canonical(global_file_mgr->
getBasePath()).
string();
// Executes `cmd` through boost::process::system, optionally with `chdir` as the
// start directory, and collects the child's stdout/stderr into strings.
// A failing command is logged in detail. Two tar-specific failures (exit code 1)
// are only logged; any other failure throws std::runtime_error carrying an error
// code and message.
// NOTE(review): this listing omits several lines of the function (e.g. the
// declarations of rcode, ec and time_ms and the branch scaffolding) — confirm
// against the full file before relying on the exact control flow.
82 inline std::string
run(
const std::string& cmd,
const std::string& chdir =
"") {
83 VLOG(3) <<
"running cmd: " << cmd;
86 std::string output, errors;
88 using namespace boost::process;
89 ipstream stdout, stderr;
// Launch with an explicit start directory ...
91 rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
// ... or without one.
93 rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
// Copy everything available on the two pipe streams into output/errors.
95 std::ostringstream ss_output, ss_errors;
96 stdout >> ss_output.rdbuf();
97 stderr >> ss_errors.rdbuf();
98 output = ss_output.str();
99 errors = ss_errors.str();
// Failure reporting: command, exit code and error_code details.
102 LOG(
ERROR) <<
"failed cmd: " << cmd;
103 LOG(
ERROR) <<
"exit code: " << rcode;
104 LOG(
ERROR) <<
"error code: " << ec.value() <<
" - " << ec.message();
107 #if defined(__APPLE__)
// Tolerate the pipe-write failure only for commands that really contain
// "--fast-read". std::string::find returns npos (a non-zero value) when the
// option is absent, so the result must be compared against npos instead of
// being used directly as a boolean.
112 if (1 == rcode && cmd.find(
"--fast-read") != std::string::npos &&
113 (errors.find(
"cannot write decoded block") != std::string::npos ||
114 errors.find(
"Broken pipe") != std::string::npos)) {
116 LOG(
ERROR) <<
"tar error ignored on osx for --fast-read";
// Exit code 1 together with a "changed as we read" message is also tolerated.
121 if (1 == rcode && errors.find(
"changed as we read") != std::string::npos) {
122 LOG(
ERROR) <<
"tar error ignored under concurrent inserts";
// Build the code/message pair for the exception thrown below.
125 std::string error_message;
127 error_code = ec.value();
128 error_message = ec.message();
// Permission problems get a short message; otherwise stderr is passed through.
133 if (
to_lower(errors).find(
"permission denied") != std::string::npos) {
134 error_message =
"Insufficient file read/write permission.";
136 error_message = errors;
139 throw std::runtime_error(
140 "An error occurred while executing an internal command. Error code: " +
// Verbose trace of the command, elapsed time and captured stdout.
144 VLOG(3) <<
"finished cmd: " << cmd;
145 VLOG(3) <<
"time: " << time_ms <<
" ms";
146 VLOG(3) <<
"stdout: " << output;
// Fragment of a helper that works in a freshly created temporary directory:
// it reads `file_name` from that directory via `cat` and removes the directory
// afterwards.
// NOTE(review): the first line of the signature and the command that populates
// the temporary directory are not visible here; presumably this is
// simple_file_cat and the hidden command is a tar extraction — confirm.
152 const std::string& file_name,
153 const std::string& compression) {
// Platform-specific option text; "--fast-read" is used when __APPLE__ is defined.
// NOTE(review): the second definition presumably sits in the #else branch.
156 #if defined(__APPLE__)
157 constexpr
static auto opt_occurrence =
"--fast-read";
159 constexpr
static auto opt_occurrence =
"--occurrence=1";
// Unique scratch directory under the system temp path.
161 boost::filesystem::path temp_dir =
162 boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
163 boost::filesystem::create_directories(temp_dir);
165 opt_occurrence +
" " + file_name,
// Read the file back through the shared command runner.
167 const auto output =
run(
"cat " + (temp_dir / file_name).
string());
// Scratch directory is removed only after the read above has completed.
168 boost::filesystem::remove_all(temp_dir);
// Fragment that returns schema_str with every "@T" replaced by `table`.
// NOTE(review): the start of the signature and the initializer of schema_str
// are not visible here; presumably this is get_table_schema — confirm.
173 const std::string& table,
174 const std::string& compression) {
175 const auto schema_str =
// "@T" is matched as a regular expression and substituted with the table name.
177 std::regex regex(
"@T");
178 return std::regex_replace(schema_str, regex, table);
// Walks the pages of the file at `path` and, for pages with a positive header
// size, remaps the column id stored in the page header through `column_ids_map`,
// writing the header back in place when the id changes.
// Throws std::runtime_error when the file cannot be opened, sought, read or
// written.
// NOTE(review): the function header line, the file-name tokenizing, the fopen
// call and several branch bodies are not visible in this listing; presumably
// this is rewrite_column_ids_in_page_headers — confirm against the full file.
184 const boost::filesystem::path& path,
185 const std::unordered_map<int, int>& column_ids_map,
186 const int32_t table_epoch) {
187 const std::string file_path = path.string();
188 const std::string file_name = path.filename().string();
189 std::vector<std::string> tokens;
// Files need more than two name tokens, the third being the data-file extension
// or "mapd", to be processed.
193 if (tokens.size() <= 2 || !(
DATA_FILE_EXT ==
"." + tokens[2] || tokens[2] ==
"mapd")) {
// The second file-name token is parsed as the page size.
198 const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
200 std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
203 throw std::runtime_error(
"Failed to open " + file_path +
204 " for update: " + std::strerror(errno));
// Visit every whole page in the file.
210 for (
size_t page = 0; page < file_size / page_size; ++page) {
// Eight 32-bit words are read from the start of each page.
211 int32_t header_info[8];
212 if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
// " of " separates the page number from the file path in the message.
213 throw std::runtime_error(
"Failed to seek to page# " +
std::to_string(page) +
" of " +
214 file_path +
" for read: " + std::strerror(errno));
216 if (1 != fread(header_info,
sizeof header_info, 1, fp.get())) {
217 throw std::runtime_error(
"Failed to read " + file_path +
": " +
218 std::strerror(errno));
// Only pages whose first header word is positive are examined.
220 if (
const auto header_size = header_info[0]; header_size > 0) {
// References into the header so that updates land in the buffer written back.
223 auto& contingent = header_info[1];
226 auto& epoch = header_info[2];
227 auto& col_id = header_info[3];
229 table_epoch, epoch, contingent)) {
// Every column id met here must have a mapping.
232 auto column_map_it = column_ids_map.find(col_id);
233 CHECK(column_map_it != column_ids_map.end()) <<
"could not find " << col_id;
// Rewrite the header only when the mapped id differs from the stored one.
236 if (
const auto dest_col_id = column_map_it->second; col_id != dest_col_id) {
237 col_id = dest_col_id;
238 if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
239 throw std::runtime_error(
"Failed to seek to page# " +
std::to_string(page) +
" of " +
240 file_path +
" for write: " + std::strerror(errno));
242 if (1 != fwrite(header_info,
sizeof header_info, 1, fp.get())) {
243 throw std::runtime_error(
"Failed to write " + file_path +
": " +
244 std::strerror(errno));
// Fragment that recursively walks `temp_data_dir` and selects entries that are
// regular files and not symlinks, then calls thread_controller.finish().
// NOTE(review): the start of the signature and the per-file work are not
// visible here; presumably this is adjust_altered_table_files and each selected
// file is handed to a worker — confirm.
255 const std::string& temp_data_dir,
256 const std::unordered_map<int, int>& column_ids_map) {
257 boost::filesystem::path base_path(temp_data_dir);
258 boost::filesystem::recursive_directory_iterator end_it;
260 for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
// Symlinks are skipped even when they point at regular files.
262 if (!boost::filesystem::is_symlink(fit->path()) &&
263 boost::filesystem::is_regular_file(fit->status())) {
269 thread_controller.
finish();
// Fragment that collects every symlink found directly in `table_data_dir` and
// then removes each of them.
// NOTE(review): the function header is not visible here; presumably this is
// delete_old_symlinks — confirm.
273 std::vector<boost::filesystem::path> symlinks;
274 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
276 if (boost::filesystem::is_symlink(it->path())) {
277 symlinks.emplace_back(it->path());
// Removal happens in a second pass, after the directory scan above.
280 for (
const auto& symlink : symlinks) {
281 boost::filesystem::remove_all(symlink);
// Fragment that builds a map from an "old" path to the canonical path of each
// entry in `table_data_dir`, keeping only old paths that do not exist yet, and
// then creates a symlink at each old path pointing to the new file name.
// NOTE(review): the function header and the way old_path is derived from path
// are not visible here; presumably this is add_data_file_symlinks — confirm.
286 std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
287 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
289 const auto path = boost::filesystem::canonical(it->path());
291 auto old_path = path;
294 if (!boost::filesystem::exists(old_path)) {
295 old_to_new_paths[old_path] = path;
// The link target is the bare file name, i.e. relative to the link's directory.
299 for (
const auto& [old_path, new_path] : old_to_new_paths) {
300 boost::filesystem::create_symlink(new_path.filename(), old_path);
// Fragment that scans the direct children of `temp_data_dir` and renames every
// entry that is not a regular file and whose name starts with `name_prefix`
// (compared with boost::istarts_with) to
// abs_path(global_file_mgr) + "/" + the next element of `target_paths`.
// Throws std::runtime_error when std::rename fails.
// NOTE(review): the start of the signature is not visible here; presumably this
// is rename_table_directories — confirm.
305 const std::string& temp_data_dir,
306 const std::vector<std::string>& target_paths,
307 const std::string& name_prefix) {
308 boost::filesystem::path base_path(temp_data_dir);
309 boost::filesystem::directory_iterator end_it;
// Index of the next unused entry in target_paths.
310 int target_path_index = 0;
311 for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
312 if (!boost::filesystem::is_regular_file(fit->status())) {
313 const std::string file_path = fit->path().string();
314 const std::string file_name = fit->path().filename().string();
315 if (boost::istarts_with(file_name, name_prefix)) {
// NOTE(review): target_paths is indexed without a bounds check — presumably the
// caller guarantees enough entries; verify.
316 const std::string target_path =
317 abs_path(global_file_mgr) +
"/" + target_paths[target_path_index++];
318 if (std::rename(file_path.c_str(), target_path.c_str())) {
319 throw std::runtime_error(
"Failed to rename file " + file_path +
" to " +
320 target_path +
": " + std::strerror(errno));
// Fragment of the table dump routine. Visible behavior: rejects unsupported
// situations with std::runtime_error, writes helper files into a work directory
// named by `uuid` below the file manager's base path, gathers data and
// dictionary directories into file_paths, and invokes tar through run().
// NOTE(review): the start of the signature, the conditions guarding each throw
// and most of the tar command line are not visible here; presumably this is
// dumpTable — confirm against the full file.
336 const std::string& archive_path,
337 const std::string& compression) {
339 throw std::runtime_error(
"Dumping a system table is not supported.");
344 throw std::runtime_error(
"DUMP/RESTORE is not supported yet on distributed setup.");
// An existing archive is never overwritten.
346 if (boost::filesystem::exists(archive_path)) {
347 throw std::runtime_error(
"Archive " + archive_path +
" already exists.");
350 throw std::runtime_error(
"Dumping view or temporary table is not supported.");
// Paths (relative to the file manager directory) that go into the archive.
357 std::vector<std::string> file_paths;
// Writes file_data to file_name and records uuid/<basename> in file_paths;
// file_type only appears in error messages.
358 auto file_writer = [&file_paths, uuid](
const std::string& file_name,
359 const std::string& file_type,
360 const std::string& file_data) {
361 std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
364 throw std::runtime_error(
"Failed to create " + file_type +
" file '" + file_name +
365 "': " + std::strerror(errno));
// A short write is treated as failure.
367 if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
368 throw std::runtime_error(
"Failed to write " + file_type +
" file '" + file_name +
369 "': " + std::strerror(errno));
371 file_paths.push_back(uuid / std::filesystem::path(file_name).
filename());
374 const auto file_mgr_dir = std::filesystem::path(
abs_path(global_file_mgr));
375 const auto uuid_dir = file_mgr_dir / uuid;
// create_directory returning false is reported as an error.
377 if (!std::filesystem::create_directory(uuid_dir)) {
378 throw std::runtime_error(
"Failed to create work directory '" + uuid_dir.string() +
379 "' while dumping table.");
// Removes the work directory if it exists.
// NOTE(review): presumably the body of a scope guard — confirm.
383 if (std::filesystem::exists(uuid_dir)) {
384 std::filesystem::remove_all(uuid_dir);
// One "columnName:columnId:..." string per column descriptor.
395 std::vector<std::string> column_oldinfo;
398 std::back_inserter(column_oldinfo),
399 [&](
const auto cd) -> std::string {
400 return cd->columnName +
":" +
std::to_string(cd->columnId) +
":" +
410 file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
413 file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
// The --transform expression strips the "<uuid><separator>" prefix from names
// stored in the archive.
417 run(
"tar " + compression +
" --transform=s|" + uuid +
418 std::filesystem::path::preferred_separator +
"|| -cvf " +
// Fragment of the table restore routine working from a local archive. Visible
// behavior: validates preconditions, prepares a uuid-named work directory with
// "_data" and "_back" subdirectories, checks that the archived schema is
// compatible with the destination table, builds a source-to-destination column
// id map, moves current table directories to the backup directory, moves
// dictionary directories into place, and on the rollback path moves backed-up
// entries back.
// NOTE(review): the start of the signature, many guarding conditions and the
// try/catch scaffolding are not visible in this listing; presumably this is the
// restoreTable overload taking a TableDescriptor — confirm against the full file.
426 const std::string& archive_path,
427 const std::string& compression) {
431 throw std::runtime_error(
"DUMP/RESTORE is not supported yet on distributed setup.");
433 if (!boost::filesystem::exists(archive_path)) {
434 throw std::runtime_error(
"Archive " + archive_path +
" does not exist.");
437 throw std::runtime_error(
"Restoring view or temporary table is not supported.");
441 const auto table_read_lock =
444 const auto insert_data_lock =
// Work directory below the file manager's base path.
453 const auto uuid_dir = std::filesystem::path(
abs_path(global_file_mgr)) / uuid;
455 if (!std::filesystem::create_directory(uuid_dir)) {
456 throw std::runtime_error(
"Failed to create work directory '" + uuid_dir.string() +
457 "' while restoring table.");
// Removes the work directory if it exists.
// NOTE(review): presumably the body of a scope guard — confirm.
461 if (std::filesystem::exists(uuid_dir)) {
462 std::filesystem::remove_all(uuid_dir);
467 constexpr
static const auto temp_data_basename =
"_data";
468 constexpr
static const auto temp_back_basename =
"_back";
469 const auto temp_data_dir = uuid_dir / temp_data_basename;
470 const auto temp_back_dir = uuid_dir / temp_back_basename;
476 CHECK(create_table_stmt);
// Dry-run the archived CREATE TABLE statement to obtain the source table and
// column descriptors without executing it.
480 std::list<ColumnDescriptor> src_columns;
481 std::vector<Parser::SharedDictionaryDef> shared_dict_defs;
482 create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
// NOTE(review): "VACCUM" looks like a typo for "VACUUM"; left untouched here
// because callers or tests may match on the message — confirm before changing.
487 throw std::runtime_error(
"Incompatible table VACCUM option");
492 throw std::runtime_error(
"Unmatched number of table shards");
495 const auto dst_columns =
497 if (dst_columns.size() != src_columns.size()) {
498 throw std::runtime_error(
"Unmatched number of table columns");
// Pairwise comparison of type name and compression name for each column.
500 for (
const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
501 if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
502 src_cd.columnType.get_compression_name() !=
503 dst_cd->columnType.get_compression_name()) {
504 throw std::runtime_error(
"Incompatible types on column " + src_cd.columnName);
508 const auto all_src_oldinfo_str =
510 std::vector<std::string> src_oldinfo_strs;
513 boost::is_any_of(
" "),
514 boost::token_compress_on);
515 auto all_dst_columns =
// NOTE(review): message grammar ("a unmatched") — presumably meant
// "an unmatched"; left untouched for the same reason as above.
517 if (src_oldinfo_strs.size() != all_dst_columns.size()) {
518 throw std::runtime_error(
"Source table has a unmatched number of columns: " +
528 std::unordered_map<int, int> column_ids_map;
529 std::unordered_map<std::string, std::string> dict_paths_map;
// Each old-info string is split on ':' into its tokens.
531 std::list<std::vector<std::string>> src_oldinfo_tokens;
533 src_oldinfo_strs.begin(),
534 src_oldinfo_strs.end(),
535 std::back_inserter(src_oldinfo_tokens),
536 [](
const auto& src_oldinfo_str) ->
auto {
537 std::vector<std::string> tokens;
539 tokens, src_oldinfo_str, boost::is_any_of(
":"), boost::token_compress_on);
// Both lists are ordered by column name so that they can be walked in step.
542 src_oldinfo_tokens.sort(
543 [](
const auto& lhs,
const auto& rhs) {
return lhs[0].compare(rhs[0]) < 0; });
544 all_dst_columns.sort(
545 [](
auto a,
auto b) {
return a->columnName.compare(b->columnName) < 0; });
548 src_oldinfo_tokens.end(),
549 all_dst_columns.begin(),
550 std::inserter(column_ids_map, column_ids_map.end()),
551 [&](
const auto& tokens,
const auto& cd) -> std::pair<int, int> {
553 << cd->columnName <<
":" << cd->columnId;
// Map entry: the integer parsed from tokens[1] -> destination column id.
555 return {boost::lexical_cast<
int>(tokens[1]), cd->columnId};
// The table counts as altered when any mapped id differs from its source id.
557 bool was_table_altered =
false;
558 std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](
auto& it) {
559 was_table_altered = was_table_altered || it.first != it.second;
561 VLOG(3) <<
"was_table_altered = " << was_table_altered;
// Start from an empty temp data directory.
564 run(
"rm -rf " + temp_data_dir.string());
565 run(
"mkdir -p " + temp_data_dir.string());
569 if (was_table_altered) {
570 const auto epoch = boost::lexical_cast<int32_t>(
574 VLOG(3) <<
"adjust_altered_table_files: " << time_ms <<
" ms";
// NOTE(review): std::merge expects both inputs to be sorted — presumably the
// directory lists are; verify against their producers.
580 std::vector<std::string> both_file_dirs;
581 std::merge(data_file_dirs.begin(),
582 data_file_dirs.end(),
583 dict_file_dirs.begin(),
584 dict_file_dirs.end(),
585 std::back_inserter(both_file_dirs));
586 bool backup_completed =
false;
// Move the table's current directories into a fresh backup directory.
588 run(
"rm -rf " + temp_back_dir.string());
589 run(
"mkdir -p " + temp_back_dir.string());
590 for (
const auto& dir : both_file_dirs) {
591 const auto dir_full_path =
abs_path(global_file_mgr) +
"/" + dir;
592 if (boost::filesystem::is_directory(dir_full_path)) {
593 run(
"mv " + dir_full_path +
" " + temp_back_dir.string());
596 backup_completed =
true;
// Move each dictionary directory from the temp data directory to its
// destination; pairs with an empty side are skipped.
600 for (
const auto& dit : dict_paths_map) {
601 if (!dit.first.empty() && !dit.second.empty()) {
602 const auto src_dict_path = temp_data_dir.string() +
"/" + dit.first;
603 const auto dst_dict_path =
abs_path(global_file_mgr) +
"/" + dit.second;
604 run(
"mv " + src_dict_path +
" " + dst_dict_path);
// NOTE(review): presumably a deliberate failure injected for rollback testing;
// the guarding condition is not visible here — confirm.
609 throw std::runtime_error(
"lol!");
// Rollback: every entry in the backup directory is moved back into the file
// manager's base directory (used as the command's working directory).
614 if (backup_completed) {
619 boost::filesystem::path base_path(temp_back_dir);
620 boost::filesystem::directory_iterator end_it;
621 for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
622 run(
"mv " + fit->path().string() +
" .",
abs_path(global_file_mgr));
// Builds a restore directory path below the canonical form of g_base_path and
// returns it as a std::string.
// NOTE(review): the path components appended after g_base_path are not visible
// in this listing.
634 std::string get_restore_dir_path() {
636 auto restore_dir_path = std::filesystem::canonical(
g_base_path) /
638 return restore_dir_path;
// Resolves `s3_archive_path` to exactly one object and lands it, returning the
// string produced by s3_archive.land().
// Throws std::runtime_error when the URI references more than one object.
// NOTE(review): the s3_options parameter line and most of the s3_archive
// construction are not visible in this listing.
641 std::string download_s3_file(
const std::string& s3_archive_path,
643 const std::string& restore_dir_path) {
651 std::optional<std::string>{},
652 std::optional<std::string>{},
653 std::optional<std::string>{},
655 s3_archive.init_for_read();
656 const auto& object_key = s3_archive.get_objkeys();
657 if (object_key.size() > 1) {
658 throw std::runtime_error(
659 "S3 URI references multiple files. Only one file can be restored at a time.");
// An empty key list is not reported as an exception; it trips this CHECK.
661 CHECK_EQ(object_key.size(), size_t(1));
662 std::exception_ptr eptr;
663 return s3_archive.land(object_key[0], eptr,
false,
false,
false);
// Fragment of the restore entry point that accepts a table name. Visible
// behavior: obtains a local archive path (downloading from S3 into the restore
// directory when needed), reads the table schema from the archive, executes the
// resulting CREATE TABLE statement, and contains a "DROP TABLE IF EXISTS"
// statement for the table.
// NOTE(review): the start of the signature, the S3 condition around the
// download and the try/catch scaffolding are not visible here; presumably the
// DROP statement runs on the failure path — confirm against the full file.
671 const std::string& table_name,
672 const std::string& archive_path,
673 const std::string& compression,
675 auto local_archive_path = archive_path;
677 const auto restore_dir_path = get_restore_dir_path();
// Cleanup guard: removes the restore directory when the archive was an S3 URI
// and the directory exists.
678 ScopeGuard archive_cleanup_guard = [&archive_path, &restore_dir_path] {
679 if (
shared::is_s3_uri(archive_path) && std::filesystem::exists(restore_dir_path)) {
680 std::filesystem::remove_all(restore_dir_path);
684 local_archive_path = download_s3_file(archive_path, s3_options, restore_dir_path);
689 const auto schema_str =
get_table_schema(local_archive_path, table_name, compression);
692 CHECK(create_table_stmt);
693 create_table_stmt->execute(session,
false );
699 const auto schema_str =
"DROP TABLE IF EXISTS " + table_name +
";";
700 std::unique_ptr<Parser::Stmt> stmt =
703 CHECK(drop_table_stmt);
704 drop_table_stmt->execute(session,
false );
std::string s3_secret_key
std::string get_table_schema(const std::string &archive_path, const std::string &table, const std::string &compression)
void delete_old_symlinks(const std::string &table_data_dir)
std::string getBasePath() const
std::string getColumnDictDirectory(const ColumnDescriptor *cd, bool file_name_only=true) const
static constexpr char const * table_schema_filename
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
static TimeT::rep execution(F func, Args &&...args)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
void rewrite_column_ids_in_page_headers(const boost::filesystem::path &path, const std::unordered_map< int, int > &column_ids_map, const int32_t table_epoch)
std::string abs_path(const File_Namespace::GlobalFileMgr *global_file_mgr)
bool is_s3_uri(const std::string &file_path)
Data_Namespace::DataMgr & getDataMgr() const
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
void startThread(FuncType &&func, Args &&...args)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
virtual void checkThreadsStatus()
static constexpr char const * table_oldinfo_filename
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
std::vector< std::string > getTableDataDirectories(const TableDescriptor *td) const
const std::string kDefaultImportDirName
Classes representing a parse tree.
const DBMetadata & getCurrentDB() const
void rename_table_directories(const File_Namespace::GlobalFileMgr *global_file_mgr, const std::string &temp_data_dir, const std::vector< std::string > &target_paths, const std::string &name_prefix)
::FILE * fopen(const char *filename, const char *mode)
OUTPUT transform(INPUT const &input, FUNC const &func)
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
std::vector< std::string > getTableDictDirectories(const TableDescriptor *td) const
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Data_Namespace::MemoryLevel persistenceLevel
void adjust_altered_table_files(const int32_t table_epoch, const std::string &temp_data_dir, const std::unordered_map< int, int > &column_ids_map)
std::string s3_session_token
static constexpr char const * table_epoch_filename
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
std::string filename(char const *path)
void add_data_file_symlinks(const std::string &table_data_dir)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::string s3_access_key
std::string simple_file_cat(const std::string &archive_path, const std::string &file_name, const std::string &compression)
std::string dumpSchema(const TableDescriptor *td) const
void restoreTable(const Catalog_Namespace::SessionInfo &session, const std::string &table_name, const std::string &archive_path, const std::string &compression, const TableArchiverS3Options &s3_options)
size_t file_size(const int fd)
void dumpTable(const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
constexpr auto kLegacyDataFileExtension
std::unique_ptr< Parser::Stmt > create_stmt_for_query(const std::string &queryStr, const Catalog_Namespace::SessionInfo &session_info)
A selection of helper methods for File I/O.
bool g_test_rollback_dump_restore
Catalog_Namespace::Catalog * cat_