OmniSciDB  2e3a973ef4
CsvReader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "CsvReader.h"

#include "FsiJsonUtils.h"
20 
21 namespace foreign_storage {
22 
23 namespace {
33 void adjust_eof(size_t& read_size,
34  const size_t buffer_size,
35  char* buffer,
36  const char line_delim) {
37  if (read_size == 0 || buffer[read_size - 1] != line_delim) {
38  CHECK(buffer_size > read_size);
39  static_cast<char*>(buffer)[read_size] = line_delim;
40  read_size++;
41  } else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
42  // Extra newline may have been due to the file encoding
43  // and may disappear during an append
44  read_size--;
45  }
46 }
47 
/**
 * Map a byte offset to the index of the entry that contains it, given the
 * sorted list of cumulative entry end-offsets.
 *
 * @throws std::runtime_error if the offset lies beyond the last entry.
 */
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  auto it =
      std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
  if (it == cumulative_sizes.end()) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  return static_cast<size_t>(std::distance(cumulative_sizes.begin(), it));
}
61 
// Size of the data portion of a file: everything after the header, plus one
// byte reserved for a newline that may need to be inserted at EOF.
size_t get_data_size(size_t file_size, size_t header_size) {
  // Add 1 byte for possible need to insert a newline
  constexpr size_t extra_newline_byte = 1;
  return (file_size - header_size) + extra_newline_byte;
}
66 
67 } // namespace
68 
69 SingleFileReader::SingleFileReader(const std::string& file_path,
70  const import_export::CopyParams& copy_params)
71  : CsvReader(file_path, copy_params)
72  , scan_finished_(false)
73  , header_offset_(0)
74  , total_bytes_read_(0) {
75  file_ = fopen(file_path.c_str(), "rb");
76  if (!file_) {
77  throw std::runtime_error{"An error occurred when attempting to open file \"" +
78  file_path + "\". " + strerror(errno)};
79  }
80 
81  // Skip header and record offset
83  std::ifstream file{file_path};
84  CHECK(file.good());
85  std::string line;
86  std::getline(file, line, copy_params.line_delim);
87  file.close();
88  header_offset_ = line.size() + 1;
89  }
90  fseek(file_, 0, SEEK_END);
91 
93 
94  if (fseek(file_, static_cast<long int>(header_offset_), SEEK_SET) != 0) {
95  throw std::runtime_error{"An error occurred when attempting to open file \"" +
96  file_path + "\". " + strerror(errno)};
97  };
98 }
99 
100 SingleFileReader::SingleFileReader(const std::string& file_path,
101  const import_export::CopyParams& copy_params,
102  const rapidjson::Value& value)
103  : CsvReader(file_path, copy_params)
104  , scan_finished_(true)
105  , header_offset_(0)
106  , total_bytes_read_(0) {
107  file_ = fopen(file_path.c_str(), "rb");
108  if (!file_) {
109  throw std::runtime_error{"An error occurred when attempting to open file \"" +
110  file_path + "\". " + strerror(errno)};
111  }
112  json_utils::get_value_from_object(value, header_offset_, "header_offset");
113  json_utils::get_value_from_object(value, total_bytes_read_, "total_bytes_read");
114  json_utils::get_value_from_object(value, data_size_, "data_size");
115 }
116 
117 void SingleFileReader::serialize(rapidjson::Value& value,
118  rapidjson::Document::AllocatorType& allocator) const {
120  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
122  value, total_bytes_read_, "total_bytes_read", allocator);
123  json_utils::add_value_to_object(value, data_size_, "data_size", allocator);
124 };
125 
126 void SingleFileReader::checkForMoreRows(size_t file_offset,
127  const ForeignServer* server_options,
128  const UserMapping* user_mapping) {
130  // Re-open file and check if there is any new data in it
131  fclose(file_);
132  file_ = fopen(file_path_.c_str(), "rb");
133  if (!file_) {
134  throw std::runtime_error{"An error occurred when attempting to open file \"" +
135  file_path_ + "\". " + strerror(errno)};
136  }
137  fseek(file_, 0, SEEK_END);
138  size_t new_file_size = ftell(file_);
139  size_t new_data_size = get_data_size(new_file_size, header_offset_);
140  if (new_data_size < data_size_) {
142  }
143  if (fseek(file_, static_cast<long int>(file_offset + header_offset_), SEEK_SET) != 0) {
144  throw std::runtime_error{"An error occurred when attempting to read offset " +
145  std::to_string(file_offset + header_offset_) +
146  " in file: \"" + file_path_ + "\". " + strerror(errno)};
147  }
148  if (new_data_size > data_size_) {
149  scan_finished_ = false;
150  total_bytes_read_ = file_offset;
151  data_size_ = new_data_size;
152  }
153 }
154 
158 void ArchiveWrapper::skipToEntry(int entry_number) {
159  if (current_entry_ >= entry_number) {
160  resetArchive();
161  }
162  while (current_entry_ < entry_number) {
163  if (arch_.get()->read_next_header()) {
164  current_entry_++;
165  } else {
166  throw std::runtime_error{"Invalid archive entry"};
167  }
168  }
169  fetchBlock();
170 }
171 
172 // Go to next consecutive entry
174  bool success = arch_.get()->read_next_header();
175  if (success) {
176  current_entry_++;
177  fetchBlock();
178  }
179  return success;
180 }
181 
182 void ArchiveWrapper::consumeDataFromCurrentEntry(size_t size, char* dest_buffer) {
183  CHECK(size <= block_chars_remaining_);
184  block_chars_remaining_ -= size;
185  if (dest_buffer != nullptr) {
186  memcpy(dest_buffer, current_block_, size);
187  }
188  current_block_ = static_cast<const char*>(current_block_) + size;
189  if (block_chars_remaining_ == 0) {
190  fetchBlock();
191  }
192 }
193 
194 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
195  CHECK(block_chars_remaining_ > 0);
196  return static_cast<const char*>(current_block_)[0];
197 }
198 
200  arch_.reset(new PosixFileArchive(file_path_, false));
201  block_chars_remaining_ = 0;
202  // We will increment to 0 when reading first entry
203  current_entry_ = -1;
204 }
205 
207  int64_t offset;
208  auto ok =
209  arch_.get()->read_data_block(&current_block_, &block_chars_remaining_, &offset);
210  if (!ok) {
211  block_chars_remaining_ = 0;
212  }
213 }
214 
215 CompressedFileReader::CompressedFileReader(const std::string& file_path,
216  const import_export::CopyParams& copy_params)
217  : CsvReader(file_path, copy_params)
218  , archive_(file_path)
219  , initial_scan_(true)
220  , scan_finished_(false)
221  , current_offset_(0)
222  , current_index_(-1) {
223  // Initialize first entry
224  nextEntry();
225 }
226 
227 CompressedFileReader::CompressedFileReader(const std::string& file_path,
228  const import_export::CopyParams& copy_params,
229  const rapidjson::Value& value)
230  : CompressedFileReader(file_path, copy_params) {
231  scan_finished_ = true;
232  initial_scan_ = false;
233  sourcenames_.clear();
234  archive_entry_index_.clear();
235  cumulative_sizes_.clear();
236  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
237  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
238  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
239 }
240 
242  size_t read_size,
243  size_t buffer_size) {
244  size_t remaining_size = read_size;
245  char* dest = static_cast<char*>(buffer);
246  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
247  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
249  : remaining_size;
250  // copy data into dest
251  archive_.consumeDataFromCurrentEntry(copy_size, dest);
252  remaining_size -= copy_size;
253  dest += copy_size;
254  }
255  size_t bytes_read = read_size - remaining_size;
256  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
257  adjust_eof(
258  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
259  current_offset_ += bytes_read;
260  nextEntry();
261  } else {
262  current_offset_ += bytes_read;
263  }
264  return bytes_read;
265 }
266 
267 size_t CompressedFileReader::read(void* buffer, size_t max_size) {
268  // Leave one extra char in case we need to insert a delimiter
269  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
270  return bytes_read;
271 }
272 
273 size_t CompressedFileReader::readRegion(void* buffer, size_t offset, size_t size) {
275 
276  // Determine where in the archive we are
277  size_t index = offset_to_index(cumulative_sizes_, offset);
278  CHECK(archive_entry_index_.size() > index);
279  auto archive_entry = archive_entry_index_[index];
280  current_index_ = static_cast<int>(index);
281 
282  // If we are in the wrong entry or too far in the right one skip to the correct entry
283  if (archive_entry != archive_.getCurrentEntryIndex() ||
284  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
285  archive_.skipToEntry(archive_entry);
286  skipHeader();
287  current_offset_ = 0;
288  if (index > 0) {
289  current_offset_ = cumulative_sizes_[index - 1];
290  }
291  }
292  skipBytes(offset - current_offset_);
293  return readInternal(buffer, size, size);
294 }
295 
300  do {
301  // Go to the next index
302  current_index_++;
303  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
305  }
306  if (!initial_scan_) {
307  // Entry # in the archive is known and might not be the next one in the file
308  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
310  skipHeader();
311  } else {
312  scan_finished_ = true;
313  return;
314  }
315  } else {
316  // Read next header in archive and save the sourcename
317  if (archive_.nextEntry()) {
318  // read headers until one has data
319  CHECK(sourcenames_.size() == archive_entry_index_.size());
320  sourcenames_.emplace_back(archive_.entryName());
322  skipHeader();
323  } else {
324  scan_finished_ = true;
325  initial_scan_ = false;
326  return;
327  }
328  }
329  } while (archive_.currentEntryFinished());
330 }
331 
337  while (!archive_.currentEntryFinished()) {
340  break;
341  }
343  }
344  }
345 }
346 
351 void CompressedFileReader::skipBytes(size_t n_bytes) {
352  current_offset_ += n_bytes;
353  while (n_bytes > 0) {
355  // We've reached the end of the entry
356  return;
357  }
358  // Keep fetching blocks/entries until we've gone through N bytes
359  if (archive_.currentEntryDataAvailable() <= n_bytes) {
360  n_bytes -= archive_.currentEntryDataAvailable();
362  } else {
364  n_bytes = 0;
365  }
366  }
367 }
368 
370  const ForeignServer* server_options,
371  const UserMapping* user_mapping) {
372  CHECK(initial_scan_ == false);
373  size_t initial_entries = archive_entry_index_.size();
374 
375  // Reset all entry indexes for existing items
376  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
377  archive_entry_index_[index] = -1;
378  }
379 
380  // Read headers and determine location of existing and new files
381  int entry_number = 0;
383  while (archive_.nextEntry()) {
384  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
385  if (it != sourcenames_.end()) {
386  // Record new index of already read file
387  auto index = it - sourcenames_.begin();
388  archive_entry_index_[index] = entry_number;
389  } else {
390  // Append new source file
391  sourcenames_.emplace_back(archive_.entryName());
392  archive_entry_index_.emplace_back(entry_number);
393  }
394  entry_number++;
395  }
396 
397  // Error if we are missing a file from a previous scan
398  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
399  if (archive_entry_index_[index] == -1) {
400  throw std::runtime_error{
401  "Foreign table refreshed with APPEND mode missing archive entry \"" +
402  sourcenames_[index] + "\" from file \"" +
403  boost::filesystem::path(file_path_).filename().string() + "\"."};
404  }
405  }
406 
408  if (initial_entries < archive_entry_index_.size()) {
409  // We found more files
410  current_index_ = static_cast<int>(initial_entries) - 1;
412  // iterate through new entries until we get one with data
413  do {
414  nextEntry();
415  } while (archive_.currentEntryFinished() &&
416  current_index_ < static_cast<int>(archive_entry_index_.size()));
417 
419  scan_finished_ = false;
420  }
421  } else {
422  // No new files but this may be an archive of a single file
423  // Check if we only have one CSV file and check if it has more data
424  // May have still have multiple entries with some empty that are ignored
425  // like directories
426  size_t last_size = 0;
427  size_t csv_index = -1;
428  size_t num_csv_entries = 0;
429  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
430  if (cumulative_sizes_[index] > last_size) {
431  csv_index = index;
432  num_csv_entries++;
433  last_size = cumulative_sizes_[index];
434  }
435  }
436  if (num_csv_entries == 1) {
437  current_index_ = static_cast<int>(csv_index);
438  current_offset_ = 0;
439  size_t last_eof = cumulative_sizes_[csv_index];
440 
441  // reset cumulative_sizes_ with initial zero sizes
442  auto old_cumulative_sizes = std::move(cumulative_sizes_);
443  cumulative_sizes_ = {};
444  for (size_t zero_index = 0; zero_index < csv_index; zero_index++) {
445  cumulative_sizes_.emplace_back(0);
446  }
447 
448  // Go to Index of CSV file and read to where we left off
450  skipHeader();
451  skipBytes(last_eof);
453  scan_finished_ = false;
454  } else {
455  // There was no new data, so put back the old data structure
456  cumulative_sizes_ = std::move(old_cumulative_sizes);
457  }
458  }
459  }
460 };
461 
463  rapidjson::Value& value,
464  rapidjson::Document::AllocatorType& allocator) const {
465  // Should be done initial scan
468 
469  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
471  value, cumulative_sizes_, "cumulative_sizes", allocator);
473  value, archive_entry_index_, "archive_entry_index", allocator);
474 };
475 
476 MultiFileReader::MultiFileReader(const std::string& file_path,
477  const import_export::CopyParams& copy_params)
478  : CsvReader(file_path, copy_params), current_index_(0), current_offset_(0) {}
479 
480 MultiFileReader::MultiFileReader(const std::string& file_path,
481  const import_export::CopyParams& copy_params,
482  const rapidjson::Value& value)
483  : CsvReader(file_path, copy_params), current_index_(0), current_offset_(0) {
484  json_utils::get_value_from_object(value, file_locations_, "file_locations");
485  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
486  json_utils::get_value_from_object(value, current_offset_, "current_offset");
487  json_utils::get_value_from_object(value, current_index_, "current_index");
488 
489  // Validate files_metadata here, but objects will be recreated by child class
490  CHECK(value.HasMember("files_metadata"));
491  CHECK(value["files_metadata"].IsArray());
492  CHECK(file_locations_.size() == value["files_metadata"].GetArray().Size());
493 }
494 
495 void MultiFileReader::serialize(rapidjson::Value& value,
496  rapidjson::Document::AllocatorType& allocator) const {
497  json_utils::add_value_to_object(value, file_locations_, "file_locations", allocator);
499  value, cumulative_sizes_, "cumulative_sizes", allocator);
500  json_utils::add_value_to_object(value, current_offset_, "current_offset", allocator);
501  json_utils::add_value_to_object(value, current_index_, "current_index", allocator);
502 
503  // Serialize metadata from all files
504  rapidjson::Value files_metadata(rapidjson::kArrayType);
505  for (size_t index = 0; index < files_.size(); index++) {
506  rapidjson::Value file_metadata(rapidjson::kObjectType);
507  files_[index]->serialize(file_metadata, allocator);
508  files_metadata.PushBack(file_metadata, allocator);
509  }
510  value.AddMember("files_metadata", files_metadata, allocator);
511 };
512 
514  size_t total_size = 0;
515  for (size_t index = current_index_; index < files_.size(); index++) {
516  total_size += files_[index]->getRemainingSize();
517  }
518  return total_size;
519 }
520 
522  bool size_known = true;
523  for (size_t index = current_index_; index < files_.size(); index++) {
524  size_known = size_known && files_[index]->isRemainingSizeKnown();
525  }
526  return size_known;
527 };
528 
529 LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
530  const import_export::CopyParams& copy_params)
531  : MultiFileReader(file_path, copy_params) {
532  std::set<std::string> file_locations;
533  if (boost::filesystem::is_directory(file_path)) {
534  // Find all files in this directory
535  for (boost::filesystem::recursive_directory_iterator it(file_path), eit; it != eit;
536  ++it) {
537  if (!boost::filesystem::is_directory(it->path())) {
538  file_locations.insert(it->path().string());
539  }
540  }
541  } else {
542  file_locations.insert(file_path);
543  }
544  for (const auto& location : file_locations) {
545  insertFile(location);
546  }
547 }
548 
549 namespace {
550 bool is_compressed_file(const std::string& location) {
551  const std::vector<std::string> compressed_exts = {
552  ".zip", ".gz", ".tar", ".rar", ".bz2", ".7z", ".tgz"};
553  const std::vector<std::string> uncompressed_exts = {"", ".csv", ".tsv", ".txt"};
554  if (std::find(compressed_exts.begin(),
555  compressed_exts.end(),
556  boost::filesystem::extension(location)) != compressed_exts.end()) {
557  return true;
558  } else if (std::find(uncompressed_exts.begin(),
559  uncompressed_exts.end(),
560  boost::filesystem::extension(location)) !=
561  uncompressed_exts.end()) {
562  return false;
563  } else {
564  throw std::runtime_error{"Invalid extention for file \"" + location + "\"."};
565  }
566 }
567 } // namespace
568 
569 LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
570  const import_export::CopyParams& copy_params,
571  const rapidjson::Value& value)
572  : MultiFileReader(file_path, copy_params, value) {
573  // Constructs file from files_metadata
574  for (size_t index = 0; index < file_locations_.size(); index++) {
575  if (is_compressed_file(file_locations_[index])) {
576  files_.emplace_back(std::make_unique<CompressedFileReader>(
577  file_locations_[index],
578  copy_params_,
579  value["files_metadata"].GetArray()[index]));
580  } else {
581  files_.emplace_back(
582  std::make_unique<SingleFileReader>(file_locations_[index],
583  copy_params_,
584  value["files_metadata"].GetArray()[index]));
585  }
586  }
587 }
588 
589 void LocalMultiFileReader::insertFile(std::string location) {
590  if (is_compressed_file(location)) {
591  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
592  } else {
593  files_.emplace_back(std::make_unique<SingleFileReader>(location, copy_params_));
594  }
595  if (files_.back()->isScanFinished()) {
596  // skip any initially empty files
597  files_.pop_back();
598  } else {
599  file_locations_.push_back(location);
600  }
601 }
602 
604  const ForeignServer* server_options,
605  const UserMapping* user_mapping) {
606  // Look for new files
607  std::set<std::string> new_locations;
609  CHECK(file_offset == current_offset_);
610  if (boost::filesystem::is_directory(file_path_)) {
611  // Find all files in this directory
612  std::set<std::string> all_file_paths;
613  for (boost::filesystem::recursive_directory_iterator it(file_path_), eit; it != eit;
614  ++it) {
615  bool new_file =
616  std::find(file_locations_.begin(), file_locations_.end(), it->path()) ==
617  file_locations_.end();
618  if (!boost::filesystem::is_directory(it->path()) && new_file) {
619  new_locations.insert(it->path().string());
620  }
621  all_file_paths.emplace(it->path().string());
622  }
623 
624  for (const auto& file_path : file_locations_) {
625  if (all_file_paths.find(file_path) == all_file_paths.end()) {
626  throw_removed_file_error(file_path);
627  }
628  }
629  }
630  if (new_locations.size() > 0) {
631  for (const auto& location : new_locations) {
632  insertFile(location);
633  }
634  } else if (files_.size() == 1) {
635  // Single file, check if it has new data
636  files_[0].get()->checkForMoreRows(file_offset);
637  if (!files_[0].get()->isScanFinished()) {
638  current_index_ = 0;
639  cumulative_sizes_ = {};
640  }
641  }
642 }
643 
644 size_t MultiFileReader::read(void* buffer, size_t max_size) {
645  if (isScanFinished()) {
646  return 0;
647  }
648  // Leave one extra char in case we need to insert a delimiter
649  size_t bytes_read = files_[current_index_].get()->read(buffer, max_size - 1);
650  if (files_[current_index_].get()->isScanFinished()) {
651  adjust_eof(bytes_read, max_size, static_cast<char*>(buffer), copy_params_.line_delim);
652  }
653  current_offset_ += bytes_read;
654  if (current_index_ < files_.size() && files_[current_index_].get()->isScanFinished()) {
656  current_index_++;
657  }
658  return bytes_read;
659 }
660 
661 size_t MultiFileReader::readRegion(void* buffer, size_t offset, size_t size) {
663  // Get file index
664  auto index = offset_to_index(cumulative_sizes_, offset);
665  // Get offset into this file
666  size_t base = 0;
667  if (index > 0) {
668  base = cumulative_sizes_[index - 1];
669  }
670 
671  size_t read_size = size;
672  if (offset + size == cumulative_sizes_[index]) {
673  // Skip the last byte as it may have been an inserted delimiter
674  read_size--;
675  }
676  size_t bytes_read = files_[index].get()->readRegion(buffer, offset - base, read_size);
677 
678  if (offset + size == cumulative_sizes_[index]) {
679  // Re-insert delimiter
680  static_cast<char*>(buffer)[size - 1] = copy_params_.line_delim;
681  bytes_read++;
682  }
683 
684  return bytes_read;
685 }
686 
687 } // namespace foreign_storage
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
size_t get_data_size(size_t file_size, size_t header_size)
Definition: CsvReader.cpp:62
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:215
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: CsvReader.cpp:273
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: CsvReader.cpp:241
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:126
std::string to_string(char const *&&v)
void throw_removed_row_error(const std::string &file_path)
void insertFile(std::string location)
Definition: CsvReader.cpp:589
void throw_removed_file_error(const std::string &file_path)
ImportHeaderRow has_header
Definition: CopyParams.h:48
bool is_compressed_file(const std::string &location)
Definition: CsvReader.cpp:550
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:462
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:529
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: CsvReader.cpp:182
bool isRemainingSizeKnown() override
Definition: CsvReader.cpp:521
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: CsvReader.cpp:33
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: CsvReader.cpp:53
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: CsvReader.cpp:661
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:117
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
size_t currentEntryDataAvailable() const
Definition: CsvReader.h:185
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:69
size_t read(void *buffer, size_t max_size) override
Definition: CsvReader.cpp:644
void skipToEntry(int entry_number)
Definition: CsvReader.cpp:158
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:369
#define CHECK(condition)
Definition: Logger.h:197
bool isScanFinished() override
Definition: CsvReader.h:307
size_t getRemainingSize() override
Definition: CsvReader.cpp:513
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:495
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:317
size_t read(void *buffer, size_t max_size) override
Definition: CsvReader.cpp:267
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:603