OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CompressedFileReader Class Reference

#include <FileReader.h>

+ Inheritance diagram for foreign_storage::CompressedFileReader:
+ Collaboration diagram for foreign_storage::CompressedFileReader:

Public Member Functions

 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
bool isRemainingSizeKnown () override
 
size_t getRemainingSize () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::SingleFileReader
 SingleFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 ~SingleFileReader () override=default
 
FirstLineByFilePath getFirstLineForEachFile () const override
 
bool isEndOfLastFile () override
 
- Public Member Functions inherited from foreign_storage::FileReader
 FileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~FileReader ()=default
 

Private Member Functions

void resetArchive ()
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
void nextEntry ()
 
void skipHeader () override
 
void skipBytes (size_t n_bytes)
 
size_t readInternal (void *buffer, size_t read_size, size_t buffer_size)
 
std::string getFirstLine () const override
 
void consumeFirstLine (std::optional< std::string > &dest_str)
 

Private Attributes

ArchiveWrapper archive_
 
bool initial_scan_
 
bool scan_finished_
 
size_t current_offset_
 
int current_index_
 
std::vector< size_t > cumulative_sizes_
 
std::vector< std::string > sourcenames_
 
std::vector< int > archive_entry_index_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::FileReader
import_export::CopyParams copy_params_
 
std::string file_path_
 
- Static Protected Attributes inherited from foreign_storage::SingleFileReader
static constexpr size_t DEFAULT_HEADER_READ_SIZE {1024}
 

Detailed Description

Definition at line 255 of file FileReader.h.

Constructor & Destructor Documentation

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params 
)

Definition at line 244 of file FileReader.cpp.

References nextEntry().

246  : SingleFileReader(file_path, copy_params)
247  , archive_(file_path)
248  , initial_scan_(true)
249  , scan_finished_(false)
250  , current_offset_(0)
251  , current_index_(-1) {
252  // Initialize first entry
253  nextEntry();
254 }
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:73

+ Here is the call graph for this function:

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const rapidjson::Value &  value 
)

Definition at line 256 of file FileReader.cpp.

References archive_entry_index_, cumulative_sizes_, foreign_storage::json_utils::get_value_from_object(), initial_scan_, scan_finished_, and sourcenames_.

259  : CompressedFileReader(file_path, copy_params) {
260  scan_finished_ = true;
261  initial_scan_ = false;
262  sourcenames_.clear();
263  archive_entry_index_.clear();
264  cumulative_sizes_.clear();
265  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
266  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
267  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
268 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:244
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::CompressedFileReader::checkForMoreRows ( size_t  file_offset,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
overrideprivatevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e. the files are not in a valid appended state or are not supported).

Parameters
file_offset- where to resume the scan from (end of the last row), since not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed files
user_mapping- only needed for S3 backed files

Reimplemented from foreign_storage::FileReader.

Definition at line 417 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::FileReader::file_path_, initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), nextEntry(), foreign_storage::ArchiveWrapper::resetArchive(), scan_finished_, skipBytes(), skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

419  {
420  CHECK(initial_scan_ == false);
421  size_t initial_entries = archive_entry_index_.size();
422 
423  // Reset all entry indexes for existing items
424  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
425  archive_entry_index_[index] = -1;
426  }
427 
428  // Read headers and determine location of existing and new files
429  int entry_number = 0;
431  while (archive_.nextEntry()) {
432  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
433  if (it != sourcenames_.end()) {
434  // Record new index of already read file
435  auto index = it - sourcenames_.begin();
436  archive_entry_index_[index] = entry_number;
437  } else {
438  // Append new source file
439  sourcenames_.emplace_back(archive_.entryName());
440  archive_entry_index_.emplace_back(entry_number);
441  }
442  entry_number++;
443  }
444 
445  // Error if we are missing a file from a previous scan
446  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
447  if (archive_entry_index_[index] == -1) {
448  throw std::runtime_error{
449  "Foreign table refreshed with APPEND mode missing archive entry \"" +
450  sourcenames_[index] + "\" from file \"" +
451  boost::filesystem::path(file_path_).filename().string() + "\"."};
452  }
453  }
454 
456  if (initial_entries < archive_entry_index_.size()) {
457  // We found more files
458  current_index_ = static_cast<int>(initial_entries) - 1;
460  // iterate through new entries until we get one with data
461  do {
462  nextEntry();
463  } while (archive_.currentEntryFinished() &&
464  current_index_ < static_cast<int>(archive_entry_index_.size()));
465 
467  scan_finished_ = false;
468  }
469  } else {
470  // No new files but this may be an archive of a single file
471  // Check if we only have one file and check if it has more data
472  // May have still have multiple entries with some empty that are ignored
473  // like directories
474  size_t last_size = 0;
475  size_t file_index = -1;
476  size_t num_file_entries = 0;
477  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
478  if (cumulative_sizes_[index] > last_size) {
479  file_index = index;
480  num_file_entries++;
481  last_size = cumulative_sizes_[index];
482  }
483  }
484  if (num_file_entries == 1) {
485  current_index_ = static_cast<int>(file_index);
486  current_offset_ = 0;
487  size_t last_eof = cumulative_sizes_[file_index];
488 
489  // reset cumulative_sizes_ with initial zero sizes
490  auto old_cumulative_sizes = std::move(cumulative_sizes_);
491  cumulative_sizes_ = {};
492  for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
493  cumulative_sizes_.emplace_back(0);
494  }
495 
496  // Go to Index of file and read to where we left off
498  skipHeader();
499  skipBytes(last_eof);
501  scan_finished_ = false;
502  } else {
503  // There was no new data, so put back the old data structure
504  cumulative_sizes_ = std::move(old_cumulative_sizes);
505  }
506  }
507  }
508 };
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
bool currentEntryFinished() const
Definition: FileReader.h:220
void skipToEntry(int entry_number)
Definition: FileReader.cpp:187
#define CHECK(condition)
Definition: Logger.h:223
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::consumeFirstLine ( std::optional< std::string > &  dest_str)
private

Definition at line 379 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, foreign_storage::ArchiveWrapper::currentEntryFinished(), import_export::CopyParams::line_delim, and foreign_storage::ArchiveWrapper::peekNextChar().

Referenced by skipHeader().

379  {
380  char* dest_buffer = nullptr;
381  while (!archive_.currentEntryFinished()) {
382  if (dest_str.has_value()) {
383  auto& str = dest_str.value();
384  str.resize(str.length() + 1);
385  dest_buffer = str.data() + str.length() - 1;
386  }
388  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
389  break;
390  }
391  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
392  }
393 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:211
bool currentEntryFinished() const
Definition: FileReader.h:220

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::CompressedFileReader::getFirstLine ( ) const
overrideprivatevirtual

Implements foreign_storage::SingleFileReader.

Definition at line 371 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::SingleFileReader::DEFAULT_HEADER_READ_SIZE, and foreign_storage::FileReader::file_path_.

371  {
373  auto first_line = std::make_optional<std::string>();
374  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
375  reader.consumeFirstLine(first_line);
376  return first_line.value();
377 }
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:244
import_export::CopyParams copy_params_
Definition: FileReader.h:118
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:137
size_t foreign_storage::CompressedFileReader::getRemainingSize ( )
inlineoverridevirtual
Returns
size of the remaining content to be read

Implements foreign_storage::FileReader.

Definition at line 269 of file FileReader.h.

269 { return 0; }
bool foreign_storage::CompressedFileReader::isRemainingSizeKnown ( )
inlineoverridevirtual
Returns
whether the remaining size is known

Implements foreign_storage::FileReader.

Definition at line 268 of file FileReader.h.

268 { return false; };
bool foreign_storage::CompressedFileReader::isScanFinished ( )
inlineoverridevirtual
Returns
true if the entire file has been read

Implements foreign_storage::FileReader.

Definition at line 266 of file FileReader.h.

References scan_finished_.

Referenced by readRegion().

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::nextEntry ( )
private

Go to next archive entry/header with valid data

Definition at line 328 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), scan_finished_, skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

Referenced by checkForMoreRows(), CompressedFileReader(), and readInternal().

328  {
329  do {
330  // Go to the next index
331  current_index_++;
332  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
334  }
335  if (!initial_scan_) {
336  // Entry # in the archive is known and might not be the next one in the file
337  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
339  skipHeader();
340  } else {
341  scan_finished_ = true;
342  return;
343  }
344  } else {
345  // Read next header in archive and save the sourcename
346  if (archive_.nextEntry()) {
347  // read headers until one has data
348  CHECK(sourcenames_.size() == archive_entry_index_.size());
349  sourcenames_.emplace_back(archive_.entryName());
351  skipHeader();
352  } else {
353  scan_finished_ = true;
354  initial_scan_ = false;
355  return;
356  }
357  }
358  } while (archive_.currentEntryFinished());
359 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
bool currentEntryFinished() const
Definition: FileReader.h:220
void skipToEntry(int entry_number)
Definition: FileReader.cpp:187
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::read ( void *  buffer,
size_t  max_size 
)
overridevirtual

Read up to max_size bytes from the archive into buffer, starting from the end of the last read.

Parameters
buffer- buffer to load into
max_size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 296 of file FileReader.cpp.

References readInternal().

296  {
297  // Leave one extra char in case we need to insert a delimiter
298  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
299  return bytes_read;
300 }
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:270

+ Here is the call graph for this function:

size_t foreign_storage::CompressedFileReader::readInternal ( void *  buffer,
size_t  read_size,
size_t  buffer_size 
)
private

Definition at line 270 of file FileReader.cpp.

References foreign_storage::anonymous_namespace{FileReader.cpp}::adjust_eof(), archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), run_benchmark_import::dest, import_export::CopyParams::line_delim, and nextEntry().

Referenced by read(), and readRegion().

272  {
273  size_t remaining_size = read_size;
274  char* dest = static_cast<char*>(buffer);
275  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
276  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
278  : remaining_size;
279  // copy data into dest
280  archive_.consumeDataFromCurrentEntry(copy_size, dest);
281  remaining_size -= copy_size;
282  dest += copy_size;
283  }
284  size_t bytes_read = read_size - remaining_size;
285  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
286  adjust_eof(
287  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
288  current_offset_ += bytes_read;
289  nextEntry();
290  } else {
291  current_offset_ += bytes_read;
292  }
293  return bytes_read;
294 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:211
bool currentEntryFinished() const
Definition: FileReader.h:220
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: FileReader.cpp:37
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::readRegion ( void *  buffer,
size_t  offset,
size_t  size 
)
overridevirtual

Read up to size bytes from the archive, starting at the given offset. isScanFinished() must return true to use readRegion.

Parameters
buffer- buffer to load into
offset- starting point into the archive to read
size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 302 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), isScanFinished(), foreign_storage::anonymous_namespace{FileReader.cpp}::offset_to_index(), readInternal(), skipBytes(), skipHeader(), and foreign_storage::ArchiveWrapper::skipToEntry().

302  {
304 
305  // Determine where in the archive we are
306  size_t index = offset_to_index(cumulative_sizes_, offset);
307  CHECK(archive_entry_index_.size() > index);
308  auto archive_entry = archive_entry_index_[index];
309  current_index_ = static_cast<int>(index);
310 
311  // If we are in the wrong entry or too far in the right one skip to the correct entry
312  if (archive_entry != archive_.getCurrentEntryIndex() ||
313  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
314  archive_.skipToEntry(archive_entry);
315  skipHeader();
316  current_offset_ = 0;
317  if (index > 0) {
318  current_offset_ = cumulative_sizes_[index - 1];
319  }
320  }
321  skipBytes(offset - current_offset_);
322  return readInternal(buffer, size, size);
323 }
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: FileReader.cpp:57
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:270
void skipToEntry(int entry_number)
Definition: FileReader.cpp:187
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::resetArchive ( )
private

Reopen file and reset back to the beginning

void foreign_storage::CompressedFileReader::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
overridevirtual

Serialize internal state to the given JSON object. This JSON will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value- JSON object in which to store the needed state; this function may store any needed data, or none
allocator- allocator to use for JSON construction

Implements foreign_storage::FileReader.

Definition at line 510 of file FileReader.cpp.

References foreign_storage::json_utils::add_value_to_object(), archive_entry_index_, CHECK, cumulative_sizes_, initial_scan_, scan_finished_, and sourcenames_.

512  {
513  // Should be done initial scan
516 
517  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
519  value, cumulative_sizes_, "cumulative_sizes", allocator);
521  value, archive_entry_index_, "archive_entry_index", allocator);
522 };
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::skipBytes ( size_t  n_bytes)
private

Skip forward N bytes in current entry without reading the data

Parameters
n_bytes- number of bytes to skip

Skip forward N bytes without reading the data in current entry

Parameters
n_bytes- number of bytes to skip

Definition at line 399 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), current_offset_, and foreign_storage::ArchiveWrapper::currentEntryDataAvailable().

Referenced by checkForMoreRows(), and readRegion().

399  {
400  current_offset_ += n_bytes;
401  while (n_bytes > 0) {
403  // We've reached the end of the entry
404  return;
405  }
406  // Keep fetching blocks/entries until we've gone through N bytes
407  if (archive_.currentEntryDataAvailable() <= n_bytes) {
408  n_bytes -= archive_.currentEntryDataAvailable();
410  } else {
412  n_bytes = 0;
413  }
414  }
415 }
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:211
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::skipHeader ( )
overrideprivatevirtual

Skip Header of file

Skip file header

Implements foreign_storage::SingleFileReader.

Definition at line 364 of file FileReader.cpp.

References consumeFirstLine(), foreign_storage::FileReader::copy_params_, import_export::CopyParams::has_header, and import_export::kNoHeader.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

364  {
366  std::optional<std::string> str = std::nullopt;
367  consumeFirstLine(str);
368  }
369 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
ImportHeaderRow has_header
Definition: CopyParams.h:46
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:379

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

ArchiveWrapper foreign_storage::CompressedFileReader::archive_
private
std::vector<int> foreign_storage::CompressedFileReader::archive_entry_index_
private
std::vector<size_t> foreign_storage::CompressedFileReader::cumulative_sizes_
private
int foreign_storage::CompressedFileReader::current_index_
private

Definition at line 319 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

size_t foreign_storage::CompressedFileReader::current_offset_
private

Definition at line 315 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), readInternal(), readRegion(), and skipBytes().

bool foreign_storage::CompressedFileReader::initial_scan_
private

Definition at line 310 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().

bool foreign_storage::CompressedFileReader::scan_finished_
private
std::vector<std::string> foreign_storage::CompressedFileReader::sourcenames_
private

Definition at line 324 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().


The documentation for this class was generated from the following files: