20 #include <boost/filesystem.hpp>
21 #include <boost/process.hpp>
22 #include <boost/range/combine.hpp>
23 #include <boost/version.hpp>
33 #include <system_error>
54 #if BOOST_VERSION < 107300
// Specialization of tuple_size for boost::tuples::cons<T, U>; it inherits from
// boost::tuples::length of the same cons type.
// NOTE(review): compiled only under the `#if BOOST_VERSION < 107300` guard that
// precedes it; presumably needed so structured bindings over boost::combine
// results work with older Boost releases - confirm.
57 template <
typename T,
typename U>
58 struct tuple_size<boost::tuples::cons<T, U>>
59 : boost::tuples::length<boost::tuples::cons<T, U>> {};
// Specialization of tuple_element for index I of boost::tuples::cons<T, U>; it
// inherits from boost::tuples::element instantiated with the same index and
// cons type.
60 template <
size_t I,
typename T,
typename U>
61 struct tuple_element<I, boost::tuples::cons<T, U>>
62 : boost::tuples::element<I, boost::tuples::cons<T, U>> {};
// Returns the canonical form of the global file manager's base path as a string.
72 return boost::filesystem::canonical(global_file_mgr->
getBasePath()).
string();
// Runs `cmd` through boost::process::system, capturing the child's stdout and
// stderr into strings. `chdir`, which defaults to "", is passed as the process
// start directory in one of the two invocations below.
// The failure handling visible here logs the exit code and error code, tolerates
// two specific tar messages, and otherwise throws std::runtime_error.
// NOTE(review): several lines of this function are absent from this listing (the
// embedded numbers skip), so the enclosing conditionals and the return statement
// are not shown.
75 inline std::string
run(
const std::string& cmd,
const std::string& chdir =
"") {
76 VLOG(3) <<
"running cmd: " << cmd;
79 std::string output, errors;
81 using namespace boost::process;
// Pipe streams that receive the child's standard output and standard error.
82 ipstream stdout, stderr;
// Two otherwise identical invocations; only the first sets start_dir.
// NOTE(review): presumably selected by whether `chdir` is empty - the branch
// itself is not visible here.
84 rcode = system(cmd, std_out > stdout, std_err > stderr, ec, start_dir = chdir);
86 rcode = system(cmd, std_out > stdout, std_err > stderr, ec);
// Drain both pipes into string buffers.
88 std::ostringstream ss_output, ss_errors;
89 stdout >> ss_output.rdbuf();
90 stderr >> ss_errors.rdbuf();
91 output = ss_output.str();
92 errors = ss_errors.str();
96 LOG(
ERROR) <<
"exit code: " << rcode;
97 LOG(
ERROR) <<
"error code: " << ec.value() <<
" - " << ec.message();
100 #if defined(__APPLE__)
// NOTE(review): cmd.find("--fast-read") is used directly as a boolean. find()
// yields std::string::npos (non-zero) when the text is absent and 0 when it is
// at index 0, so this term does not test for presence the way the neighbouring
// `!= std::string::npos` comparisons do.
105 if (1 == rcode && cmd.find(
"--fast-read") &&
106 (errors.find(
"cannot write decoded block") != std::string::npos ||
107 errors.find(
"Broken pipe") != std::string::npos)) {
109 LOG(
ERROR) <<
"tar error ignored on osx for --fast-read";
// Exit code 1 together with a "changed as we read" message on stderr is logged
// and tolerated.
114 if (1 == rcode && errors.find(
"changed as we read") != std::string::npos) {
115 LOG(
ERROR) <<
"tar error ignored under concurrent inserts";
118 std::string error_message;
120 error_code = ec.value();
121 error_message = ec.message();
// A case-insensitive "permission denied" in stderr is replaced by a shorter
// message; otherwise the raw stderr text is used as the message.
126 if (
to_lower(errors).find(
"permission denied") != std::string::npos) {
127 error_message =
"Insufficient file read/write permission.";
129 error_message = errors;
132 throw std::runtime_error(
133 "An error occurred while executing an internal command. Error code: " +
137 VLOG(3) <<
"finished cmd: " << cmd;
138 VLOG(3) <<
"time: " << time_ms <<
" ms";
139 VLOG(3) <<
"stdout: " << output;
// Fragment of a helper that creates a unique temporary directory, reads
// `file_name` from inside it with `cat`, and removes the directory afterwards.
// NOTE(review): the start of the signature and the command that populates the
// temporary directory are not visible in this listing.
145 const std::string& file_name,
146 const std::string& compression) {
149 #if defined(__APPLE__)
// Platform-dependent option text that is concatenated with `file_name` below.
150 constexpr
static auto opt_occurrence =
"--fast-read";
152 constexpr
static auto opt_occurrence =
"--occurrence=1";
// Scratch directory: the system temp directory plus a unique path component.
154 boost::filesystem::path temp_dir =
155 boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
156 boost::filesystem::create_directories(temp_dir);
158 opt_occurrence +
" " + file_name,
// NOTE(review): `file_name` is concatenated into the command line unquoted.
// NOTE(review): if run() throws, the remove_all() below is skipped; no guard is
// visible in this listing.
160 const auto output =
run(
"cat " + (temp_dir / file_name).
string());
161 boost::filesystem::remove_all(temp_dir);
// Fragment: returns `schema_str` with matches of the pattern "@T" replaced by
// `table`.
// NOTE(review): how `schema_str` is obtained is not visible in this listing.
166 const std::string& table,
167 const std::string& compression) {
168 const auto schema_str =
170 std::regex regex(
"@T");
171 return std::regex_replace(schema_str, regex, table);
// Fragment: walks a data file page by page, reads the leading eight int32 words
// of each page, and where the column id stored in word 3 maps to a different id
// in `column_ids_map`, writes the updated words back at the same page offset.
// Throws std::runtime_error on open, seek, read or write failures.
// NOTE(review): several lines are absent from this listing, including the
// tokenizing of `file_name` and the computation of `file_size`.
177 const boost::filesystem::path& path,
178 const std::unordered_map<int, int>& column_ids_map,
179 const int32_t table_epoch) {
180 const std::string file_path = path.string();
181 const std::string file_name = path.filename().string();
182 std::vector<std::string> tokens;
// Condition is true when there are fewer than three tokens, or the third token
// neither equals DATA_FILE_EXT without its leading "." nor equals "mapd".
// NOTE(review): presumably such files are skipped - the branch body is not shown.
186 if (tokens.size() <= 2 || !(
DATA_FILE_EXT ==
"." + tokens[2] || tokens[2] ==
"mapd")) {
// The second token is parsed as the page size; it is used as the byte stride
// for the seeks below.
// NOTE(review): a parsed value of 0 would divide by zero in the loop bound; no
// guard is visible in this listing.
191 const auto page_size = boost::lexical_cast<int64_t>(tokens[1]);
193 std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
196 throw std::runtime_error(
"Failed to open " + file_path +
197 " for update: " + std::strerror(errno));
203 for (
size_t page = 0; page < file_size / page_size; ++page) {
// Buffer for the eight int32 words read from the start of the page.
204 int32_t header_info[8];
// NOTE(review): the message below joins the page number and file_path with no
// separator between them.
205 if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
206 throw std::runtime_error(
"Failed to seek to page# " +
std::to_string(page) +
207 file_path +
" for read: " + std::strerror(errno));
209 if (1 != fread(header_info,
sizeof header_info, 1, fp.get())) {
210 throw std::runtime_error(
"Failed to read " + file_path +
": " +
211 std::strerror(errno));
// Word 0 is treated as the header size; only positive values enter this branch.
213 if (
const auto header_size = header_info[0]; header_size > 0) {
// References into the buffer, so assigning to col_id below changes the bytes
// that are later written back.
216 auto& contingent = header_info[1];
219 auto& epoch = header_info[2];
220 auto& col_id = header_info[3];
222 table_epoch, epoch, contingent)) {
// Every column id encountered is required to be present in the map.
225 auto column_map_it = column_ids_map.find(col_id);
226 CHECK(column_map_it != column_ids_map.end()) <<
"could not find " << col_id;
// Rewrite the page's leading words only when the mapped id differs.
229 if (
const auto dest_col_id = column_map_it->second; col_id != dest_col_id) {
230 col_id = dest_col_id;
// NOTE(review): same missing separator between page number and file_path.
231 if (0 != std::fseek(fp.get(), page * page_size, SEEK_SET)) {
232 throw std::runtime_error(
"Failed to seek to page# " +
std::to_string(page) +
233 file_path +
" for write: " + std::strerror(errno));
235 if (1 != fwrite(header_info,
sizeof header_info, 1, fp.get())) {
236 throw std::runtime_error(
"Failed to write " + file_path +
": " +
237 std::strerror(errno));
// Fragment: recursively iterates `temp_data_dir` and selects entries that are
// regular files and not symlinks, then calls finish() on `thread_controller`.
// NOTE(review): the work done for each selected file is not visible here;
// presumably each one is handed to rewrite_column_ids_in_page_headers on a
// worker thread - confirm.
248 const std::string& temp_data_dir,
249 const std::unordered_map<int, int>& column_ids_map) {
250 boost::filesystem::path base_path(temp_data_dir);
251 boost::filesystem::recursive_directory_iterator end_it;
253 for (boost::filesystem::recursive_directory_iterator fit(base_path); fit != end_it;
255 if (!boost::filesystem::is_symlink(fit->path()) &&
256 boost::filesystem::is_regular_file(fit->status())) {
262 thread_controller.
finish();
// Fragment: collects every symlink found directly inside `table_data_dir`, then
// removes them in a second pass, so entries are not deleted while the directory
// iterator is still in use.
266 std::vector<boost::filesystem::path> symlinks;
267 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
269 if (boost::filesystem::is_symlink(it->path())) {
270 symlinks.emplace_back(it->path());
273 for (
const auto& symlink : symlinks) {
274 boost::filesystem::remove_all(symlink);
// Fragment: builds a map from an "old" path to the canonical path of each entry
// in `table_data_dir`, recording only old paths that do not exist yet, and then
// creates a symlink at every old path whose target is the new file's name.
// NOTE(review): how `old_path` is derived from `path` is not visible in this
// listing.
279 std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
280 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
282 const auto path = boost::filesystem::canonical(it->path());
284 auto old_path = path;
287 if (!boost::filesystem::exists(old_path)) {
288 old_to_new_paths[old_path] = path;
// The link target is only the file name, i.e. relative to the link's own
// directory.
292 for (
const auto& [old_path, new_path] : old_to_new_paths) {
293 boost::filesystem::create_symlink(new_path.filename(), old_path);
// Fragment: for each entry directly under `temp_data_dir` that is not a regular
// file and whose name starts with `name_prefix` (compared case-insensitively),
// renames it to the next element of `target_paths` under the global file
// manager's absolute base path. Throws std::runtime_error if a rename fails.
298 const std::string& temp_data_dir,
299 const std::vector<std::string>& target_paths,
300 const std::string& name_prefix) {
301 boost::filesystem::path base_path(temp_data_dir);
302 boost::filesystem::directory_iterator end_it;
303 int target_path_index = 0;
304 for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
305 if (!boost::filesystem::is_regular_file(fit->status())) {
306 const std::string file_path = fit->path().string();
307 const std::string file_name = fit->path().filename().string();
308 if (boost::istarts_with(file_name, name_prefix)) {
// NOTE(review): target_paths is indexed without a bounds check; more matching
// entries than target paths would read past the end of the vector.
309 const std::string target_path =
310 abs_path(global_file_mgr) +
"/" + target_paths[target_path_index++];
311 if (std::rename(file_path.c_str(), target_path.c_str())) {
312 throw std::runtime_error(
"Failed to rename file " + file_path +
" to " +
313 target_path +
": " + std::strerror(errno));
// Fragment of the table dump routine. The visible part rejects unsupported
// cases and an already existing archive with std::runtime_error, defines a
// helper lambda that writes metadata files under the global file manager's base
// path, and collects the paths to be archived in `file_paths`.
// NOTE(review): the conditions guarding most of the throws and the archiving
// command itself are not visible in this listing.
329 const std::string& archive_path,
330 const std::string& compression) {
332 throw std::runtime_error(
"Dumping a system table is not supported.");
337 throw std::runtime_error(
"DUMP/RESTORE is not supported yet on distributed setup.");
// Refuse to overwrite an existing archive.
339 if (boost::filesystem::exists(archive_path)) {
340 throw std::runtime_error(
"Archive " + archive_path +
" already exists.");
343 throw std::runtime_error(
"Dumping view or temporary table is not supported.");
347 std::vector<std::string> file_paths;
// Writes `file_data` to `file_name` under the base path and records `file_name`
// in file_paths. `file_type` is used only in the error messages.
348 auto file_writer = [&file_paths, global_file_mgr](
const std::string& file_name,
349 const std::string& file_type,
350 const std::string& file_data) {
351 const auto file_path =
abs_path(global_file_mgr) +
"/" + file_name;
352 std::unique_ptr<FILE, decltype(simple_file_closer)> fp(
355 throw std::runtime_error(
"Failed to create " + file_type +
" file '" + file_path +
356 "': " + std::strerror(errno));
// A short write (fewer bytes written than requested) is treated as failure.
358 if (std::fwrite(file_data.data(), 1, file_data.size(), fp.get()) < file_data.size()) {
359 throw std::runtime_error(
"Failed to write " + file_type +
" file '" + file_path +
360 "': " + std::strerror(errno));
362 file_paths.push_back(file_name);
// One entry per column, beginning "<columnName>:<columnId>:"; the rest of the
// expression is not visible in this listing.
372 std::vector<std::string> column_oldinfo;
375 std::back_inserter(column_oldinfo),
376 [&](
const auto cd) -> std::string {
377 return cd->columnName +
":" +
std::to_string(cd->columnId) +
":" +
// The data and dictionary directories are appended after the metadata files.
387 file_paths.insert(file_paths.end(), data_file_dirs.begin(), data_file_dirs.end());
390 file_paths.insert(file_paths.end(), dict_file_dirs.begin(), dict_file_dirs.end());
// Fragment of the table restore routine. The visible part validates the archive
// and the compatibility of source and destination columns, builds a mapping from
// source column ids to destination column ids, stages files under a "_data"
// directory, moves the existing table directories into a "_back" directory, and
// moves them back when a later step fails after the backup completed.
// NOTE(review): many lines are absent from this listing (the embedded numbers
// skip), including the archive extraction and the try/catch structure.
402 const std::string& archive_path,
403 const std::string& compression) {
407 throw std::runtime_error(
"DUMP/RESTORE is not supported yet on distributed setup.");
409 if (!boost::filesystem::exists(archive_path)) {
410 throw std::runtime_error(
"Archive " + archive_path +
" does not exist.");
413 throw std::runtime_error(
"Restoring view or temporary table is not supported.");
417 const auto table_read_lock =
420 const auto insert_data_lock =
// Scratch directories under the global file manager's base path: "_data" for
// staged files and "_back" for the backup of the current table directories.
426 constexpr
static const auto temp_data_basename =
"_data";
427 constexpr
static const auto temp_back_basename =
"_back";
428 const auto temp_data_dir =
abs_path(global_file_mgr) +
"/" + temp_data_basename;
429 const auto temp_back_dir =
abs_path(global_file_mgr) +
"/" + temp_back_basename;
// Removes both scratch directories.
// NOTE(review): the paths are concatenated into the shell command unquoted.
431 auto tmp_files_cleaner = [&](
void*) {
432 run(
"rm -rf " + temp_data_dir +
" " + temp_back_dir);
// The lambda is installed as the unique_ptr's deleter, so it is invoked when
// `tfc` is destroyed.
437 std::unique_ptr<decltype(tmp_files_cleaner), decltype(tmp_files_cleaner)> tfc(
438 &tmp_files_cleaner, tmp_files_cleaner);
444 CHECK(create_table_stmt);
// Dry-run of the archived CREATE TABLE statement; it receives src_td,
// src_columns and shared_dict_defs as arguments.
448 std::list<ColumnDescriptor> src_columns;
449 std::vector<Parser::SharedDictionaryDef> shared_dict_defs;
450 create_table_stmt->executeDryRun(session, src_td, src_columns, shared_dict_defs);
// NOTE(review): "VACCUM" in this message looks like a misspelling of "VACUUM".
455 throw std::runtime_error(
"Incompatible table VACCUM option");
460 throw std::runtime_error(
"Unmatched number of table shards");
463 const auto dst_columns =
465 if (dst_columns.size() != src_columns.size()) {
466 throw std::runtime_error(
"Unmatched number of table columns");
// Pairwise check: each source column must match the destination column at the
// same position in both type name and compression name.
468 for (
const auto& [src_cd, dst_cd] : boost::combine(src_columns, dst_columns)) {
469 if (src_cd.columnType.get_type_name() != dst_cd->columnType.get_type_name() ||
470 src_cd.columnType.get_compression_name() !=
471 dst_cd->columnType.get_compression_name()) {
472 throw std::runtime_error(
"Incompatible types on column " + src_cd.columnName);
476 const auto all_src_oldinfo_str =
// The archived column info is split on spaces into one string per column.
478 std::vector<std::string> src_oldinfo_strs;
481 boost::is_any_of(
" "),
482 boost::token_compress_on);
483 auto all_dst_columns =
// NOTE(review): "a unmatched" in this message reads as a grammar slip ("an").
485 if (src_oldinfo_strs.size() != all_dst_columns.size()) {
486 throw std::runtime_error(
"Source table has a unmatched number of columns: " +
// column_ids_map maps a source column id to a destination column id.
// dict_paths_map maps a source dictionary path to a destination dictionary path.
496 std::unordered_map<int, int> column_ids_map;
497 std::unordered_map<std::string, std::string> dict_paths_map;
// Each per-column string is split on ':' into tokens; token 0 is used as the
// sort key and token 1 is parsed as the source column id below.
499 std::list<std::vector<std::string>> src_oldinfo_tokens;
501 src_oldinfo_strs.begin(),
502 src_oldinfo_strs.end(),
503 std::back_inserter(src_oldinfo_tokens),
504 [](
const auto& src_oldinfo_str) ->
auto {
505 std::vector<std::string> tokens;
507 tokens, src_oldinfo_str, boost::is_any_of(
":"), boost::token_compress_on);
// Sort the source tokens by token 0 and the destination columns by columnName
// so the pairwise pass that follows walks both lists in the same order.
510 src_oldinfo_tokens.sort(
511 [](
const auto& lhs,
const auto& rhs) {
return lhs[0].compare(rhs[0]) < 0; });
512 all_dst_columns.sort(
513 [](
auto a,
auto b) {
return a->columnName.compare(b->columnName) < 0; });
516 src_oldinfo_tokens.end(),
517 all_dst_columns.begin(),
518 std::inserter(column_ids_map, column_ids_map.end()),
519 [&](
const auto& tokens,
const auto& cd) -> std::pair<int, int> {
521 << cd->columnName <<
":" << cd->columnId;
523 return {boost::lexical_cast<
int>(tokens[1]), cd->columnId};
// True when at least one source column id differs from its destination id.
525 bool was_table_altered =
false;
526 std::for_each(column_ids_map.begin(), column_ids_map.end(), [&](
auto& it) {
527 was_table_altered = was_table_altered || it.first != it.second;
529 VLOG(3) <<
"was_table_altered = " << was_table_altered;
// Start from an empty staging directory.
532 run(
"rm -rf " + temp_data_dir);
533 run(
"mkdir -p " + temp_data_dir);
537 if (was_table_altered) {
538 const auto epoch = boost::lexical_cast<int32_t>(
542 VLOG(3) <<
"adjust_altered_table_files: " << time_ms <<
" ms";
// NOTE(review): std::merge requires both input ranges to be sorted; whether
// data_file_dirs and dict_file_dirs are sorted is not visible in this listing.
548 std::vector<std::string> both_file_dirs;
549 std::merge(data_file_dirs.begin(),
550 data_file_dirs.end(),
551 dict_file_dirs.begin(),
552 dict_file_dirs.end(),
553 std::back_inserter(both_file_dirs));
// Set to true once the existing directories have been moved to the backup dir.
554 bool backup_completed =
false;
556 run(
"rm -rf " + temp_back_dir);
557 run(
"mkdir -p " + temp_back_dir);
558 for (
const auto& dir : both_file_dirs) {
559 const auto dir_full_path =
abs_path(global_file_mgr) +
"/" + dir;
560 if (boost::filesystem::is_directory(dir_full_path)) {
561 run(
"mv " + dir_full_path +
" " + temp_back_dir);
564 backup_completed =
true;
// Move each staged dictionary directory to its destination path, skipping map
// entries with an empty source or destination.
568 for (
const auto& dit : dict_paths_map) {
569 if (!dit.first.empty() && !dit.second.empty()) {
570 const auto src_dict_path = temp_data_dir +
"/" + dit.first;
571 const auto dst_dict_path =
abs_path(global_file_mgr) +
"/" + dit.second;
572 run(
"mv " + src_dict_path +
" " + dst_dict_path);
// NOTE(review): presumably a test-only forced failure used to exercise the
// rollback path - confirm; "lol!" is not a descriptive message if it can ever
// surface outside tests.
577 throw std::runtime_error(
"lol!");
// Rollback: move everything from the backup directory back into the base path.
// The mv command is run with the base path passed as run()'s chdir argument.
582 if (backup_completed) {
587 boost::filesystem::path base_path(temp_back_dir);
588 boost::filesystem::directory_iterator end_it;
589 for (boost::filesystem::directory_iterator fit(base_path); fit != end_it; ++fit) {
590 run(
"mv " + fit->path().string() +
" .",
abs_path(global_file_mgr));
// Fragment of a restore entry point that takes a table name. The visible part
// reads the archived schema via get_table_schema, executes the resulting CREATE
// TABLE statement, and contains a "DROP TABLE IF EXISTS" statement for the same
// table.
// NOTE(review): the statement parsing and the control flow around the DROP are
// not visible in this listing; presumably the DROP runs when the restore fails -
// confirm.
603 const std::string& table_name,
604 const std::string& archive_path,
605 const std::string& compression) {
607 const auto schema_str =
get_table_schema(archive_path, table_name, compression);
610 CHECK(create_table_stmt);
611 create_table_stmt->execute(session,
false );
617 const auto schema_str =
"DROP TABLE IF EXISTS " + table_name +
";";
618 std::unique_ptr<Parser::Stmt> stmt =
621 CHECK(drop_table_stmt);
622 drop_table_stmt->execute(session,
false );
std::string get_table_schema(const std::string &archive_path, const std::string &table, const std::string &compression)
void delete_old_symlinks(const std::string &table_data_dir)
std::string getBasePath() const
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
std::string getColumnDictDirectory(const ColumnDescriptor *cd, bool file_name_only=true) const
static constexpr char const * table_schema_filename
static TimeT::rep execution(F func, Args &&...args)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
void rewrite_column_ids_in_page_headers(const boost::filesystem::path &path, const std::unordered_map< int, int > &column_ids_map, const int32_t table_epoch)
std::string abs_path(const File_Namespace::GlobalFileMgr *global_file_mgr)
Data_Namespace::DataMgr & getDataMgr() const
void startThread(FuncType &&func, Args &&...args)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
virtual void checkThreadsStatus()
static constexpr char const * table_oldinfo_filename
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
std::vector< std::string > getTableDataDirectories(const TableDescriptor *td) const
void restoreTable(const Catalog_Namespace::SessionInfo &session, const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
Classes representing a parse tree.
const DBMetadata & getCurrentDB() const
void rename_table_directories(const File_Namespace::GlobalFileMgr *global_file_mgr, const std::string &temp_data_dir, const std::vector< std::string > &target_paths, const std::string &name_prefix)
::FILE * fopen(const char *filename, const char *mode)
OUTPUT transform(INPUT const &input, FUNC const &func)
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
std::vector< std::string > getTableDictDirectories(const TableDescriptor *td) const
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Data_Namespace::MemoryLevel persistenceLevel
void adjust_altered_table_files(const int32_t table_epoch, const std::string &temp_data_dir, const std::unordered_map< int, int > &column_ids_map)
static constexpr char const * table_epoch_filename
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
void add_data_file_symlinks(const std::string &table_data_dir)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::string simple_file_cat(const std::string &archive_path, const std::string &file_name, const std::string &compression)
std::string dumpSchema(const TableDescriptor *td) const
size_t file_size(const int fd)
void dumpTable(const TableDescriptor *td, const std::string &archive_path, const std::string &compression)
constexpr auto kLegacyDataFileExtension
std::unique_ptr< Parser::Stmt > create_stmt_for_query(const std::string &queryStr, const Catalog_Namespace::SessionInfo &session_info)
A selection of helper methods for File I/O.
bool g_test_rollback_dump_restore
Catalog_Namespace::Catalog * cat_