OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileReader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
#include "FileReader.h"

#include "FsiJsonUtils.h"
#include "Shared/file_path_util.h"
#include "Shared/file_type.h"
#include "Shared/misc.h"
24 
25 namespace foreign_storage {
26 
27 namespace {
37 void adjust_eof(size_t& read_size,
38  const size_t buffer_size,
39  char* buffer,
40  const char line_delim) {
41  if (read_size == 0 || buffer[read_size - 1] != line_delim) {
42  CHECK(buffer_size > read_size);
43  static_cast<char*>(buffer)[read_size] = line_delim;
44  read_size++;
45  } else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
46  // Extra newline may have been due to the file encoding
47  // and may disappear during an append
48  read_size--;
49  }
50 }
51 
/**
 * Map a byte offset to the index of the chunk containing it, given the
 * exclusive cumulative end-offsets of each chunk.
 *
 * @throws std::runtime_error if the offset lies at or beyond the last
 *         recorded cumulative size.
 */
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  const auto upper =
      std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
  if (upper == cumulative_sizes.end()) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  return static_cast<size_t>(std::distance(cumulative_sizes.begin(), upper));
}
65 
// Size of the readable data payload: file size minus the header, plus one
// spare byte in case a trailing line delimiter must be inserted at EOF.
size_t get_data_size(size_t file_size, size_t header_size) {
  const size_t payload_size = file_size - header_size;
  return payload_size + 1;
}
70 
71 } // namespace
72 
// Base single-file reader: forwards the path and copy parameters to the
// FileReader base; subclasses open the actual file.
SingleFileReader::SingleFileReader(const std::string& file_path,
                                   const import_export::CopyParams& copy_params)
    : FileReader(file_path, copy_params) {}
76 
78  return {{file_path_, getFirstLine()}};
79 }
80 
82  return isScanFinished();
83 }
84 
85 SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
86  const import_export::CopyParams& copy_params)
87  : SingleFileReader(file_path, copy_params)
88  , scan_finished_(false)
89  , header_offset_(0)
90  , total_bytes_read_(0) {
91  file_ = fopen(file_path.c_str(), "rb");
92  if (!file_) {
93  throw std::runtime_error{"An error occurred when attempting to open file \"" +
94  file_path + "\". " + strerror(errno)};
95  }
96 
97  // Skip header and record offset
98  skipHeader();
99  fseek(file_, 0, SEEK_END);
100 
102  // Empty file
103  if (data_size_ == 0) {
104  scan_finished_ = true;
105  }
106 
107  if (fseek(file_, static_cast<long int>(header_offset_), SEEK_SET) != 0) {
108  throw std::runtime_error{"An error occurred when attempting to open file \"" +
109  file_path + "\". " + strerror(errno)};
110  };
111 }
112 
// Deserializing constructor: re-opens the file and restores the scan state
// (header_offset_, total_bytes_read_, data_size_) persisted by serialize().
// scan_finished_ starts true; checkForMoreRows() may later re-enable scanning.
//
// @throws std::runtime_error if the file cannot be opened.
SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params,
                                           const rapidjson::Value& value)
    : SingleFileReader(file_path, copy_params)
    , scan_finished_(true)
    , header_offset_(0)
    , total_bytes_read_(0) {
  file_ = fopen(file_path.c_str(), "rb");
  if (!file_) {
    throw std::runtime_error{"An error occurred when attempting to open file \"" +
                             file_path + "\". " + strerror(errno)};
  }
  // Restore the resume state recorded by serialize().
  json_utils::get_value_from_object(value, header_offset_, "header_offset");
  json_utils::get_value_from_object(value, total_bytes_read_, "total_bytes_read");
  json_utils::get_value_from_object(value, data_size_, "data_size");
}
129 
131  rapidjson::Value& value,
132  rapidjson::Document::AllocatorType& allocator) const {
134  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
136  value, total_bytes_read_, "total_bytes_read", allocator);
137  json_utils::add_value_to_object(value, data_size_, "data_size", allocator);
138 }
139 
141  const ForeignServer* server_options,
142  const UserMapping* user_mapping) {
144  // Re-open file and check if there is any new data in it
145  fclose(file_);
146  file_ = fopen(file_path_.c_str(), "rb");
147  if (!file_) {
148  throw std::runtime_error{"An error occurred when attempting to open file \"" +
149  file_path_ + "\". " + strerror(errno)};
150  }
151  fseek(file_, 0, SEEK_END);
152  size_t new_file_size = ftell(file_);
153  size_t new_data_size = get_data_size(new_file_size, header_offset_);
154  if (new_data_size < data_size_) {
156  }
157  if (fseek(file_, static_cast<long int>(file_offset + header_offset_), SEEK_SET) != 0) {
158  throw std::runtime_error{"An error occurred when attempting to read offset " +
159  std::to_string(file_offset + header_offset_) +
160  " in file: \"" + file_path_ + "\". " + strerror(errno)};
161  }
162  if (new_data_size > data_size_) {
163  scan_finished_ = false;
164  total_bytes_read_ = file_offset;
165  data_size_ = new_data_size;
166  }
167 }
168 
171  header_offset_ = getFirstLine().length() + 1;
172  }
173 }
174 
176  std::ifstream file{file_path_};
177  CHECK(file.good());
178  std::string line;
179  std::getline(file, line, copy_params_.line_delim);
180  file.close();
181  return line;
182 }
183 
187 void ArchiveWrapper::skipToEntry(int entry_number) {
188  if (current_entry_ >= entry_number) {
189  resetArchive();
190  }
191  while (current_entry_ < entry_number) {
192  if (arch_.get()->read_next_header()) {
193  current_entry_++;
194  } else {
195  throw std::runtime_error{"Invalid archive entry"};
196  }
197  }
198  fetchBlock();
199 }
200 
201 // Go to next consecutive entry
203  bool success = arch_.get()->read_next_header();
204  if (success) {
205  current_entry_++;
206  fetchBlock();
207  }
208  return success;
209 }
210 
// Consumes `size` bytes from the current entry's data block, optionally
// copying them into dest_buffer (pass nullptr to discard).  Must not request
// more than block_chars_remaining_; fetches the next block when this one is
// exhausted.
void ArchiveWrapper::consumeDataFromCurrentEntry(size_t size, char* dest_buffer) {
  CHECK(size <= block_chars_remaining_);
  block_chars_remaining_ -= size;
  if (dest_buffer != nullptr) {
    memcpy(dest_buffer, current_block_, size);
  }
  // Advance the cursor within the (const void*) block by `size` bytes.
  current_block_ = static_cast<const char*>(current_block_) + size;
  if (block_chars_remaining_ == 0) {
    fetchBlock();
  }
}
222 
223 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
224  CHECK(block_chars_remaining_ > 0);
225  return static_cast<const char*>(current_block_)[0];
226 }
227 
229  arch_.reset(new PosixFileArchive(file_path_, false));
231  // We will increment to 0 when reading first entry
232  current_entry_ = -1;
233 }
234 
236  int64_t offset;
237  auto ok =
238  arch_.get()->read_data_block(&current_block_, &block_chars_remaining_, &offset);
239  if (!ok) {
241  }
242 }
243 
// Opens the archive at file_path and advances to the first entry that has
// data (current_index_ starts at -1; nextEntry() increments it and skips
// finished entries).
CompressedFileReader::CompressedFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params)
    : SingleFileReader(file_path, copy_params)
    , archive_(file_path)
    , initial_scan_(true)
    , scan_finished_(false)
    , current_offset_(0)
    , current_index_(-1) {
  // Initialize first entry
  nextEntry();
}
255 
// Deserializing constructor: delegates to the regular constructor to open the
// archive, then discards the state that initial scan gathered and restores
// the entry bookkeeping persisted by serialize().
CompressedFileReader::CompressedFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params,
                                           const rapidjson::Value& value)
    : CompressedFileReader(file_path, copy_params) {
  scan_finished_ = true;
  initial_scan_ = false;
  // Clear whatever the delegated constructor's first-entry scan recorded.
  sourcenames_.clear();
  archive_entry_index_.clear();
  cumulative_sizes_.clear();
  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
}
269 
271  size_t read_size,
272  size_t buffer_size) {
273  size_t remaining_size = read_size;
274  char* dest = static_cast<char*>(buffer);
275  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
276  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
278  : remaining_size;
279  // copy data into dest
280  archive_.consumeDataFromCurrentEntry(copy_size, dest);
281  remaining_size -= copy_size;
282  dest += copy_size;
283  }
284  size_t bytes_read = read_size - remaining_size;
285  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
286  adjust_eof(
287  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
288  current_offset_ += bytes_read;
289  nextEntry();
290  } else {
291  current_offset_ += bytes_read;
292  }
293  return bytes_read;
294 }
295 
296 size_t CompressedFileReader::read(void* buffer, size_t max_size) {
297  // Leave one extra char in case we need to insert a delimiter
298  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
299  return bytes_read;
300 }
301 
302 size_t CompressedFileReader::readRegion(void* buffer, size_t offset, size_t size) {
304 
305  // Determine where in the archive we are
306  size_t index = offset_to_index(cumulative_sizes_, offset);
307  CHECK(archive_entry_index_.size() > index);
308  auto archive_entry = archive_entry_index_[index];
309  current_index_ = static_cast<int>(index);
310 
311  // If we are in the wrong entry or too far in the right one skip to the correct entry
312  if (archive_entry != archive_.getCurrentEntryIndex() ||
313  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
314  archive_.skipToEntry(archive_entry);
315  skipHeader();
316  current_offset_ = 0;
317  if (index > 0) {
318  current_offset_ = cumulative_sizes_[index - 1];
319  }
320  }
321  skipBytes(offset - current_offset_);
322  return readInternal(buffer, size, size);
323 }
324 
329  do {
330  // Go to the next index
331  current_index_++;
332  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
334  }
335  if (!initial_scan_) {
336  // Entry # in the archive is known and might not be the next one in the file
337  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
339  skipHeader();
340  } else {
341  scan_finished_ = true;
342  return;
343  }
344  } else {
345  // Read next header in archive and save the sourcename
346  if (archive_.nextEntry()) {
347  // read headers until one has data
348  CHECK(sourcenames_.size() == archive_entry_index_.size());
349  sourcenames_.emplace_back(archive_.entryName());
351  skipHeader();
352  } else {
353  scan_finished_ = true;
354  initial_scan_ = false;
355  return;
356  }
357  }
358  } while (archive_.currentEntryFinished());
359 }
360 
366  std::optional<std::string> str = std::nullopt;
367  consumeFirstLine(str);
368  }
369 }
370 
373  auto first_line = std::make_optional<std::string>();
374  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
375  reader.consumeFirstLine(first_line);
376  return first_line.value();
377 }
378 
379 void CompressedFileReader::consumeFirstLine(std::optional<std::string>& dest_str) {
380  char* dest_buffer = nullptr;
381  while (!archive_.currentEntryFinished()) {
382  if (dest_str.has_value()) {
383  auto& str = dest_str.value();
384  str.resize(str.length() + 1);
385  dest_buffer = str.data() + str.length() - 1;
386  }
388  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
389  break;
390  }
391  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
392  }
393 }
394 
399 void CompressedFileReader::skipBytes(size_t n_bytes) {
400  current_offset_ += n_bytes;
401  while (n_bytes > 0) {
403  // We've reached the end of the entry
404  return;
405  }
406  // Keep fetching blocks/entries until we've gone through N bytes
407  if (archive_.currentEntryDataAvailable() <= n_bytes) {
408  n_bytes -= archive_.currentEntryDataAvailable();
410  } else {
412  n_bytes = 0;
413  }
414  }
415 }
416 
418  const ForeignServer* server_options,
419  const UserMapping* user_mapping) {
420  CHECK(initial_scan_ == false);
421  size_t initial_entries = archive_entry_index_.size();
422 
423  // Reset all entry indexes for existing items
424  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
425  archive_entry_index_[index] = -1;
426  }
427 
428  // Read headers and determine location of existing and new files
429  int entry_number = 0;
431  while (archive_.nextEntry()) {
432  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
433  if (it != sourcenames_.end()) {
434  // Record new index of already read file
435  auto index = it - sourcenames_.begin();
436  archive_entry_index_[index] = entry_number;
437  } else {
438  // Append new source file
439  sourcenames_.emplace_back(archive_.entryName());
440  archive_entry_index_.emplace_back(entry_number);
441  }
442  entry_number++;
443  }
444 
445  // Error if we are missing a file from a previous scan
446  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
447  if (archive_entry_index_[index] == -1) {
448  throw std::runtime_error{
449  "Foreign table refreshed with APPEND mode missing archive entry \"" +
450  sourcenames_[index] + "\" from file \"" +
451  boost::filesystem::path(file_path_).filename().string() + "\"."};
452  }
453  }
454 
456  if (initial_entries < archive_entry_index_.size()) {
457  // We found more files
458  current_index_ = static_cast<int>(initial_entries) - 1;
460  // iterate through new entries until we get one with data
461  do {
462  nextEntry();
463  } while (archive_.currentEntryFinished() &&
464  current_index_ < static_cast<int>(archive_entry_index_.size()));
465 
467  scan_finished_ = false;
468  }
469  } else {
470  // No new files but this may be an archive of a single file
471  // Check if we only have one file and check if it has more data
472  // May have still have multiple entries with some empty that are ignored
473  // like directories
474  size_t last_size = 0;
475  size_t file_index = -1;
476  size_t num_file_entries = 0;
477  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
478  if (cumulative_sizes_[index] > last_size) {
479  file_index = index;
480  num_file_entries++;
481  last_size = cumulative_sizes_[index];
482  }
483  }
484  if (num_file_entries == 1) {
485  current_index_ = static_cast<int>(file_index);
486  current_offset_ = 0;
487  size_t last_eof = cumulative_sizes_[file_index];
488 
489  // reset cumulative_sizes_ with initial zero sizes
490  auto old_cumulative_sizes = std::move(cumulative_sizes_);
491  cumulative_sizes_ = {};
492  for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
493  cumulative_sizes_.emplace_back(0);
494  }
495 
496  // Go to Index of file and read to where we left off
498  skipHeader();
499  skipBytes(last_eof);
501  scan_finished_ = false;
502  } else {
503  // There was no new data, so put back the old data structure
504  cumulative_sizes_ = std::move(old_cumulative_sizes);
505  }
506  }
507  }
508 };
509 
511  rapidjson::Value& value,
512  rapidjson::Document::AllocatorType& allocator) const {
513  // Should be done initial scan
516 
517  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
519  value, cumulative_sizes_, "cumulative_sizes", allocator);
521  value, archive_entry_index_, "archive_entry_index", allocator);
522 };
523 
// Base multi-file reader: starts with no files; subclasses populate files_
// (e.g. via glob) after construction.
MultiFileReader::MultiFileReader(const std::string& file_path,
                                 const import_export::CopyParams& copy_params)
    : FileReader(file_path, copy_params)
    , current_index_(0)
    , current_offset_(0)
    , is_end_of_last_file_(false) {}
530 
// Deserializing constructor: restores the multi-file bookkeeping persisted by
// serialize() and validates the per-file metadata array; the per-file reader
// objects themselves are recreated by the subclass constructor.
MultiFileReader::MultiFileReader(const std::string& file_path,
                                 const import_export::CopyParams& copy_params,
                                 const rapidjson::Value& value)
    : FileReader(file_path, copy_params)
    , current_index_(0)
    , current_offset_(0)
    , is_end_of_last_file_(false) {
  json_utils::get_value_from_object(value, file_locations_, "file_locations");
  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
  json_utils::get_value_from_object(value, current_offset_, "current_offset");
  json_utils::get_value_from_object(value, current_index_, "current_index");

  // Validate files_metadata here, but objects will be recreated by child class
  CHECK(value.HasMember("files_metadata"));
  CHECK(value["files_metadata"].IsArray());
  CHECK(file_locations_.size() == value["files_metadata"].GetArray().Size());
}
548 
549 void MultiFileReader::serialize(rapidjson::Value& value,
550  rapidjson::Document::AllocatorType& allocator) const {
551  json_utils::add_value_to_object(value, file_locations_, "file_locations", allocator);
553  value, cumulative_sizes_, "cumulative_sizes", allocator);
554  json_utils::add_value_to_object(value, current_offset_, "current_offset", allocator);
555  json_utils::add_value_to_object(value, current_index_, "current_index", allocator);
556 
557  // Serialize metadata from all files
558  rapidjson::Value files_metadata(rapidjson::kArrayType);
559  for (size_t index = 0; index < files_.size(); index++) {
560  rapidjson::Value file_metadata(rapidjson::kObjectType);
561  files_[index]->serialize(file_metadata, allocator);
562  files_metadata.PushBack(file_metadata, allocator);
563  }
564  value.AddMember("files_metadata", files_metadata, allocator);
565 };
566 
568  size_t total_size = 0;
569  for (size_t index = current_index_; index < files_.size(); index++) {
570  total_size += files_[index]->getRemainingSize();
571  }
572  return total_size;
573 }
574 
576  bool size_known = true;
577  for (size_t index = current_index_; index < files_.size(); index++) {
578  size_known = size_known && files_[index]->isRemainingSizeKnown();
579  }
580  return size_known;
581 }
582 
584  FirstLineByFilePath first_line_by_file_path;
585  for (const auto& file : files_) {
586  first_line_by_file_path.merge(file->getFirstLineForEachFile());
587  }
588  return first_line_by_file_path;
589 }
590 
592  return (isScanFinished() || is_end_of_last_file_);
593 }
594 
596  const std::string& file_path,
597  const import_export::CopyParams& copy_params,
598  const std::optional<std::string>& regex_path_filter,
599  const std::optional<std::string>& file_sort_order_by,
600  const std::optional<std::string>& file_sort_regex)
601  : MultiFileReader(file_path, copy_params) {
602  auto found_file_locations = shared::local_glob_filter_sort_files(
603  file_path, regex_path_filter, file_sort_order_by, file_sort_regex);
604  for (const auto& location : found_file_locations) {
605  insertFile(location);
606  }
607 }
608 
609 LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
610  const import_export::CopyParams& copy_params,
611  const rapidjson::Value& value)
612  : MultiFileReader(file_path, copy_params, value) {
613  // Constructs file from files_metadata
614  for (size_t index = 0; index < file_locations_.size(); index++) {
616  files_.emplace_back(std::make_unique<CompressedFileReader>(
617  file_locations_[index],
618  copy_params_,
619  value["files_metadata"].GetArray()[index]));
620  } else {
621  files_.emplace_back(std::make_unique<SingleTextFileReader>(
622  file_locations_[index],
623  copy_params_,
624  value["files_metadata"].GetArray()[index]));
625  }
626  }
627 }
628 
629 void LocalMultiFileReader::insertFile(std::string location) {
630  if (shared::is_compressed_file_extension(location)) {
631  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
632  } else {
633  files_.emplace_back(std::make_unique<SingleTextFileReader>(location, copy_params_));
634  }
635  if (files_.back()->isScanFinished()) {
636  // skip any initially empty files
637  files_.pop_back();
638  } else {
639  file_locations_.push_back(location);
640  }
641 }
642 
644  const ForeignServer* server_options,
645  const UserMapping* user_mapping) {
646  // Look for new files
647  std::set<std::string> new_locations;
649  CHECK(file_offset == current_offset_);
650  if (boost::filesystem::is_directory(file_path_)) {
651  // Find all files in this directory
652  std::set<std::string> all_file_paths;
653  for (boost::filesystem::recursive_directory_iterator
654  it(file_path_, boost::filesystem::symlink_option::recurse),
655  eit;
656  it != eit;
657  ++it) {
658  bool new_file =
659  std::find(file_locations_.begin(), file_locations_.end(), it->path()) ==
660  file_locations_.end();
661  if (!boost::filesystem::is_directory(it->path()) && new_file) {
662  new_locations.insert(it->path().string());
663  }
664  all_file_paths.emplace(it->path().string());
665  }
666 
667  for (const auto& file_path : file_locations_) {
668  if (all_file_paths.find(file_path) == all_file_paths.end()) {
669  throw_removed_file_error(file_path);
670  }
671  }
672  }
673  if (new_locations.size() > 0) {
674  for (const auto& location : new_locations) {
675  insertFile(location);
676  }
677  } else if (files_.size() == 1) {
678  // Single file, check if it has new data
679  files_[0].get()->checkForMoreRows(file_offset);
680  if (!files_[0].get()->isScanFinished()) {
681  current_index_ = 0;
682  cumulative_sizes_ = {};
683  }
684  }
685 }
686 
687 size_t MultiFileReader::read(void* buffer, size_t max_size) {
688  if (isScanFinished()) {
689  return 0;
690  }
691  // Leave one extra char in case we need to insert a delimiter
692  size_t bytes_read = files_[current_index_].get()->read(buffer, max_size - 1);
693  if (files_[current_index_].get()->isScanFinished()) {
694  adjust_eof(bytes_read, max_size, static_cast<char*>(buffer), copy_params_.line_delim);
695  }
696  current_offset_ += bytes_read;
697  if (current_index_ < files_.size() && files_[current_index_].get()->isScanFinished()) {
699  current_index_++;
700  is_end_of_last_file_ = true;
701  } else {
702  is_end_of_last_file_ = false;
703  }
704  return bytes_read;
705 }
706 
707 size_t MultiFileReader::readRegion(void* buffer, size_t offset, size_t size) {
709  // Get file index
710  auto index = offset_to_index(cumulative_sizes_, offset);
711  // Get offset into this file
712  size_t base = 0;
713  if (index > 0) {
714  base = cumulative_sizes_[index - 1];
715  }
716 
717  size_t read_size = size;
718  if (offset + size == cumulative_sizes_[index]) {
719  // Skip the last byte as it may have been an inserted delimiter
720  read_size--;
721  }
722  size_t bytes_read = files_[index].get()->readRegion(buffer, offset - base, read_size);
723 
724  if (offset + size == cumulative_sizes_[index]) {
725  // Re-insert delimiter
726  static_cast<char*>(buffer)[size - 1] = copy_params_.line_delim;
727  bytes_read++;
728  }
729 
730  return bytes_read;
731 }
732 
733 } // namespace foreign_storage
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
virtual std::string getFirstLine() const =0
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: FileReader.cpp:57
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::string getFirstLine() const override
Definition: FileReader.cpp:175
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
void throw_removed_row_in_file_error(const std::string &file_path)
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:244
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:77
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:302
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:85
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:270
std::map< std::string, std::string > FirstLineByFilePath
Definition: FileReader.h:33
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:524
std::string to_string(char const *&&v)
void insertFile(std::string location)
Definition: FileReader.cpp:629
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params, const std::optional< std::string > &regex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex)
Definition: FileReader.cpp:595
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void throw_removed_file_error(const std::string &file_path)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:130
ImportHeaderRow has_header
Definition: CopyParams.h:46
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:510
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:211
bool isRemainingSizeKnown() override
Definition: FileReader.cpp:575
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:583
::FILE * fopen(const char *filename, const char *mode)
Definition: omnisci_fs.cpp:72
bool currentEntryFinished() const
Definition: FileReader.h:220
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:707
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:379
virtual bool isScanFinished()=0
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
size_t get_data_size(size_t file_size, size_t header_size)
Definition: FileReader.cpp:66
std::unique_ptr< Archive > arch_
Definition: FileReader.h:243
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex, const bool recurse)
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:137
tuple line
Definition: parse_ast.py:10
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: FileReader.cpp:37
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:73
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:687
void skipToEntry(int entry_number)
Definition: FileReader.cpp:187
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:417
shared utility for mime-types
bool g_enable_watchdog false
Definition: Execute.cpp:79
#define CHECK(condition)
Definition: Logger.h:223
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:140
std::string getFirstLine() const override
Definition: FileReader.cpp:371
size_t getRemainingSize() override
Definition: FileReader.cpp:567
std::vector< std::string > file_locations_
Definition: FileReader.h:358
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:549
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
bool is_compressed_file_extension(const std::string &location)
Definition: file_type.cpp:50
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:357
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:361
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:296
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:643