OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CompressedFileReader Class Reference

#include <FileReader.h>

+ Inheritance diagram for foreign_storage::CompressedFileReader:
+ Collaboration diagram for foreign_storage::CompressedFileReader:

Public Member Functions

 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
bool isRemainingSizeKnown () override
 
size_t getRemainingSize () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::SingleFileReader
 SingleFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 ~SingleFileReader () override=default
 
FirstLineByFilePath getFirstLineForEachFile () const override
 
bool isEndOfLastFile () override
 
- Public Member Functions inherited from foreign_storage::FileReader
 FileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~FileReader ()=default
 

Private Member Functions

void resetArchive ()
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
void nextEntry ()
 
void skipHeader () override
 
void skipBytes (size_t n_bytes)
 
size_t readInternal (void *buffer, size_t read_size, size_t buffer_size)
 
std::string getFirstLine () const override
 
void consumeFirstLine (std::optional< std::string > &dest_str)
 

Private Attributes

ArchiveWrapper archive_
 
bool initial_scan_
 
bool scan_finished_
 
size_t current_offset_
 
int current_index_
 
std::vector< size_t > cumulative_sizes_
 
std::vector< std::string > sourcenames_
 
std::vector< int > archive_entry_index_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::FileReader
import_export::CopyParams copy_params_
 
std::string file_path_
 
- Static Protected Attributes inherited from foreign_storage::SingleFileReader
static constexpr size_t DEFAULT_HEADER_READ_SIZE {1024}
 

Detailed Description

Definition at line 255 of file FileReader.h.

Constructor & Destructor Documentation

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams copy_params 
)

Definition at line 239 of file FileReader.cpp.

References nextEntry().

241  : SingleFileReader(file_path, copy_params)
242  , archive_(file_path)
243  , initial_scan_(true)
244  , scan_finished_(false)
245  , current_offset_(0)
246  , current_index_(-1) {
247  // Initialize first entry
248  nextEntry();
249 }
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:72

+ Here is the call graph for this function:

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams copy_params,
const rapidjson::Value &  value 
)

Definition at line 251 of file FileReader.cpp.

References archive_entry_index_, cumulative_sizes_, foreign_storage::json_utils::get_value_from_object(), initial_scan_, scan_finished_, and sourcenames_.

254  : CompressedFileReader(file_path, copy_params) {
255  scan_finished_ = true;
256  initial_scan_ = false;
257  sourcenames_.clear();
258  archive_entry_index_.clear();
259  cumulative_sizes_.clear();
260  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
261  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
262  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
263 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:239
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::CompressedFileReader::checkForMoreRows ( size_t  file_offset,
const ForeignServer server_options,
const UserMapping user_mapping 
)
overrideprivatevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e. files are not in a valid appended state or not supported).

Parameters
file_offset- where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed files
user_mapping- only needed for S3 backed files

Reimplemented from foreign_storage::FileReader.

Definition at line 412 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::FileReader::file_path_, initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), nextEntry(), foreign_storage::ArchiveWrapper::resetArchive(), scan_finished_, skipBytes(), skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

414  {
415  CHECK(initial_scan_ == false);
416  size_t initial_entries = archive_entry_index_.size();
417 
418  // Reset all entry indexes for existing items
419  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
420  archive_entry_index_[index] = -1;
421  }
422 
423  // Read headers and determine location of existing and new files
424  int entry_number = 0;
426  while (archive_.nextEntry()) {
427  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
428  if (it != sourcenames_.end()) {
429  // Record new index of already read file
430  auto index = it - sourcenames_.begin();
431  archive_entry_index_[index] = entry_number;
432  } else {
433  // Append new source file
434  sourcenames_.emplace_back(archive_.entryName());
435  archive_entry_index_.emplace_back(entry_number);
436  }
437  entry_number++;
438  }
439 
440  // Error if we are missing a file from a previous scan
441  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
442  if (archive_entry_index_[index] == -1) {
443  throw std::runtime_error{
444  "Foreign table refreshed with APPEND mode missing archive entry \"" +
445  sourcenames_[index] + "\" from file \"" +
446  boost::filesystem::path(file_path_).filename().string() + "\"."};
447  }
448  }
449 
451  if (initial_entries < archive_entry_index_.size()) {
452  // We found more files
453  current_index_ = static_cast<int>(initial_entries) - 1;
455  // iterate through new entries until we get one with data
456  do {
457  nextEntry();
458  } while (archive_.currentEntryFinished() &&
459  current_index_ < static_cast<int>(archive_entry_index_.size()));
460 
462  scan_finished_ = false;
463  }
464  } else {
465  // No new files but this may be an archive of a single file
466  // Check if we only have one file and check if it has more data
467  // May have still have multiple entries with some empty that are ignored
468  // like directories
469  size_t last_size = 0;
470  size_t file_index = -1;
471  size_t num_file_entries = 0;
472  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
473  if (cumulative_sizes_[index] > last_size) {
474  file_index = index;
475  num_file_entries++;
476  last_size = cumulative_sizes_[index];
477  }
478  }
479  if (num_file_entries == 1) {
480  current_index_ = static_cast<int>(file_index);
481  current_offset_ = 0;
482  size_t last_eof = cumulative_sizes_[file_index];
483 
484  // reset cumulative_sizes_ with initial zero sizes
485  auto old_cumulative_sizes = std::move(cumulative_sizes_);
486  cumulative_sizes_ = {};
487  for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
488  cumulative_sizes_.emplace_back(0);
489  }
490 
491  // Go to Index of file and read to where we left off
493  skipHeader();
494  skipBytes(last_eof);
496  scan_finished_ = false;
497  } else {
498  // There was no new data, so put back the old data structure
499  cumulative_sizes_ = std::move(old_cumulative_sizes);
500  }
501  }
502  }
503 };
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
bool currentEntryFinished() const
Definition: FileReader.h:220
void skipToEntry(int entry_number)
Definition: FileReader.cpp:182
#define CHECK(condition)
Definition: Logger.h:209
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::consumeFirstLine ( std::optional< std::string > &  dest_str)
private

Definition at line 374 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, foreign_storage::ArchiveWrapper::currentEntryFinished(), import_export::CopyParams::line_delim, and foreign_storage::ArchiveWrapper::peekNextChar().

Referenced by skipHeader().

374  {
375  char* dest_buffer = nullptr;
376  while (!archive_.currentEntryFinished()) {
377  if (dest_str.has_value()) {
378  auto& str = dest_str.value();
379  str.resize(str.length() + 1);
380  dest_buffer = str.data() + str.length() - 1;
381  }
383  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
384  break;
385  }
386  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
387  }
388 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:206
bool currentEntryFinished() const
Definition: FileReader.h:220

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::CompressedFileReader::getFirstLine ( ) const
overrideprivatevirtual

Implements foreign_storage::SingleFileReader.

Definition at line 366 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::SingleFileReader::DEFAULT_HEADER_READ_SIZE, and foreign_storage::FileReader::file_path_.

366  {
368  auto first_line = std::make_optional<std::string>();
369  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
370  reader.consumeFirstLine(first_line);
371  return first_line.value();
372 }
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:239
import_export::CopyParams copy_params_
Definition: FileReader.h:118
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:137
size_t foreign_storage::CompressedFileReader::getRemainingSize ( )
inlineoverridevirtual
Returns
size of the remaining content to be read

Implements foreign_storage::FileReader.

Definition at line 269 of file FileReader.h.

269 { return 0; }
bool foreign_storage::CompressedFileReader::isRemainingSizeKnown ( )
inlineoverridevirtual
Returns
if remaining size is known

Implements foreign_storage::FileReader.

Definition at line 268 of file FileReader.h.

268 { return false; };
bool foreign_storage::CompressedFileReader::isScanFinished ( )
inlineoverridevirtual
Returns
true if the entire file has been read

Implements foreign_storage::FileReader.

Definition at line 266 of file FileReader.h.

References scan_finished_.

Referenced by readRegion().

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::nextEntry ( )
private

Go to next archive entry/header with valid data

Definition at line 323 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), scan_finished_, skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

Referenced by checkForMoreRows(), CompressedFileReader(), and readInternal().

323  {
324  do {
325  // Go to the next index
326  current_index_++;
327  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
329  }
330  if (!initial_scan_) {
331  // Entry # in the archive is known and might not be the next one in the file
332  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
334  skipHeader();
335  } else {
336  scan_finished_ = true;
337  return;
338  }
339  } else {
340  // Read next header in archive and save the sourcename
341  if (archive_.nextEntry()) {
342  // read headers until one has data
343  CHECK(sourcenames_.size() == archive_entry_index_.size());
344  sourcenames_.emplace_back(archive_.entryName());
346  skipHeader();
347  } else {
348  scan_finished_ = true;
349  initial_scan_ = false;
350  return;
351  }
352  }
353  } while (archive_.currentEntryFinished());
354 }
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
bool currentEntryFinished() const
Definition: FileReader.h:220
void skipToEntry(int entry_number)
Definition: FileReader.cpp:182
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::read ( void *  buffer,
size_t  max_size 
)
overridevirtual

Read up to max_size bytes from archive into buffer, starting from the end of the last read

Parameters
buffer- buffer to load into
max_size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 291 of file FileReader.cpp.

References readInternal().

291  {
292  // Leave one extra char in case we need to insert a delimiter
293  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
294  return bytes_read;
295 }
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:265

+ Here is the call graph for this function:

size_t foreign_storage::CompressedFileReader::readInternal ( void *  buffer,
size_t  read_size,
size_t  buffer_size 
)
private

Definition at line 265 of file FileReader.cpp.

References foreign_storage::anonymous_namespace{FileReader.cpp}::adjust_eof(), archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::FileReader::copy_params_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), run_benchmark_import::dest, import_export::CopyParams::line_delim, and nextEntry().

Referenced by read(), and readRegion().

267  {
268  size_t remaining_size = read_size;
269  char* dest = static_cast<char*>(buffer);
270  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
271  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
273  : remaining_size;
274  // copy data into dest
275  archive_.consumeDataFromCurrentEntry(copy_size, dest);
276  remaining_size -= copy_size;
277  dest += copy_size;
278  }
279  size_t bytes_read = read_size - remaining_size;
280  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
281  adjust_eof(
282  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
283  current_offset_ += bytes_read;
284  nextEntry();
285  } else {
286  current_offset_ += bytes_read;
287  }
288  return bytes_read;
289 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:206
bool currentEntryFinished() const
Definition: FileReader.h:220
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: FileReader.cpp:36
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CompressedFileReader::readRegion ( void *  buffer,
size_t  offset,
size_t  size 
)
overridevirtual

Read up to size bytes from archive, starting at the given offset. isScanFinished() must return true to use readRegion.

Parameters
buffer- buffer to load into
offset- starting point into the archive to read
size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::FileReader.

Definition at line 297 of file FileReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), isScanFinished(), foreign_storage::anonymous_namespace{FileReader.cpp}::offset_to_index(), readInternal(), skipBytes(), skipHeader(), and foreign_storage::ArchiveWrapper::skipToEntry().

297  {
299 
300  // Determine where in the archive we are
301  size_t index = offset_to_index(cumulative_sizes_, offset);
302  CHECK(archive_entry_index_.size() > index);
303  auto archive_entry = archive_entry_index_[index];
304  current_index_ = static_cast<int>(index);
305 
306  // If we are in the wrong entry or too far in the right one skip to the correct entry
307  if (archive_entry != archive_.getCurrentEntryIndex() ||
308  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
309  archive_.skipToEntry(archive_entry);
310  skipHeader();
311  current_offset_ = 0;
312  if (index > 0) {
313  current_offset_ = cumulative_sizes_[index - 1];
314  }
315  }
316  skipBytes(offset - current_offset_);
317  return readInternal(buffer, size, size);
318 }
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: FileReader.cpp:56
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:265
void skipToEntry(int entry_number)
Definition: FileReader.cpp:182
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::resetArchive ( )
private

Reopen file and reset back to the beginning

void foreign_storage::CompressedFileReader::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
overridevirtual

Serialize internal state to the given json object. This Json will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value- json object to store needed state to; this function can store any needed data or none
allocator- allocator to use for json construction

Implements foreign_storage::FileReader.

Definition at line 505 of file FileReader.cpp.

References foreign_storage::json_utils::add_value_to_object(), archive_entry_index_, CHECK, cumulative_sizes_, initial_scan_, scan_finished_, and sourcenames_.

507  {
508  // Should be done initial scan
511 
512  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
514  value, cumulative_sizes_, "cumulative_sizes", allocator);
516  value, archive_entry_index_, "archive_entry_index", allocator);
517 };
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

void foreign_storage::CompressedFileReader::skipBytes ( size_t  n_bytes)
private

Skip forward N bytes in current entry without reading the data

Parameters
n_bytes- number of bytes to skip

Skip forward N bytes without reading the data in current entry

Parameters
n_bytes- number of bytes to skip

Definition at line 394 of file FileReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), current_offset_, and foreign_storage::ArchiveWrapper::currentEntryDataAvailable().

Referenced by checkForMoreRows(), and readRegion().

394  {
395  current_offset_ += n_bytes;
396  while (n_bytes > 0) {
398  // We've reached the end of the entry
399  return;
400  }
401  // Keep fetching blocks/entries until we've gone through N bytes
402  if (archive_.currentEntryDataAvailable() <= n_bytes) {
403  n_bytes -= archive_.currentEntryDataAvailable();
405  } else {
407  n_bytes = 0;
408  }
409  }
410 }
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:206
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CompressedFileReader::skipHeader ( )
overrideprivatevirtual

Skip Header of file

Skip file header

Implements foreign_storage::SingleFileReader.

Definition at line 359 of file FileReader.cpp.

References consumeFirstLine(), foreign_storage::FileReader::copy_params_, import_export::CopyParams::has_header, and import_export::NO_HEADER.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

359  {
361  std::optional<std::string> str = std::nullopt;
362  consumeFirstLine(str);
363  }
364 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
ImportHeaderRow has_header
Definition: CopyParams.h:51
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:374

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

ArchiveWrapper foreign_storage::CompressedFileReader::archive_
private
std::vector<int> foreign_storage::CompressedFileReader::archive_entry_index_
private
std::vector<size_t> foreign_storage::CompressedFileReader::cumulative_sizes_
private
int foreign_storage::CompressedFileReader::current_index_
private

Definition at line 319 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

size_t foreign_storage::CompressedFileReader::current_offset_
private

Definition at line 315 of file FileReader.h.

Referenced by checkForMoreRows(), nextEntry(), readInternal(), readRegion(), and skipBytes().

bool foreign_storage::CompressedFileReader::initial_scan_
private

Definition at line 310 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().

bool foreign_storage::CompressedFileReader::scan_finished_
private
std::vector<std::string> foreign_storage::CompressedFileReader::sourcenames_
private

Definition at line 324 of file FileReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().


The documentation for this class was generated from the following files: