25 namespace foreign_storage {
38 const size_t buffer_size,
40 const char line_delim) {
41 if (read_size == 0 || buffer[read_size - 1] != line_delim) {
42 CHECK(buffer_size > read_size);
43 static_cast<char*
>(buffer)[read_size] = line_delim;
45 }
else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
57 size_t offset_to_index(
const std::vector<size_t>& cumulative_sizes,
size_t byte_offset) {
59 std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
60 if (iterator == cumulative_sizes.end()) {
61 throw std::runtime_error{
"Invalid offset into cumulative_sizes"};
63 return iterator - cumulative_sizes.begin();
68 return file_size - header_size + 1;
88 , scan_finished_(
false)
90 , total_bytes_read_(0) {
93 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
94 file_path +
"\". " + strerror(errno)};
99 fseek(
file_, 0, SEEK_END);
108 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
109 file_path +
"\". " + strerror(errno)};
115 const rapidjson::Value& value)
117 , scan_finished_(
true)
119 , total_bytes_read_(0) {
122 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
123 file_path +
"\". " + strerror(errno)};
131 rapidjson::Value& value,
132 rapidjson::Document::AllocatorType& allocator)
const {
148 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
151 fseek(
file_, 0, SEEK_END);
152 size_t new_file_size = ftell(
file_);
158 throw std::runtime_error{
"An error occurred when attempting to read offset " +
160 " in file: \"" +
file_path_ +
"\". " + strerror(errno)};
192 if (
arch_.get()->read_next_header()) {
195 throw std::runtime_error{
"Invalid archive entry"};
203 bool success =
arch_.get()->read_next_header();
214 if (dest_buffer !=
nullptr) {
223 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
224 CHECK(block_chars_remaining_ > 0);
225 return static_cast<const char*
>(current_block_)[0];
247 , archive_(file_path)
248 , initial_scan_(
true)
249 , scan_finished_(
false)
251 , current_index_(-1) {
258 const rapidjson::Value& value)
272 size_t buffer_size) {
273 size_t remaining_size = read_size;
274 char*
dest =
static_cast<char*
>(buffer);
281 remaining_size -= copy_size;
284 size_t bytes_read = read_size - remaining_size;
298 size_t bytes_read =
readInternal(buffer, max_size - 1, max_size);
366 std::optional<std::string> str = std::nullopt;
373 auto first_line = std::make_optional<std::string>();
375 reader.consumeFirstLine(first_line);
376 return first_line.value();
380 char* dest_buffer =
nullptr;
382 if (dest_str.has_value()) {
383 auto& str = dest_str.value();
384 str.resize(str.length() + 1);
385 dest_buffer = str.data() + str.length() - 1;
401 while (n_bytes > 0) {
429 int entry_number = 0;
448 throw std::runtime_error{
449 "Foreign table refreshed with APPEND mode missing archive entry \"" +
451 boost::filesystem::path(
file_path_).filename().string() +
"\"."};
474 size_t last_size = 0;
475 size_t file_index = -1;
476 size_t num_file_entries = 0;
484 if (num_file_entries == 1) {
492 for (
size_t zero_index = 0; zero_index < file_index; zero_index++) {
511 rapidjson::Value& value,
512 rapidjson::Document::AllocatorType& allocator)
const {
529 , is_end_of_last_file_(
false) {}
533 const rapidjson::Value& value)
537 , is_end_of_last_file_(
false) {
544 CHECK(value.HasMember(
"files_metadata"));
545 CHECK(value[
"files_metadata"].IsArray());
550 rapidjson::Document::AllocatorType& allocator)
const {
558 rapidjson::Value files_metadata(rapidjson::kArrayType);
559 for (
size_t index = 0; index <
files_.size(); index++) {
560 rapidjson::Value file_metadata(rapidjson::kObjectType);
561 files_[index]->serialize(file_metadata, allocator);
562 files_metadata.PushBack(file_metadata, allocator);
564 value.AddMember(
"files_metadata", files_metadata, allocator);
568 size_t total_size = 0;
570 total_size +=
files_[index]->getRemainingSize();
576 bool size_known =
true;
578 size_known = size_known &&
files_[index]->isRemainingSizeKnown();
585 for (
const auto& file :
files_) {
586 first_line_by_file_path.merge(file->getFirstLineForEachFile());
588 return first_line_by_file_path;
596 const std::string& file_path,
598 const std::optional<std::string>& regex_path_filter,
599 const std::optional<std::string>& file_sort_order_by,
600 const std::optional<std::string>& file_sort_regex)
603 file_path, regex_path_filter, file_sort_order_by, file_sort_regex);
604 for (
const auto& location : found_file_locations) {
611 const rapidjson::Value& value)
616 files_.emplace_back(std::make_unique<CompressedFileReader>(
619 value[
"files_metadata"].GetArray()[index]));
621 files_.emplace_back(std::make_unique<SingleTextFileReader>(
624 value[
"files_metadata"].GetArray()[index]));
635 if (
files_.back()->isScanFinished()) {
647 std::set<std::string> new_locations;
650 if (boost::filesystem::is_directory(
file_path_)) {
652 std::set<std::string> all_file_paths;
653 for (boost::filesystem::recursive_directory_iterator
654 it(
file_path_, boost::filesystem::symlink_option::recurse),
661 if (!boost::filesystem::is_directory(it->path()) && new_file) {
662 new_locations.insert(it->path().string());
664 all_file_paths.emplace(it->path().string());
668 if (all_file_paths.find(file_path) == all_file_paths.end()) {
673 if (new_locations.size() > 0) {
674 for (
const auto& location : new_locations) {
677 }
else if (
files_.size() == 1) {
679 files_[0].get()->checkForMoreRows(file_offset);
717 size_t read_size = size;
722 size_t bytes_read =
files_[index].get()->readRegion(buffer, offset - base, read_size);
DEVICE auto upper_bound(ARGS &&...args)
std::vector< std::string > sourcenames_
void skipHeader() override
virtual std::string getFirstLine() const =0
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
std::vector< int > archive_entry_index_
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
int getCurrentEntryIndex() const
std::string getFirstLine() const override
bool isEndOfLastFile() override
std::vector< size_t > cumulative_sizes_
void throw_removed_row_in_file_error(const std::string &file_path)
CompressedFileReader(const std::string &file_path, const import_export::CopyParams ©_params)
FirstLineByFilePath getFirstLineForEachFile() const override
size_t readRegion(void *buffer, size_t offset, size_t size) override
bool is_end_of_last_file_
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams ©_params)
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
std::map< std::string, std::string > FirstLineByFilePath
MultiFileReader(const std::string &file_path, const import_export::CopyParams ©_params)
void insertFile(std::string location)
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams ©_params, const std::optional< std::string > ®ex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex)
import_export::CopyParams copy_params_
void throw_removed_file_error(const std::string &file_path)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
ImportHeaderRow has_header
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
bool isRemainingSizeKnown() override
FirstLineByFilePath getFirstLineForEachFile() const override
::FILE * fopen(const char *filename, const char *mode)
bool currentEntryFinished() const
size_t readRegion(void *buffer, size_t offset, size_t size) override
void skipBytes(size_t n_bytes)
void consumeFirstLine(std::optional< std::string > &dest_str)
virtual bool isScanFinished()=0
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
size_t get_data_size(size_t file_size, size_t header_size)
bool isScanFinished() override
std::unique_ptr< Archive > arch_
const void * current_block_
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex, const bool recurse)
static constexpr size_t DEFAULT_HEADER_READ_SIZE
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
SingleFileReader(const std::string &file_path, const import_export::CopyParams ©_params)
size_t read(void *buffer, size_t max_size) override
void skipToEntry(int entry_number)
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
shared utility for mime-types
bool g_enable_watchdog false
bool isScanFinished() override
size_t currentEntryDataAvailable() const
bool isScanFinished() override
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
std::string getFirstLine() const override
size_t getRemainingSize() override
void skipHeader() override
std::vector< std::string > file_locations_
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
size_t file_size(const int fd)
bool isEndOfLastFile() override
bool is_compressed_file_extension(const std::string &location)
std::vector< std::unique_ptr< FileReader > > files_
std::vector< size_t > cumulative_sizes_
size_t read(void *buffer, size_t max_size) override
size_t block_chars_remaining_
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override