FileReader.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "FileReader.h"

#include "FsiJsonUtils.h"
#include "Shared/file_path_util.h"
#include "Shared/misc.h"

namespace foreign_storage {

namespace {
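/**
 * Adjusts the end of a read buffer: if the data read does not end with the line
 * delimiter, one is appended (buffer_size must leave room for the extra byte); if the
 * read ends with a duplicate trailing delimiter, the extra one is dropped, since it may
 * have come from the file encoding and could disappear on a later append.
 */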
void adjust_eof(size_t& read_size,
                const size_t buffer_size,
                char* buffer,
                const char line_delim) {
  if (read_size == 0 || buffer[read_size - 1] != line_delim) {
    CHECK(buffer_size > read_size);
    buffer[read_size] = line_delim;
    read_size++;
  } else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
    // Extra newline may have been due to the file encoding
    // and may disappear during an append
    read_size--;
  }
}

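/**
 * Finds the index of the first element in cumulative_sizes that is strictly greater
 * than byte_offset, i.e. the file/entry that contains the given offset. Throws if the
 * offset lies beyond the last recorded cumulative size.
 */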
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  auto iterator =
      std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
  if (iterator == cumulative_sizes.end()) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  return iterator - cumulative_sizes.begin();
}

size_t get_data_size(size_t file_size, size_t header_size) {
  // Add 1 byte for possible need to insert a newline
  return file_size - header_size + 1;
}

}  // namespace

SingleFileReader::SingleFileReader(const std::string& file_path,
                                   const import_export::CopyParams& copy_params)
    : FileReader(file_path, copy_params) {}

FirstLineByFilePath SingleFileReader::getFirstLineForEachFile() const {
  return {{file_path_, getFirstLine()}};
}

bool SingleFileReader::isEndOfLastFile() {
  return isScanFinished();
}

SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params)
    : SingleFileReader(file_path, copy_params)
    , scan_finished_(false)
    , header_offset_(0)
    , total_bytes_read_(0) {
  file_ = fopen(file_path.c_str(), "rb");
  if (!file_) {
    throw std::runtime_error{"An error occurred when attempting to open file \"" +
                             file_path + "\". " + strerror(errno)};
  }

  // Skip header and record offset
  skipHeader();
  fseek(file_, 0, SEEK_END);

  data_size_ = get_data_size(ftell(file_), header_offset_);

  if (fseek(file_, static_cast<long int>(header_offset_), SEEK_SET) != 0) {
    throw std::runtime_error{"An error occurred when attempting to open file \"" +
                             file_path + "\". " + strerror(errno)};
  }
}

SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params,
                                           const rapidjson::Value& value)
    : SingleFileReader(file_path, copy_params)
    , scan_finished_(true)
    , header_offset_(0)
    , total_bytes_read_(0) {
  file_ = fopen(file_path.c_str(), "rb");
  if (!file_) {
    throw std::runtime_error{"An error occurred when attempting to open file \"" +
                             file_path + "\". " + strerror(errno)};
  }
  json_utils::get_value_from_object(value, header_offset_, "header_offset");
  json_utils::get_value_from_object(value, total_bytes_read_, "total_bytes_read");
  json_utils::get_value_from_object(value, data_size_, "data_size");
}

void SingleTextFileReader::serialize(
    rapidjson::Value& value,
    rapidjson::Document::AllocatorType& allocator) const {
  CHECK(isScanFinished());
  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
  json_utils::add_value_to_object(
      value, total_bytes_read_, "total_bytes_read", allocator);
  json_utils::add_value_to_object(value, data_size_, "data_size", allocator);
}

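/**
 * Append-refresh check for a plain text file: re-opens the file, errors if it shrank
 * (rows were removed), seeks back to the last scanned offset, and re-enables scanning
 * if new data has been appended.
 */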
void SingleTextFileReader::checkForMoreRows(size_t file_offset,
                                            const ForeignServer* server_options,
                                            const UserMapping* user_mapping) {
  CHECK(isScanFinished());
  // Re-open file and check if there is any new data in it
  fclose(file_);
  file_ = fopen(file_path_.c_str(), "rb");
  if (!file_) {
    throw std::runtime_error{"An error occurred when attempting to open file \"" +
                             file_path_ + "\". " + strerror(errno)};
  }
  fseek(file_, 0, SEEK_END);
  size_t new_file_size = ftell(file_);
  size_t new_data_size = get_data_size(new_file_size, header_offset_);
  if (new_data_size < data_size_) {
    throw_removed_row_error(file_path_);
  }
  if (fseek(file_, static_cast<long int>(file_offset + header_offset_), SEEK_SET) != 0) {
    throw std::runtime_error{"An error occurred when attempting to read offset " +
                             std::to_string(file_offset + header_offset_) +
                             " in file: \"" + file_path_ + "\". " + strerror(errno)};
  }
  if (new_data_size > data_size_) {
    scan_finished_ = false;
    total_bytes_read_ = file_offset;
    data_size_ = new_data_size;
  }
}

void SingleTextFileReader::skipHeader() {
  if (copy_params_.has_header != import_export::ImportHeaderRow::NO_HEADER) {
    header_offset_ = getFirstLine().length() + 1;
  }
}

std::string SingleTextFileReader::getFirstLine() const {
  std::ifstream file{file_path_};
  CHECK(file.good());
  std::string line;
  std::getline(file, line, copy_params_.line_delim);
  file.close();
  return line;
}

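/**
 * Skips to the given entry in the archive, resetting the archive first if we have
 * already read past it, then fetches the first data block of that entry.
 */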
void ArchiveWrapper::skipToEntry(int entry_number) {
  if (current_entry_ >= entry_number) {
    resetArchive();
  }
  while (current_entry_ < entry_number) {
    if (arch_.get()->read_next_header()) {
      current_entry_++;
    } else {
      throw std::runtime_error{"Invalid archive entry"};
    }
  }
  fetchBlock();
}

// Go to next consecutive entry
bool ArchiveWrapper::nextEntry() {
  bool success = arch_.get()->read_next_header();
  if (success) {
    current_entry_++;
    fetchBlock();
  }
  return success;
}

void ArchiveWrapper::consumeDataFromCurrentEntry(size_t size, char* dest_buffer) {
  CHECK(size <= block_chars_remaining_);
  block_chars_remaining_ -= size;
  if (dest_buffer != nullptr) {
    memcpy(dest_buffer, current_block_, size);
  }
  current_block_ = static_cast<const char*>(current_block_) + size;
  if (block_chars_remaining_ == 0) {
    fetchBlock();
  }
}

char ArchiveWrapper::peekNextChar() {
  CHECK(block_chars_remaining_ > 0);
  return static_cast<const char*>(current_block_)[0];
}

void ArchiveWrapper::resetArchive() {
  arch_.reset(new PosixFileArchive(file_path_, false));
  block_chars_remaining_ = 0;
  // We will increment to 0 when reading the first entry
  current_entry_ = -1;
}

void ArchiveWrapper::fetchBlock() {
  int64_t offset;
  auto ok =
      arch_.get()->read_data_block(&current_block_, &block_chars_remaining_, &offset);
  if (!ok) {
    block_chars_remaining_ = 0;
  }
}

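/**
 * CompressedFileReader exposes the data entries of an archive (zip, gz, tar, rar, bz2,
 * 7z, tgz) as a single stream, using ArchiveWrapper to iterate over entries and blocks.
 */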
CompressedFileReader::CompressedFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params)
    : SingleFileReader(file_path, copy_params)
    , archive_(file_path)
    , initial_scan_(true)
    , scan_finished_(false)
    , current_offset_(0)
    , current_index_(-1) {
  // Initialize first entry
  nextEntry();
}

CompressedFileReader::CompressedFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params,
                                           const rapidjson::Value& value)
    : CompressedFileReader(file_path, copy_params) {
  scan_finished_ = true;
  initial_scan_ = false;
  sourcenames_.clear();
  archive_entry_index_.clear();
  cumulative_sizes_.clear();
  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
}

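/**
 * Low-level read helper: copies up to read_size bytes from the current archive entry
 * into buffer. If the entry is exhausted before read_size bytes are copied, the end of
 * the entry is adjusted (a line delimiter may be written into the extra byte allowed by
 * buffer_size) and the reader advances to the next entry.
 */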
size_t CompressedFileReader::readInternal(void* buffer,
                                          size_t read_size,
                                          size_t buffer_size) {
  size_t remaining_size = read_size;
  char* dest = static_cast<char*>(buffer);
  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
    size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
                           ? archive_.currentEntryDataAvailable()
                           : remaining_size;
    // Copy data into dest
    archive_.consumeDataFromCurrentEntry(copy_size, dest);
    remaining_size -= copy_size;
    dest += copy_size;
  }
  size_t bytes_read = read_size - remaining_size;
  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
    adjust_eof(
        bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
    current_offset_ += bytes_read;
    nextEntry();
  } else {
    current_offset_ += bytes_read;
  }
  return bytes_read;
}

size_t CompressedFileReader::read(void* buffer, size_t max_size) {
  // Leave one extra char in case we need to insert a delimiter
  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
  return bytes_read;
}

size_t CompressedFileReader::readRegion(void* buffer, size_t offset, size_t size) {
  CHECK(isScanFinished());

  // Determine where in the archive we are
  size_t index = offset_to_index(cumulative_sizes_, offset);
  CHECK(archive_entry_index_.size() > index);
  auto archive_entry = archive_entry_index_[index];
  current_index_ = static_cast<int>(index);

  // If we are in the wrong entry, or too far into the right one, skip to the correct
  // entry
  if (archive_entry != archive_.getCurrentEntryIndex() ||
      (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
    archive_.skipToEntry(archive_entry);
    skipHeader();
    current_offset_ = 0;
    if (index > 0) {
      current_offset_ = cumulative_sizes_[index - 1];
    }
  }
  skipBytes(offset - current_offset_);
  return readInternal(buffer, size, size);
}

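/**
 * Advances to the next archive entry that contains data, recording the cumulative data
 * size as entries are passed. During the initial scan, new entries are discovered and
 * their source names saved; on later passes, the known entry order is replayed via
 * skipToEntry().
 */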
void CompressedFileReader::nextEntry() {
  do {
    // Go to the next index
    current_index_++;
    if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
      cumulative_sizes_.emplace_back(current_offset_);
    }
    if (!initial_scan_) {
      // Entry # in the archive is known and might not be the next one in the file
      if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
        archive_.skipToEntry(archive_entry_index_[current_index_]);
        skipHeader();
      } else {
        scan_finished_ = true;
        return;
      }
    } else {
      // Read the next header in the archive and save the source name
      if (archive_.nextEntry()) {
        // Read headers until one has data
        CHECK(sourcenames_.size() == archive_entry_index_.size());
        sourcenames_.emplace_back(archive_.entryName());
        archive_entry_index_.emplace_back(archive_.getCurrentEntryIndex());
        skipHeader();
      } else {
        scan_finished_ = true;
        initial_scan_ = false;
        return;
      }
    }
  } while (archive_.currentEntryFinished());
}

void CompressedFileReader::skipHeader() {
  if (copy_params_.has_header != import_export::ImportHeaderRow::NO_HEADER) {
    std::optional<std::string> str = std::nullopt;
    consumeFirstLine(str);
  }
}

std::string CompressedFileReader::getFirstLine() const {
  CompressedFileReader reader(file_path_, copy_params_);
  auto first_line = std::make_optional<std::string>();
  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
  reader.consumeFirstLine(first_line);
  return first_line.value();
}

void CompressedFileReader::consumeFirstLine(std::optional<std::string>& dest_str) {
  char* dest_buffer = nullptr;
  while (!archive_.currentEntryFinished()) {
    if (dest_str.has_value()) {
      auto& str = dest_str.value();
      str.resize(str.length() + 1);
      dest_buffer = str.data() + str.length() - 1;
    }
    if (archive_.peekNextChar() == copy_params_.line_delim) {
      archive_.consumeDataFromCurrentEntry(1, dest_buffer);
      break;
    }
    archive_.consumeDataFromCurrentEntry(1, dest_buffer);
  }
}

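/**
 * Skips n_bytes of data within the current entry, consuming whole blocks where
 * possible. Returns early if the entry runs out of data first.
 */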
void CompressedFileReader::skipBytes(size_t n_bytes) {
  current_offset_ += n_bytes;
  while (n_bytes > 0) {
    if (!archive_.currentEntryDataAvailable()) {
      // We've reached the end of the entry
      return;
    }
    // Keep fetching blocks/entries until we've gone through N bytes
    if (archive_.currentEntryDataAvailable() <= n_bytes) {
      n_bytes -= archive_.currentEntryDataAvailable();
      archive_.consumeDataFromCurrentEntry(archive_.currentEntryDataAvailable());
    } else {
      archive_.consumeDataFromCurrentEntry(n_bytes);
      n_bytes = 0;
    }
  }
}

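/**
 * Append-refresh support: rescans the archive headers, maps previously seen source
 * files to their (possibly new) entry numbers, appends any new files, and errors if a
 * previously seen file has disappeared. If nothing new was added, a single-file archive
 * is re-read past the previous end of data to look for appended rows.
 */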
void CompressedFileReader::checkForMoreRows(size_t file_offset,
                                            const ForeignServer* server_options,
                                            const UserMapping* user_mapping) {
  CHECK(initial_scan_ == false);
  size_t initial_entries = archive_entry_index_.size();

  // Reset all entry indexes for existing items
  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
    archive_entry_index_[index] = -1;
  }

  // Read headers and determine location of existing and new files
  int entry_number = 0;
  archive_.resetArchive();
  while (archive_.nextEntry()) {
    auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
    if (it != sourcenames_.end()) {
      // Record new index of already read file
      auto index = it - sourcenames_.begin();
      archive_entry_index_[index] = entry_number;
    } else {
      // Append new source file
      sourcenames_.emplace_back(archive_.entryName());
      archive_entry_index_.emplace_back(entry_number);
    }
    entry_number++;
  }

  // Error if we are missing a file from a previous scan
  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
    if (archive_entry_index_[index] == -1) {
      throw std::runtime_error{
          "Foreign table refreshed with APPEND mode missing archive entry \"" +
          sourcenames_[index] + "\" from file \"" +
          boost::filesystem::path(file_path_).filename().string() + "\"."};
    }
  }

  archive_.resetArchive();
  if (initial_entries < archive_entry_index_.size()) {
    // We found more files
    current_index_ = static_cast<int>(initial_entries) - 1;
    current_offset_ = cumulative_sizes_[current_index_];
    // Iterate through new entries until we get one with data
    do {
      nextEntry();
    } while (archive_.currentEntryFinished() &&
             current_index_ < static_cast<int>(archive_entry_index_.size()));

    if (!archive_.currentEntryFinished()) {
      scan_finished_ = false;
    }
  } else {
    // No new files, but this may be an archive of a single file.
    // Check if we only have one file and whether it has more data.
    // There may still be multiple entries, with some empty ones that are
    // ignored, like directories.
    size_t last_size = 0;
    size_t file_index = -1;
    size_t num_file_entries = 0;
    for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
      if (cumulative_sizes_[index] > last_size) {
        file_index = index;
        num_file_entries++;
        last_size = cumulative_sizes_[index];
      }
    }
    if (num_file_entries == 1) {
      current_index_ = static_cast<int>(file_index);
      current_offset_ = 0;
      size_t last_eof = cumulative_sizes_[file_index];

      // Reset cumulative_sizes_ with initial zero sizes
      auto old_cumulative_sizes = std::move(cumulative_sizes_);
      cumulative_sizes_ = {};
      for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
        cumulative_sizes_.emplace_back(0);
      }

      // Go to the index of the file and read to where we left off
      archive_.skipToEntry(archive_entry_index_[file_index]);
      skipHeader();
      skipBytes(last_eof);
      if (!archive_.currentEntryFinished()) {
        scan_finished_ = false;
      } else {
        // There was no new data, so put back the old data structure
        cumulative_sizes_ = std::move(old_cumulative_sizes);
      }
    }
  }
}

void CompressedFileReader::serialize(
    rapidjson::Value& value,
    rapidjson::Document::AllocatorType& allocator) const {
  // Should be done with the initial scan
  CHECK(scan_finished_);
  CHECK(!initial_scan_);

  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
  json_utils::add_value_to_object(
      value, cumulative_sizes_, "cumulative_sizes", allocator);
  json_utils::add_value_to_object(
      value, archive_entry_index_, "archive_entry_index", allocator);
}

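/**
 * MultiFileReader concatenates a set of FileReaders into one logical stream, recording
 * a cumulative byte size per finished file so that readRegion() can map a global offset
 * back to the file that contains it.
 */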
MultiFileReader::MultiFileReader(const std::string& file_path,
                                 const import_export::CopyParams& copy_params)
    : FileReader(file_path, copy_params)
    , current_index_(0)
    , current_offset_(0)
    , is_end_of_last_file_(false) {}

MultiFileReader::MultiFileReader(const std::string& file_path,
                                 const import_export::CopyParams& copy_params,
                                 const rapidjson::Value& value)
    : FileReader(file_path, copy_params)
    , current_index_(0)
    , current_offset_(0)
    , is_end_of_last_file_(false) {
  json_utils::get_value_from_object(value, file_locations_, "file_locations");
  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
  json_utils::get_value_from_object(value, current_offset_, "current_offset");
  json_utils::get_value_from_object(value, current_index_, "current_index");

  // Validate files_metadata here, but objects will be recreated by child class
  CHECK(value.HasMember("files_metadata"));
  CHECK(value["files_metadata"].IsArray());
  CHECK(file_locations_.size() == value["files_metadata"].GetArray().Size());
}

void MultiFileReader::serialize(rapidjson::Value& value,
                                rapidjson::Document::AllocatorType& allocator) const {
  json_utils::add_value_to_object(value, file_locations_, "file_locations", allocator);
  json_utils::add_value_to_object(
      value, cumulative_sizes_, "cumulative_sizes", allocator);
  json_utils::add_value_to_object(value, current_offset_, "current_offset", allocator);
  json_utils::add_value_to_object(value, current_index_, "current_index", allocator);

  // Serialize metadata from all files
  rapidjson::Value files_metadata(rapidjson::kArrayType);
  for (size_t index = 0; index < files_.size(); index++) {
    rapidjson::Value file_metadata(rapidjson::kObjectType);
    files_[index]->serialize(file_metadata, allocator);
    files_metadata.PushBack(file_metadata, allocator);
  }
  value.AddMember("files_metadata", files_metadata, allocator);
}

size_t MultiFileReader::getRemainingSize() {
  size_t total_size = 0;
  for (size_t index = current_index_; index < files_.size(); index++) {
    total_size += files_[index]->getRemainingSize();
  }
  return total_size;
}

bool MultiFileReader::isRemainingSizeKnown() {
  bool size_known = true;
  for (size_t index = current_index_; index < files_.size(); index++) {
    size_known = size_known && files_[index]->isRemainingSizeKnown();
  }
  return size_known;
}

FirstLineByFilePath MultiFileReader::getFirstLineForEachFile() const {
  FirstLineByFilePath first_line_by_file_path;
  for (const auto& file : files_) {
    first_line_by_file_path.merge(file->getFirstLineForEachFile());
  }
  return first_line_by_file_path;
}

bool MultiFileReader::isEndOfLastFile() {
  return (isScanFinished() || is_end_of_last_file_);
}

LocalMultiFileReader::LocalMultiFileReader(
    const std::string& file_path,
    const import_export::CopyParams& copy_params,
    const std::optional<std::string>& regex_path_filter,
    const std::optional<std::string>& file_sort_order_by,
    const std::optional<std::string>& file_sort_regex)
    : MultiFileReader(file_path, copy_params) {
  auto found_file_locations = shared::local_glob_filter_sort_files(
      file_path, regex_path_filter, file_sort_order_by, file_sort_regex);
  for (const auto& location : found_file_locations) {
    insertFile(location);
  }
}

namespace {
bool is_compressed_file(const std::string& location) {
  const std::vector<std::string> compressed_exts = {
      ".zip", ".gz", ".tar", ".rar", ".bz2", ".7z", ".tgz"};
  return shared::contains(compressed_exts, boost::filesystem::extension(location));
}
}  // namespace

LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
                                           const import_export::CopyParams& copy_params,
                                           const rapidjson::Value& value)
    : MultiFileReader(file_path, copy_params, value) {
  // Construct file readers from files_metadata
  for (size_t index = 0; index < file_locations_.size(); index++) {
    if (is_compressed_file(file_locations_[index])) {
      files_.emplace_back(std::make_unique<CompressedFileReader>(
          file_locations_[index],
          copy_params_,
          value["files_metadata"].GetArray()[index]));
    } else {
      files_.emplace_back(std::make_unique<SingleTextFileReader>(
          file_locations_[index],
          copy_params_,
          value["files_metadata"].GetArray()[index]));
    }
  }
}

void LocalMultiFileReader::insertFile(std::string location) {
  if (is_compressed_file(location)) {
    files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
  } else {
    files_.emplace_back(std::make_unique<SingleTextFileReader>(location, copy_params_));
  }
  if (files_.back()->isScanFinished()) {
    // Skip any initially empty files
    files_.pop_back();
  } else {
    file_locations_.push_back(location);
  }
}

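/**
 * Append refresh for a local directory or file: picks up files newly added to the
 * directory, errors on removed files, and for the single-file case delegates to the
 * underlying reader to detect newly appended rows.
 */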
void LocalMultiFileReader::checkForMoreRows(size_t file_offset,
                                            const ForeignServer* server_options,
                                            const UserMapping* user_mapping) {
  // Look for new files
  std::set<std::string> new_locations;
  CHECK(isScanFinished());
  CHECK(file_offset == current_offset_);
  if (boost::filesystem::is_directory(file_path_)) {
    // Find all files in this directory
    std::set<std::string> all_file_paths;
    for (boost::filesystem::recursive_directory_iterator
             it(file_path_, boost::filesystem::symlink_option::recurse),
         eit;
         it != eit;
         ++it) {
      bool new_file =
          std::find(file_locations_.begin(), file_locations_.end(), it->path()) ==
          file_locations_.end();
      if (!boost::filesystem::is_directory(it->path()) && new_file) {
        new_locations.insert(it->path().string());
      }
      all_file_paths.emplace(it->path().string());
    }

    for (const auto& file_path : file_locations_) {
      if (all_file_paths.find(file_path) == all_file_paths.end()) {
        throw_removed_file_error(file_path);
      }
    }
  }
  if (new_locations.size() > 0) {
    for (const auto& location : new_locations) {
      insertFile(location);
    }
  } else if (files_.size() == 1) {
    // Single file, check if it has new data
    files_[0].get()->checkForMoreRows(file_offset);
    if (!files_[0].get()->isScanFinished()) {
      current_index_ = 0;
      cumulative_sizes_ = {};
    }
  }
}

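/**
 * Reads from the current file; when that file's scan completes, a line delimiter may be
 * inserted or trimmed at its end and the reader advances to the next file, recording
 * the cumulative offset reached.
 */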
size_t MultiFileReader::read(void* buffer, size_t max_size) {
  if (isScanFinished()) {
    return 0;
  }
  // Leave one extra char in case we need to insert a delimiter
  size_t bytes_read = files_[current_index_].get()->read(buffer, max_size - 1);
  if (files_[current_index_].get()->isScanFinished()) {
    adjust_eof(bytes_read, max_size, static_cast<char*>(buffer), copy_params_.line_delim);
  }
  current_offset_ += bytes_read;
  if (current_index_ < files_.size() && files_[current_index_].get()->isScanFinished()) {
    cumulative_sizes_.emplace_back(current_offset_);
    current_index_++;
    is_end_of_last_file_ = true;
  } else {
    is_end_of_last_file_ = false;
  }
  return bytes_read;
}

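/**
 * Random-access read of a previously scanned region: maps the global offset to a file
 * via cumulative_sizes_, and re-inserts the delimiter that may have been appended at
 * that file's end during the sequential scan.
 */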
size_t MultiFileReader::readRegion(void* buffer, size_t offset, size_t size) {
  CHECK(isScanFinished());
  // Get file index
  auto index = offset_to_index(cumulative_sizes_, offset);
  // Get offset into this file
  size_t base = 0;
  if (index > 0) {
    base = cumulative_sizes_[index - 1];
  }

  size_t read_size = size;
  if (offset + size == cumulative_sizes_[index]) {
    // Skip the last byte as it may have been an inserted delimiter
    read_size--;
  }
  size_t bytes_read = files_[index].get()->readRegion(buffer, offset - base, read_size);

  if (offset + size == cumulative_sizes_[index]) {
    // Re-insert delimiter
    static_cast<char*>(buffer)[size - 1] = copy_params_.line_delim;
    bytes_read++;
  }

  return bytes_read;
}

}  // namespace foreign_storage
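
/*
 * Illustrative usage sketch (not part of the original file): how an FSI data wrapper
 * might drive one of these readers over a local directory of delimited files. The
 * directory path and buffer size below are hypothetical.
 *
 *   import_export::CopyParams copy_params;  // default CopyParams (delimiters, header flag, ...)
 *   foreign_storage::LocalMultiFileReader reader("/path/to/csv_dir", copy_params,
 *                                                std::nullopt, std::nullopt, std::nullopt);
 *   std::vector<char> buffer(1 << 20);
 *   while (!reader.isScanFinished()) {
 *     size_t bytes_read = reader.read(buffer.data(), buffer.size());
 *     // Parse the first bytes_read bytes of buffer...
 *   }
 */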