OmniSciDB  2e3a973ef4
foreign_storage::CompressedFileReader Class Reference

#include <CsvReader.h>

+ Inheritance diagram for foreign_storage::CompressedFileReader:
+ Collaboration diagram for foreign_storage::CompressedFileReader:

Public Member Functions

 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 CompressedFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
bool isRemainingSizeKnown () override
 
size_t getRemainingSize () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::CsvReader
 CsvReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~CsvReader ()=default
 

Private Member Functions

void resetArchive ()
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
void nextEntry ()
 
void skipHeader ()
 
void skipBytes (size_t n_bytes)
 
size_t readInternal (void *buffer, size_t read_size, size_t buffer_size)
 

Private Attributes

ArchiveWrapper archive_
 
bool initial_scan_
 
bool scan_finished_
 
size_t current_offset_
 
int current_index_
 
std::vector< size_t > cumulative_sizes_
 
std::vector< std::string > sourcenames_
 
std::vector< int > archive_entry_index_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::CsvReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 218 of file CsvReader.h.

Constructor & Destructor Documentation

◆ CompressedFileReader() [1/2]

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params 
)

Definition at line 215 of file CsvReader.cpp.

References nextEntry().

217  : CsvReader(file_path, copy_params)
218  , archive_(file_path)
219  , initial_scan_(true)
220  , scan_finished_(false)
221  , current_offset_(0)
222  , current_index_(-1) {
223  // Initialize first entry
224  nextEntry();
225 }
CsvReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.h:36
+ Here is the call graph for this function:

◆ CompressedFileReader() [2/2]

foreign_storage::CompressedFileReader::CompressedFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const rapidjson::Value &  value 
)

Definition at line 227 of file CsvReader.cpp.

References archive_entry_index_, cumulative_sizes_, foreign_storage::json_utils::get_value_from_object(), initial_scan_, scan_finished_, and sourcenames_.

230  : CompressedFileReader(file_path, copy_params) {
231  scan_finished_ = true;
232  initial_scan_ = false;
233  sourcenames_.clear();
234  archive_entry_index_.clear();
235  cumulative_sizes_.clear();
236  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
237  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
238  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
239 }
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:215
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
+ Here is the call graph for this function:

Member Function Documentation

◆ checkForMoreRows()

void foreign_storage::CompressedFileReader::checkForMoreRows ( size_t  file_offset,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
override private virtual

Rescan the target files. Throws an exception if the rescan fails (i.e. the files are not in a valid appended state or are not supported).

Parameters
file_offset - where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed CSV
user_mapping- only needed for S3 backed CSV

Reimplemented from foreign_storage::CsvReader.

Definition at line 369 of file CsvReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::CsvReader::file_path_, initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), nextEntry(), foreign_storage::ArchiveWrapper::resetArchive(), scan_finished_, skipBytes(), skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

371  {
372  CHECK(initial_scan_ == false);
373  size_t initial_entries = archive_entry_index_.size();
374 
375  // Reset all entry indexes for existing items
376  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
377  archive_entry_index_[index] = -1;
378  }
379 
380  // Read headers and determine location of existing and new files
381  int entry_number = 0;
383  while (archive_.nextEntry()) {
384  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
385  if (it != sourcenames_.end()) {
386  // Record new index of already read file
387  auto index = it - sourcenames_.begin();
388  archive_entry_index_[index] = entry_number;
389  } else {
390  // Append new source file
391  sourcenames_.emplace_back(archive_.entryName());
392  archive_entry_index_.emplace_back(entry_number);
393  }
394  entry_number++;
395  }
396 
397  // Error if we are missing a file from a previous scan
398  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
399  if (archive_entry_index_[index] == -1) {
400  throw std::runtime_error{
401  "Foreign table refreshed with APPEND mode missing archive entry \"" +
402  sourcenames_[index] + "\" from file \"" +
403  boost::filesystem::path(file_path_).filename().string() + "\"."};
404  }
405  }
406 
408  if (initial_entries < archive_entry_index_.size()) {
409  // We found more files
410  current_index_ = static_cast<int>(initial_entries) - 1;
412  // iterate through new entries until we get one with data
413  do {
414  nextEntry();
415  } while (archive_.currentEntryFinished() &&
416  current_index_ < static_cast<int>(archive_entry_index_.size()));
417 
419  scan_finished_ = false;
420  }
421  } else {
422  // No new files but this may be an archive of a single file
423  // Check if we only have one CSV file and check if it has more data
424  // May have still have multiple entries with some empty that are ignored
425  // like directories
426  size_t last_size = 0;
427  size_t csv_index = -1;
428  size_t num_csv_entries = 0;
429  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
430  if (cumulative_sizes_[index] > last_size) {
431  csv_index = index;
432  num_csv_entries++;
433  last_size = cumulative_sizes_[index];
434  }
435  }
436  if (num_csv_entries == 1) {
437  current_index_ = static_cast<int>(csv_index);
438  current_offset_ = 0;
439  size_t last_eof = cumulative_sizes_[csv_index];
440 
441  // reset cumulative_sizes_ with initial zero sizes
442  auto old_cumulative_sizes = std::move(cumulative_sizes_);
443  cumulative_sizes_ = {};
444  for (size_t zero_index = 0; zero_index < csv_index; zero_index++) {
445  cumulative_sizes_.emplace_back(0);
446  }
447 
448  // Go to Index of CSV file and read to where we left off
450  skipHeader();
451  skipBytes(last_eof);
453  scan_finished_ = false;
454  } else {
455  // There was no new data, so put back the old data structure
456  cumulative_sizes_ = std::move(old_cumulative_sizes);
457  }
458  }
459  }
460 };
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
size_t currentEntryDataAvailable() const
Definition: CsvReader.h:185
void skipToEntry(int entry_number)
Definition: CsvReader.cpp:158
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ getRemainingSize()

size_t foreign_storage::CompressedFileReader::getRemainingSize ( )
inline override virtual
Returns
size of the CSV remaining to be read

Implements foreign_storage::CsvReader.

Definition at line 232 of file CsvReader.h.

References foreign_storage::CsvReader::checkForMoreRows(), and foreign_storage::CsvReader::serialize().

232 { return 0; }
+ Here is the call graph for this function:

◆ isRemainingSizeKnown()

bool foreign_storage::CompressedFileReader::isRemainingSizeKnown ( )
inline override virtual
Returns
true if the remaining size is known

Implements foreign_storage::CsvReader.

Definition at line 231 of file CsvReader.h.

231 { return false; };

◆ isScanFinished()

bool foreign_storage::CompressedFileReader::isScanFinished ( )
inline override virtual
Returns
true if the entire CSV has been read

Implements foreign_storage::CsvReader.

Definition at line 229 of file CsvReader.h.

Referenced by readRegion().

+ Here is the caller graph for this function:

◆ nextEntry()

void foreign_storage::CompressedFileReader::nextEntry ( )
private

Go to next archive entry/header with valid data

Definition at line 299 of file CsvReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryFinished(), foreign_storage::ArchiveWrapper::entryName(), foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), initial_scan_, foreign_storage::ArchiveWrapper::nextEntry(), scan_finished_, skipHeader(), foreign_storage::ArchiveWrapper::skipToEntry(), and sourcenames_.

Referenced by checkForMoreRows(), CompressedFileReader(), and readInternal().

299  {
300  do {
301  // Go to the next index
302  current_index_++;
303  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
305  }
306  if (!initial_scan_) {
307  // Entry # in the archive is known and might not be the next one in the file
308  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
310  skipHeader();
311  } else {
312  scan_finished_ = true;
313  return;
314  }
315  } else {
316  // Read next header in archive and save the sourcename
317  if (archive_.nextEntry()) {
318  // read headers until one has data
319  CHECK(sourcenames_.size() == archive_entry_index_.size());
320  sourcenames_.emplace_back(archive_.entryName());
322  skipHeader();
323  } else {
324  scan_finished_ = true;
325  initial_scan_ = false;
326  return;
327  }
328  }
329  } while (archive_.currentEntryFinished());
330 }
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
void skipToEntry(int entry_number)
Definition: CsvReader.cpp:158
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ read()

size_t foreign_storage::CompressedFileReader::read ( void *  buffer,
size_t  max_size 
)
override virtual

Read up to max_size bytes from the archive into buffer, starting from the end of the last read.

Parameters
buffer- buffer to load into
max_size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::CsvReader.

Definition at line 267 of file CsvReader.cpp.

References readInternal().

267  {
268  // Leave one extra char in case we need to insert a delimiter
269  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
270  return bytes_read;
271 }
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: CsvReader.cpp:241
+ Here is the call graph for this function:

◆ readInternal()

size_t foreign_storage::CompressedFileReader::readInternal ( void *  buffer,
size_t  read_size,
size_t  buffer_size 
)
private

Definition at line 241 of file CsvReader.cpp.

References foreign_storage::anonymous_namespace{CsvReader.cpp}::adjust_eof(), archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::CsvReader::copy_params_, current_offset_, foreign_storage::ArchiveWrapper::currentEntryDataAvailable(), foreign_storage::ArchiveWrapper::currentEntryFinished(), run_benchmark_import::dest, import_export::CopyParams::line_delim, and nextEntry().

Referenced by read(), and readRegion().

243  {
244  size_t remaining_size = read_size;
245  char* dest = static_cast<char*>(buffer);
246  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
247  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
249  : remaining_size;
250  // copy data into dest
251  archive_.consumeDataFromCurrentEntry(copy_size, dest);
252  remaining_size -= copy_size;
253  dest += copy_size;
254  }
255  size_t bytes_read = read_size - remaining_size;
256  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
257  adjust_eof(
258  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
259  current_offset_ += bytes_read;
260  nextEntry();
261  } else {
262  current_offset_ += bytes_read;
263  }
264  return bytes_read;
265 }
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: CsvReader.cpp:182
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: CsvReader.cpp:33
size_t currentEntryDataAvailable() const
Definition: CsvReader.h:185
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ readRegion()

size_t foreign_storage::CompressedFileReader::readRegion ( void *  buffer,
size_t  offset,
size_t  size 
)
override virtual

Read up to size bytes from the archive, starting at the given offset. isScanFinished() must return true to use readRegion.

Parameters
buffer- buffer to load into
offset- starting point into the archive to read
size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::CsvReader.

Definition at line 273 of file CsvReader.cpp.

References archive_, archive_entry_index_, CHECK, cumulative_sizes_, current_index_, current_offset_, foreign_storage::ArchiveWrapper::getCurrentEntryIndex(), isScanFinished(), foreign_storage::anonymous_namespace{CsvReader.cpp}::offset_to_index(), readInternal(), skipBytes(), skipHeader(), and foreign_storage::ArchiveWrapper::skipToEntry().

273  {
275 
276  // Determine where in the archive we are
277  size_t index = offset_to_index(cumulative_sizes_, offset);
278  CHECK(archive_entry_index_.size() > index);
279  auto archive_entry = archive_entry_index_[index];
280  current_index_ = static_cast<int>(index);
281 
282  // If we are in the wrong entry or too far in the right one skip to the correct entry
283  if (archive_entry != archive_.getCurrentEntryIndex() ||
284  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
285  archive_.skipToEntry(archive_entry);
286  skipHeader();
287  current_offset_ = 0;
288  if (index > 0) {
289  current_offset_ = cumulative_sizes_[index - 1];
290  }
291  }
292  skipBytes(offset - current_offset_);
293  return readInternal(buffer, size, size);
294 }
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: CsvReader.cpp:241
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: CsvReader.cpp:53
void skipToEntry(int entry_number)
Definition: CsvReader.cpp:158
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ resetArchive()

void foreign_storage::CompressedFileReader::resetArchive ( )
private

Reopen file and reset back to the beginning

◆ serialize()

void foreign_storage::CompressedFileReader::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
override virtual

Serialize internal state to the given JSON object. This JSON will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value - JSON object to store the needed state in; this function may store any needed data, or none
allocator - allocator to use for JSON construction

Implements foreign_storage::CsvReader.

Definition at line 462 of file CsvReader.cpp.

References foreign_storage::json_utils::add_value_to_object(), archive_entry_index_, CHECK, cumulative_sizes_, initial_scan_, scan_finished_, and sourcenames_.

464  {
465  // Should be done initial scan
468 
469  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
471  value, cumulative_sizes_, "cumulative_sizes", allocator);
473  value, archive_entry_index_, "archive_entry_index", allocator);
474 };
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ skipBytes()

void foreign_storage::CompressedFileReader::skipBytes ( size_t  n_bytes)
private

Skip forward N bytes in current entry without reading the data

Parameters
n_bytes- number of bytes to skip

Skip forward N bytes without reading the data in current entry

Parameters
n_bytes- number of bytes to skip

Definition at line 351 of file CsvReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), current_offset_, and foreign_storage::ArchiveWrapper::currentEntryDataAvailable().

Referenced by checkForMoreRows(), and readRegion().

351  {
352  current_offset_ += n_bytes;
353  while (n_bytes > 0) {
355  // We've reached the end of the entry
356  return;
357  }
358  // Keep fetching blocks/entries until we've gone through N bytes
359  if (archive_.currentEntryDataAvailable() <= n_bytes) {
360  n_bytes -= archive_.currentEntryDataAvailable();
362  } else {
364  n_bytes = 0;
365  }
366  }
367 }
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: CsvReader.cpp:182
size_t currentEntryDataAvailable() const
Definition: CsvReader.h:185
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ skipHeader()

void foreign_storage::CompressedFileReader::skipHeader ( )
private

Skip Header of CSV file

Definition at line 335 of file CsvReader.cpp.

References archive_, foreign_storage::ArchiveWrapper::consumeDataFromCurrentEntry(), foreign_storage::CsvReader::copy_params_, foreign_storage::ArchiveWrapper::currentEntryFinished(), import_export::CopyParams::has_header, import_export::CopyParams::line_delim, import_export::NO_HEADER, and foreign_storage::ArchiveWrapper::peekNextChar().

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

335  {
337  while (!archive_.currentEntryFinished()) {
340  break;
341  }
343  }
344  }
345 }
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
ImportHeaderRow has_header
Definition: CopyParams.h:48
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: CsvReader.cpp:182
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ archive_

ArchiveWrapper foreign_storage::CompressedFileReader::archive_
private

◆ archive_entry_index_

std::vector<int> foreign_storage::CompressedFileReader::archive_entry_index_
private

Definition at line 287 of file CsvReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), readRegion(), and serialize().

◆ cumulative_sizes_

std::vector<size_t> foreign_storage::CompressedFileReader::cumulative_sizes_
private

Definition at line 282 of file CsvReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), readRegion(), and serialize().

◆ current_index_

int foreign_storage::CompressedFileReader::current_index_
private

Definition at line 279 of file CsvReader.h.

Referenced by checkForMoreRows(), nextEntry(), and readRegion().

◆ current_offset_

size_t foreign_storage::CompressedFileReader::current_offset_
private

Definition at line 275 of file CsvReader.h.

Referenced by checkForMoreRows(), nextEntry(), readInternal(), readRegion(), and skipBytes().

◆ initial_scan_

bool foreign_storage::CompressedFileReader::initial_scan_
private

Definition at line 270 of file CsvReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().

◆ scan_finished_

bool foreign_storage::CompressedFileReader::scan_finished_
private

Definition at line 272 of file CsvReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().

◆ sourcenames_

std::vector<std::string> foreign_storage::CompressedFileReader::sourcenames_
private

Definition at line 284 of file CsvReader.h.

Referenced by checkForMoreRows(), CompressedFileReader(), nextEntry(), and serialize().


The documentation for this class was generated from the following files: