21 namespace foreign_storage {
34 const size_t buffer_size,
36 const char line_delim) {
37 if (read_size == 0 || buffer[read_size - 1] != line_delim) {
38 CHECK(buffer_size > read_size);
39 static_cast<char*
>(buffer)[read_size] = line_delim;
41 }
else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
/**
 * Finds the position of the first cumulative size that is strictly greater than
 * the given byte offset.
 *
 * @param cumulative_sizes values to search; std::upper_bound requires them to be
 *        sorted in ascending order
 * @param byte_offset offset to locate
 * @return zero-based index of the first element greater than byte_offset
 * @throws std::runtime_error if no element is greater than byte_offset (this
 *         includes an empty cumulative_sizes)
 */
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  const auto iterator =
      std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
  if (iterator == cumulative_sizes.end()) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  // The iterator is never before begin(), so the difference is non-negative.
  return static_cast<size_t>(iterator - cumulative_sizes.begin());
}
64 return file_size - header_size + 1;
72 , scan_finished_(
false)
74 , total_bytes_read_(0) {
77 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
78 file_path +
"\". " + strerror(errno)};
83 std::ifstream file{file_path};
86 std::getline(file, line, copy_params.
line_delim);
90 fseek(
file_, 0, SEEK_END);
95 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
96 file_path +
"\". " + strerror(errno)};
102 const rapidjson::Value& value)
104 , scan_finished_(
true)
106 , total_bytes_read_(0) {
109 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
110 file_path +
"\". " + strerror(errno)};
118 rapidjson::Document::AllocatorType& allocator)
const {
128 const UserMapping* user_mapping) {
134 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
137 fseek(
file_, 0, SEEK_END);
138 size_t new_file_size = ftell(
file_);
144 throw std::runtime_error{
"An error occurred when attempting to read offset " +
146 " in file: \"" +
file_path_ +
"\". " + strerror(errno)};
163 if (
arch_.get()->read_next_header()) {
166 throw std::runtime_error{
"Invalid archive entry"};
174 bool success =
arch_.get()->read_next_header();
185 if (dest_buffer !=
nullptr) {
// Returns the first byte of current_block_, viewed as a char. Nothing in the
// visible lines advances or otherwise modifies the block.
// Precondition (checked with CHECK): block_chars_remaining_ > 0.
// NOTE(review): the qualifier `ArchiveWrapper::ArchiveWrapper::` repeats the class
// name; `ArchiveWrapper::peekNextChar` would presumably suffice - confirm.
194 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
195 CHECK(block_chars_remaining_ > 0);
// current_block_ is a `const void*`, so it is cast to `const char*` before indexing.
196 return static_cast<const char*
>(current_block_)[0];
218 , archive_(file_path)
219 , initial_scan_(
true)
220 , scan_finished_(
false)
222 , current_index_(-1) {
229 const rapidjson::Value& value)
243 size_t buffer_size) {
244 size_t remaining_size = read_size;
245 char*
dest =
static_cast<char*
>(buffer);
252 remaining_size -= copy_size;
255 size_t bytes_read = read_size - remaining_size;
269 size_t bytes_read =
readInternal(buffer, max_size - 1, max_size);
353 while (n_bytes > 0) {
371 const UserMapping* user_mapping) {
381 int entry_number = 0;
400 throw std::runtime_error{
401 "Foreign table refreshed with APPEND mode missing archive entry \"" +
403 boost::filesystem::path(
file_path_).filename().string() +
"\"."};
426 size_t last_size = 0;
427 size_t csv_index = -1;
428 size_t num_csv_entries = 0;
436 if (num_csv_entries == 1) {
444 for (
size_t zero_index = 0; zero_index < csv_index; zero_index++) {
463 rapidjson::Value& value,
464 rapidjson::Document::AllocatorType& allocator)
const {
478 :
CsvReader(file_path, copy_params), current_index_(0), current_offset_(0) {}
482 const rapidjson::Value& value)
483 :
CsvReader(file_path, copy_params), current_index_(0), current_offset_(0) {
490 CHECK(value.HasMember(
"files_metadata"));
491 CHECK(value[
"files_metadata"].IsArray());
496 rapidjson::Document::AllocatorType& allocator)
const {
504 rapidjson::Value files_metadata(rapidjson::kArrayType);
505 for (
size_t index = 0; index <
files_.size(); index++) {
506 rapidjson::Value file_metadata(rapidjson::kObjectType);
507 files_[index]->serialize(file_metadata, allocator);
508 files_metadata.PushBack(file_metadata, allocator);
510 value.AddMember(
"files_metadata", files_metadata, allocator);
514 size_t total_size = 0;
516 total_size +=
files_[index]->getRemainingSize();
522 bool size_known =
true;
524 size_known = size_known &&
files_[index]->isRemainingSizeKnown();
532 std::set<std::string> file_locations;
533 if (boost::filesystem::is_directory(file_path)) {
535 for (boost::filesystem::recursive_directory_iterator
536 it(file_path, boost::filesystem::symlink_option::recurse),
540 if (!boost::filesystem::is_directory(it->path())) {
541 file_locations.insert(it->path().string());
545 file_locations.insert(file_path);
547 for (
const auto& location : file_locations) {
554 const std::vector<std::string> compressed_exts = {
555 ".zip",
".gz",
".tar",
".rar",
".bz2",
".7z",
".tgz"};
556 const std::vector<std::string> uncompressed_exts = {
"",
".csv",
".tsv",
".txt"};
557 if (std::find(compressed_exts.begin(),
558 compressed_exts.end(),
559 boost::filesystem::extension(location)) != compressed_exts.end()) {
561 }
else if (std::find(uncompressed_exts.begin(),
562 uncompressed_exts.end(),
563 boost::filesystem::extension(location)) !=
564 uncompressed_exts.end()) {
567 throw std::runtime_error{
"Invalid extention for file \"" + location +
"\"."};
574 const rapidjson::Value& value)
579 files_.emplace_back(std::make_unique<CompressedFileReader>(
582 value[
"files_metadata"].GetArray()[index]));
587 value[
"files_metadata"].GetArray()[index]));
598 if (
files_.back()->isScanFinished()) {
608 const UserMapping* user_mapping) {
610 std::set<std::string> new_locations;
613 if (boost::filesystem::is_directory(
file_path_)) {
615 std::set<std::string> all_file_paths;
616 for (boost::filesystem::recursive_directory_iterator
617 it(
file_path_, boost::filesystem::symlink_option::recurse),
624 if (!boost::filesystem::is_directory(it->path()) && new_file) {
625 new_locations.insert(it->path().string());
627 all_file_paths.emplace(it->path().string());
631 if (all_file_paths.find(file_path) == all_file_paths.end()) {
636 if (new_locations.size() > 0) {
637 for (
const auto& location : new_locations) {
640 }
else if (
files_.size() == 1) {
642 files_[0].get()->checkForMoreRows(file_offset);
677 size_t read_size = size;
682 size_t bytes_read =
files_[index].get()->readRegion(buffer, offset - base, read_size);
DEVICE auto upper_bound(ARGS &&...args)
std::vector< std::string > sourcenames_
std::vector< int > archive_entry_index_
int getCurrentEntryIndex() const
::FILE * fopen(const char *filename, const char *mode)
std::vector< size_t > cumulative_sizes_
import_export::CopyParams copy_params_
size_t get_data_size(size_t file_size, size_t header_size)
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
size_t readRegion(void *buffer, size_t offset, size_t size) override
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
void throw_removed_row_error(const std::string &file_path)
void insertFile(std::string location)
void throw_removed_file_error(const std::string &file_path)
ImportHeaderRow has_header
bool is_compressed_file(const std::string &location)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
bool isRemainingSizeKnown() override
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
bool currentEntryFinished() const
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
size_t readRegion(void *buffer, size_t offset, size_t size) override
void skipBytes(size_t n_bytes)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
std::unique_ptr< Archive > arch_
const void * current_block_
bool isScanFinished() override
std::vector< std::unique_ptr< CsvReader > > files_
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
size_t read(void *buffer, size_t max_size) override
void skipToEntry(int entry_number)
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
bool g_enable_watchdog false
bool isScanFinished() override
size_t currentEntryDataAvailable() const
bool isScanFinished() override
size_t getRemainingSize() override
std::vector< std::string > file_locations_
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
std::vector< size_t > cumulative_sizes_
size_t read(void *buffer, size_t max_size) override
size_t block_chars_remaining_
size_t file_size(const int fd)
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override