OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CompressedFileReader Class Reference

#include <FileReader.h>

+ Inheritance diagram for foreign_storage::CompressedFileReader:
+ Collaboration diagram for foreign_storage::CompressedFileReader:

Public Member Functions

 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () const override
 
bool isRemainingSizeKnown () override
 
size_t getRemainingSize () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::SingleFileReader
 SingleFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 ~SingleFileReader () override=default
 
FirstLineByFilePath getFirstLineForEachFile () const override
 
bool isEndOfLastFile () override
 
std::string getCurrentFilePath () const override
 
- Public Member Functions inherited from foreign_storage::FileReader
 FileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~FileReader ()=default
 

Private Member Functions

void resetArchive ()
 
void checkForMoreRows (size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
void nextEntry ()
 
void skipHeader () override
 
void skipBytes (size_t n_bytes)
 
size_t readInternal (void *buffer, size_t read_size, size_t buffer_size)
 
std::string getFirstLine () const override
 
void consumeFirstLine (std::optional< std::string > &dest_str)
 

Private Attributes

ArchiveWrapper archive_
 
bool initial_scan_
 
bool scan_finished_
 
size_t current_offset_
 
int current_index_
 
std::vector< size_t > cumulative_sizes_
 
std::vector< std::string > sourcenames_
 
std::vector< int > archive_entry_index_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::FileReader
import_export::CopyParams copy_params_
 
std::string file_path_
 
- Static Protected Attributes inherited from foreign_storage::SingleFileReader
static constexpr size_t DEFAULT_HEADER_READ_SIZE {1024}
 

Detailed Description

Definition at line 269 of file FileReader.h.

Constructor & Destructor Documentation

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams copy_params 
)

Definition at line 249 of file FileReader.cpp.

References nextEntry().

251  : SingleFileReader(file_path, copy_params)
252  , archive_(file_path)
253  , initial_scan_(true)
254  , scan_finished_(false)
255  , current_offset_(0)
256  , current_index_(-1) {
257  // Initialize first entry
258  nextEntry();
259 }
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:73

+ Here is the call graph for this function:

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams copy_params,
const rapidjson::Value &  value 
)

Definition at line 261 of file FileReader.cpp.

References archive_entry_index_, cumulative_sizes_, json_utils::get_value_from_object(), initial_scan_, scan_finished_, and sourcenames_.

264  : CompressedFileReader(file_path, copy_params) {
265  scan_finished_ = true;
266  initial_scan_ = false;
267  sourcenames_.clear();
268  archive_entry_index_.clear();
269  cumulative_sizes_.clear();
270  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
271  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
272  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
273 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:249
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::CompressedFileReader::checkForMoreRows ( size_t  file_offset,
const shared::FilePathOptions options,
const ForeignServer server_options,
const UserMapping user_mapping 
)
overrideprivatevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e., the files are not in a valid appended state or are not supported).

Parameters
file_offset- where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed files
user_mapping- only needed for S3 backed files

Reimplemented from foreign_storage::FileReader.

Definition at line 422 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::FileReader::file_path_, initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), nextEntry(), foreign_storage::ArchiveWrapper::resetArchive(), scan_finished_, skipBytes(), skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

425  {
426  CHECK(initial_scan_ == false);
427  size_t initial_entries = archive_entry_index_.size();
428 
429  // Reset all entry indexes for existing items
430  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
431  archive_entry_index_[index] = -1;
432  }
433 
434  // Read headers and determine location of existing and new files
435  int entry_number = 0;
437  while (archive_.nextEntry()) {
438  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
439  if (it != sourcenames_.end()) {
440  // Record new index of already read file
441  auto index = it - sourcenames_.begin();
442  archive_entry_index_[index] = entry_number;
443  } else {
444  // Append new source file
445  sourcenames_.emplace_back(archive_.entryName());
446  archive_entry_index_.emplace_back(entry_number);
447  }
448  entry_number++;
449  }
450 
451  // Error if we are missing a file from a previous scan
452  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
453  if (archive_entry_index_[index] == -1) {
454  throw std::runtime_error{
455  "Foreign table refreshed with APPEND mode missing archive entry \"" +
456  sourcenames_[index] + "\" from file \"" +
457  boost::filesystem::path(file_path_).filename().string() + "\"."};
458  }
459  }
460 
462  if (initial_entries < archive_entry_index_.size()) {
463  // We found more files
464  current_index_ = static_cast<int>(initial_entries) - 1;
466  // iterate through new entries until we get one with data
467  do {
468  nextEntry();
469  } while (archive_.currentEntryFinished() &&
470  current_index_ < static_cast<int>(archive_entry_index_.size()));
471 
473  scan_finished_ = false;
474  }
475  } else {
476  // No new files but this may be an archive of a single file
477  // Check if we only have one file and check if it has more data
478  // May have still have multiple entries with some empty that are ignored
479  // like directories
480  size_t last_size = 0;
481  size_t file_index = -1;
482  size_t num_file_entries = 0;
483  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
484  if (cumulative_sizes_[index] > last_size) {
485  file_index = index;
486  num_file_entries++;
487  last_size = cumulative_sizes_[index];
488  }
489  }
490  if (num_file_entries == 1) {
491  current_index_ = static_cast<int>(file_index);
492  current_offset_ = 0;
493  size_t last_eof = cumulative_sizes_[file_index];
494 
495  // reset cumulative_sizes_ with initial zero sizes
496  auto old_cumulative_sizes = std::move(cumulative_sizes_);
497  cumulative_sizes_ = {};
498  for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
499  cumulative_sizes_.emplace_back(0);
500  }
501 
502  // Go to Index of file and read to where we left off
504  skipHeader();
505  skipBytes(last_eof);
507  scan_finished_ = false;
508  } else {
509  // There was no new data, so put back the old data structure
510  cumulative_sizes_ = std::move(old_cumulative_sizes);
511  }
512  }
513  }
514 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
bool currentEntryFinished() const
Definition: FileReader.h:234
void skipToEntry(int entry_number)
Definition: FileReader.cpp:192
#define CHECK(condition)
Definition: Logger.h:291
size_t currentEntryDataAvailable() const
Definition: FileReader.h:236

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::consumeFirstLine ( std::optional< std::string > &  dest_str)
private

Definition at line 384 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, foreign_storage::ArchiveWrapper::currentEntryFinished(), import_export::CopyParams::line_delim, and foreign_storage::ArchiveWrapper::peekNextChar().

Referenced by skipHeader().

384  {
385  char* dest_buffer = nullptr;
386  while (!archive_.currentEntryFinished()) {
387  if (dest_str.has_value()) {
388  auto& str = dest_str.value();
389  str.resize(str.length() + 1);
390  dest_buffer = str.data() + str.length() - 1;
391  }
393  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
394  break;
395  }
396  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
397  }
398 }
import_export::CopyParams copy_params_
Definition: FileReader.h:128
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:216
bool currentEntryFinished() const
Definition: FileReader.h:234

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::CompressedFileReader::getFirstLine ( ) const
overrideprivatevirtual

Implements foreign_storage::SingleFileReader.

Definition at line 376 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::SingleFileReader::DEFAULT_HEADER_READ_SIZE, and foreign_storage::FileReader::file_path_.

376  {
378  auto first_line = std::make_optional<std::string>();
379  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
380  reader.consumeFirstLine(first_line);
381  return first_line.value();
382 }
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:249
import_export::CopyParams copy_params_
Definition: FileReader.h:128
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:149
size_t foreign_storage::CompressedFileReader::getRemainingSize ( )
inlineoverridevirtual
Returns
size of the remaining content to be read

Implements foreign_storage::FileReader.

Definition at line 283 of file FileReader.h.

283 { return 0; }
bool foreign_storage::CompressedFileReader::isRemainingSizeKnown ( )
inlineoverridevirtual
Returns
if remaining size is known

Implements foreign_storage::FileReader.

Definition at line 282 of file FileReader.h.

282 { return false; };
bool foreign_storage::CompressedFileReader::isScanFinished ( ) const
inlineoverridevirtual
Returns
true if the entire file has been read

Implements foreign_storage::FileReader.

Definition at line 280 of file FileReader.h.

References scan_finished_.

Referenced by readRegion().

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::nextEntry ( )
private

Go to next archive entry/header with valid data

Definition at line 333 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), scan_finished_, skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

Referenced by checkForMoreRows(), CompressedFileReader(), and readInternal().

333  {
334  do {
335  // Go to the next index
336  current_index_++;
337  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
339  }
340  if (!initial_scan_) {
341  // Entry # in the archive is known and might not be the next one in the file
342  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
344  skipHeader();
345  } else {
346  scan_finished_ = true;
347  return;
348  }
349  } else {
350  // Read next header in archive and save the sourcename
351  if (archive_.nextEntry()) {
352  // read headers until one has data
353  CHECK(sourcenames_.size() == archive_entry_index_.size());
354  sourcenames_.emplace_back(archive_.entryName());
356  skipHeader();
357  } else {
358  scan_finished_ = true;
359  initial_scan_ = false;
360  return;
361  }
362  }
363  } while (archive_.currentEntryFinished());
364 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
bool currentEntryFinished() const
Definition: FileReader.h:234
void skipToEntry(int entry_number)
Definition: FileReader.cpp:192
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::read ( void *  buffer,
size_t  max_size 
)
overridevirtual

Read up to max_size bytes from the archive into buffer, starting from the end of the last read.

Parameters
buffer- buffer to load into
max_size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 301 of file FileReader.cpp.

References readInternal().

301  {
302  // Leave one extra char in case we need to insert a delimiter
303  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
304  return bytes_read;
305 }
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:275

+ Here is the call graph for this function:

size_t foreign_storage::CompressedFileReader::readInternal ( void *  buffer,
size_t  read_size,
size_t  buffer_size 
)
private

Definition at line 275 of file FileReader.cpp.

References foreign_storage::anonymous_namespace{FileReader.cpp}::adjust_eof(), archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), run_benchmark_import::dest, import_export::CopyParams::line_delim, and nextEntry().

Referenced by read(), and readRegion().

277  {
278  size_t remaining_size = read_size;
279  char* dest = static_cast<char*>(buffer);
280  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
281  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
283  : remaining_size;
284  // copy data into dest
285  archive_.consumeDataFromCurrentEntry(copy_size, dest);
286  remaining_size -= copy_size;
287  dest += copy_size;
288  }
289  size_t bytes_read = read_size - remaining_size;
290  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
291  adjust_eof(
292  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
293  current_offset_ += bytes_read;
294  nextEntry();
295  } else {
296  current_offset_ += bytes_read;
297  }
298  return bytes_read;
299 }
import_export::CopyParams copy_params_
Definition: FileReader.h:128
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:216
bool currentEntryFinished() const
Definition: FileReader.h:234
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: FileReader.cpp:37
size_t currentEntryDataAvailable() const
Definition: FileReader.h:236

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::readRegion ( void *  buffer,
size_t  offset,
size_t  size 
)
overridevirtual

Read up to size bytes from the archive, starting at the given offset. isScanFinished() must return true to use readRegion.

Parameters
buffer- buffer to load into
offset- starting point into the archive to read
size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 307 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), isScanFinished(), foreign_storage::anonymous_namespace{FileReader.cpp}::offset_to_index(), readInternal(), skipBytes(), skipHeader(), and foreign_storage::ArchiveWrapper::skipToEntry().

307  {
309 
310  // Determine where in the archive we are
311  size_t index = offset_to_index(cumulative_sizes_, offset);
312  CHECK(archive_entry_index_.size() > index);
313  auto archive_entry = archive_entry_index_[index];
314  current_index_ = static_cast<int>(index);
315 
316  // If we are in the wrong entry or too far in the right one skip to the correct entry
317  if (archive_entry != archive_.getCurrentEntryIndex() ||
318  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
319  archive_.skipToEntry(archive_entry);
320  skipHeader();
321  current_offset_ = 0;
322  if (index > 0) {
323  current_offset_ = cumulative_sizes_[index - 1];
324  }
325  }
326  skipBytes(offset - current_offset_);
327  return readInternal(buffer, size, size);
328 }
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: FileReader.cpp:57
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:275
bool isScanFinished() const override
Definition: FileReader.h:280
void skipToEntry(int entry_number)
Definition: FileReader.cpp:192
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::resetArchive ( )
private

Reopen file and reset back to the beginning

void foreign_storage::CompressedFileReader::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
overridevirtual

Serialize internal state to the given JSON object. This JSON will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value- JSON object to store the needed state to; this function can store any needed data or none
allocator- allocator to use for JSON construction

Implements foreign_storage::FileReader.

Definition at line 516 of file FileReader.cpp.

References json_utils::add_value_to_object(), archive_entry_index_, CHECK, cumulative_sizes_, initial_scan_, scan_finished_, and sourcenames_.

518  {
519  // Should be done initial scan
522 
523  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
525  value, cumulative_sizes_, "cumulative_sizes", allocator);
527  value, archive_entry_index_, "archive_entry_index", allocator);
528 };
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::skipBytes ( size_t  n_bytes)
private

Skip forward N bytes in current entry without reading the data

Parameters
n_bytes- number of bytes to skip

Skip forward N bytes without reading the data in current entry

Parameters
n_bytes- number of bytes to skip

Definition at line 404 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), current_offset_, and foreign_storage::ArchiveWrapper::currentEntryDataAvailable().

Referenced by checkForMoreRows(), and readRegion().

404  {
405  current_offset_ += n_bytes;
406  while (n_bytes > 0) {
408  // We've reached the end of the entry
409  return;
410  }
411  // Keep fetching blocks/entries until we've gone through N bytes
412  if (archive_.currentEntryDataAvailable() <= n_bytes) {
413  n_bytes -= archive_.currentEntryDataAvailable();
415  } else {
417  n_bytes = 0;
418  }
419  }
420 }
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:216
size_t currentEntryDataAvailable() const
Definition: FileReader.h:236

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::skipHeader ( )
overrideprivatevirtual

Skip Header of file

Skip file header

Implements foreign_storage::SingleFileReader.

Definition at line 369 of file FileReader.cpp.

References consumeFirstLine(), foreign_storage::FileReader::copy_params_, import_export::CopyParams::has_header, and import_export::kNoHeader.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

369  {
371  std::optional<std::string> str = std::nullopt;
372  consumeFirstLine(str);
373  }
374 }
import_export::CopyParams copy_params_
Definition: FileReader.h:128
ImportHeaderRow has_header
Definition: CopyParams.h:46
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:384

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

ArchiveWrapper foreign_storage::CompressedFileReader::archive_
private
std::vector<int> foreign_storage::CompressedFileReader::archive_entry_index_
private
std::vector<size_t> foreign_storage::CompressedFileReader::cumulative_sizes_
private
int foreign_storage::CompressedFileReader::current_index_
private

Definition at line 334 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

size_t foreign_storage::CompressedFileReader::current_offset_
private

Definition at line 330 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), readInternal(), readRegion(), and skipBytes().

bool foreign_storage::CompressedFileReader::initial_scan_
private

Definition at line 325 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().

bool foreign_storage::CompressedFileReader::scan_finished_
private
std::vector<std::string> foreign_storage::CompressedFileReader::sourcenames_
private

Definition at line 339 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().


The documentation for this class was generated from the following files: