OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::RegexFileBufferParser Class Reference

#include <RegexFileBufferParser.h>

+ Inheritance diagram for foreign_storage::RegexFileBufferParser:
+ Collaboration diagram for foreign_storage::RegexFileBufferParser:

Public Member Functions

 RegexFileBufferParser (const ForeignTable *foreign_table)
 
ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Member Functions

static void setMaxBufferResize (size_t max_buffer_resize)
 
- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int,
DataBlockPtr
convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Static Public Attributes

static const std::string LINE_REGEX_KEY = "LINE_REGEX"
 
static const std::string LINE_START_REGEX_KEY = "LINE_START_REGEX"
 
static const std::string HEADER_KEY = "HEADER"
 
- Static Public Attributes inherited from foreign_storage::TextFileBufferParser
static const std::string THREADS_KEY = "THREADS"
 
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Static Private Member Functions

static size_t getMaxBufferResize ()
 

Private Attributes

boost::regex line_regex_
 
std::optional< boost::regex > line_start_regex_
 

Static Private Attributes

static size_t max_buffer_resize_
 
static bool skip_first_line_ {false}
 

Detailed Description

Definition at line 23 of file RegexFileBufferParser.h.

Constructor & Destructor Documentation

foreign_storage::RegexFileBufferParser::RegexFileBufferParser ( const ForeignTable *  foreign_table)

Definition at line 172 of file RegexFileBufferParser.cpp.

173  : line_regex_(get_line_regex(foreign_table))
174  , line_start_regex_(get_line_start_regex(foreign_table)) {}
std::string get_line_regex(const ForeignTable *foreign_table)
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
std::optional< boost::regex > line_start_regex_

Member Function Documentation

size_t foreign_storage::RegexFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FileReader *  file_reader 
) const
override, virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 362 of file RegexFileBufferParser.cpp.

References CHECK, CHECK_EQ, CHECK_GT, import_export::delimited_parser::extend_buffer(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::find_last_end_of_line(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_row_count(), getMaxBufferResize(), foreign_storage::FileReader::isEndOfLastFile(), foreign_storage::FileReader::isScanFinished(), import_export::CopyParams::line_delim, line_start_regex_, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::line_starts_with_regex(), and import_export::delimited_parser::max_buffer_resize.

369  {
370  CHECK_GT(buffer_size, static_cast<size_t>(0));
371  size_t start_pos{0};
372  size_t end_pos = buffer_size - 1;
373  bool found_end_pos{false};
374  while (!found_end_pos) {
375  try {
376  end_pos = find_last_end_of_line(
377  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
378  if (file_reader->isEndOfLastFile()) {
379  CHECK_EQ(end_pos, buffer_size - 1);
380  found_end_pos = true;
381  } else if (line_start_regex_.has_value()) {
382  // When a LINE_START_REGEX option is present and the file reader is not at the end
383  // of file, return the position of the end of line before the last line that
384  // matches the line start regex, since the last line that matches the line start
385  // regex in this buffer may still have to include/concatenate lines beyond this
386  // buffer.
387  CHECK_GT(end_pos, static_cast<size_t>(0));
388  auto old_end_pos = end_pos;
389  end_pos = find_last_end_of_line(buffer.get(),
390  buffer_size,
391  start_pos,
392  old_end_pos - 1,
393  copy_params.line_delim);
394  while (!line_starts_with_regex(
395  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
396  old_end_pos = end_pos;
397  end_pos = find_last_end_of_line(buffer.get(),
398  buffer_size,
399  start_pos,
400  old_end_pos - 1,
401  copy_params.line_delim);
402  }
403  found_end_pos = true;
404  } else {
405  found_end_pos = true;
406  }
407  } catch (InsufficientBufferSizeException& e) {
408  auto max_buffer_resize = getMaxBufferResize();
409  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
410  throw;
411  }
412  start_pos = buffer_size;
413  import_export::delimited_parser::extend_buffer(
414  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
415  end_pos = buffer_size - 1;
416  }
417  }
418  CHECK(found_end_pos);
419  num_rows_in_buffer =
420  get_row_count(buffer.get(), 0, end_pos, copy_params.line_delim, line_start_regex_);
421  return end_pos + 1;
422 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:235
virtual bool isScanFinished()=0
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::optional< boost::regex > line_start_regex_
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
#define CHECK(condition)
Definition: Logger.h:223
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex)
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)

+ Here is the call graph for this function:

size_t foreign_storage::RegexFileBufferParser::getMaxBufferResize ( )
static, private

Definition at line 447 of file RegexFileBufferParser.cpp.

References max_buffer_resize_.

Referenced by findRowEndPosition().

447  {
448  return max_buffer_resize_;
449 }

+ Here is the caller graph for this function:

ParseBufferResult foreign_storage::RegexFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
override, virtual

Parses a given file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 180 of file RegexFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData(), foreign_storage::ParseBufferRequest::first_row_index, foreign_storage::ParseBufferRequest::foreign_table_schema, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_next_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isNullDatum(), import_export::CopyParams::line_delim, line_regex_, line_start_regex_, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::regex_match_columns(), foreign_storage::ParseBufferRequest::render_group_analyzer_map, run_benchmark_import::result, and foreign_storage::ParseBufferRequest::track_rejected_rows.

183  {
184  CHECK(request.buffer);
185  char* buffer_start = request.buffer.get() + request.begin_pos;
186  const char* buffer_end = request.buffer.get() + request.end_pos;
187 
188  std::vector<size_t> row_offsets;
189  row_offsets.emplace_back(request.file_offset + request.begin_pos);
190 
191  size_t current_row_id = 0;
192  size_t row_count = 0;
193  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
194  std::vector<std::string> parsed_columns_str;
195  parsed_columns_str.reserve(logical_column_count);
196  std::vector<std::string_view> parsed_columns_sv;
197  parsed_columns_sv.reserve(logical_column_count);
198 
199  ParseBufferResult result{};
200 
201  std::string row_str;
202  size_t remaining_row_count = request.process_row_count;
203  auto curr = buffer_start;
204  while (curr < buffer_end && remaining_row_count > 0) {
205  try {
206  row_str = get_next_row(
207  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
208  curr += row_str.length() + 1;
209  current_row_id = row_count++;
210  remaining_row_count--;
211 
212  bool skip_all_columns =
213  std::all_of(request.import_buffers.begin(),
214  request.import_buffers.end(),
215  [](const auto& import_buffer) { return !import_buffer; });
216  if (!skip_all_columns) {
217  auto columns = request.getColumns();
218 
219  bool set_all_nulls = false;
220  try {
221  set_all_nulls = regex_match_columns(row_str,
222  line_regex_,
223  logical_column_count,
224  parsed_columns_str,
225  parsed_columns_sv,
226  request.getFilePath());
227  } catch (const ForeignStorageException& e) {
228  if (request.track_rejected_rows) {
229  result.rejected_rows.insert(current_row_id);
230  auto cd_it = columns.begin();
231  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
232  continue;
233  } else {
234  throw;
235  }
236  }
237 
238  size_t parsed_column_index = 0;
239  size_t import_buffer_index = 0;
240 
241  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
242  auto cd = *cd_it;
243  const auto& column_type = cd->columnType;
244  if (request.import_buffers[import_buffer_index]) {
245  bool is_null = false;
246  try {
247  is_null =
248  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
249  cd,
250  request.copy_params.null_str));
251  } catch (const std::exception& e) {
252  if (request.track_rejected_rows) {
253  result.rejected_rows.insert(current_row_id);
254  fillRejectedRowWithInvalidData(
255  columns, cd_it, import_buffer_index, request);
256  break; // skip rest of row
257  } else {
258  throw;
259  }
260  }
261  if (column_type.is_geometry()) {
262  auto starting_import_buffer_index = import_buffer_index;
263  try {
264  processGeoColumn(request.import_buffers,
265  import_buffer_index,
266  request.copy_params,
267  cd_it,
268  parsed_columns_sv,
269  parsed_column_index,
270  is_null,
271  request.first_row_index,
272  row_count,
273  request.getCatalog(),
274  request.render_group_analyzer_map);
275  } catch (const std::exception& e) {
276  if (request.track_rejected_rows) {
277  result.rejected_rows.insert(current_row_id);
278  fillRejectedRowWithInvalidData(
279  columns, cd_it, starting_import_buffer_index, request);
280  break; // skip rest of row
281  } else {
282  throw;
283  }
284  }
285  // Skip remaining physical columns
286  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
287  ++cd_it;
288  }
289  } else {
290  try {
291  request.import_buffers[import_buffer_index]->add_value(
292  cd,
293  parsed_columns_sv[parsed_column_index],
294  is_null,
295  request.copy_params);
296  } catch (const std::exception& e) {
297  if (request.track_rejected_rows) {
298  result.rejected_rows.insert(current_row_id);
299  fillRejectedRowWithInvalidData(
300  columns, cd_it, import_buffer_index, request);
301  break; // skip rest of row
302  } else {
303  throw;
304  }
305  }
306  parsed_column_index++;
307  import_buffer_index++;
308  }
309  } else {
310  // Skip column
311  for (int i = 0; i < column_type.get_physical_cols(); i++) {
312  import_buffer_index++;
313  cd_it++;
314  }
315  parsed_column_index++;
316  import_buffer_index++;
317  }
318  }
319  }
320  } catch (const ForeignStorageException& e) {
321  throw;
322  } catch (const std::exception& e) {
323  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
324  "\" in row \"" + row_str + "\" in file \"" +
325  request.getFilePath() + "\"");
326  }
327  }
328  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
329 
330  result.row_offsets = row_offsets;
331  result.row_count = row_count;
332  if (convert_data_blocks) {
333  result.column_id_to_data_blocks_map =
334  convertImportBuffersToDataBlocks(request.import_buffers);
335  }
336  return result;
337 }
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
CONSTEXPR DEVICE bool is_null(const T &value)
bool regex_match_columns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
#define CHECK(condition)
Definition: Logger.h:223
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)

+ Here is the call graph for this function:

void foreign_storage::RegexFileBufferParser::setMaxBufferResize ( size_t  max_buffer_resize)
static
import_export::CopyParams foreign_storage::RegexFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override, virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 339 of file RegexFileBufferParser.cpp.

References foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY, HEADER_KEY, import_export::kHasHeader, import_export::kNoHeader, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, foreign_storage::TextFileBufferParser::THREADS_KEY, and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value().

340  {
341  import_export::CopyParams copy_params{};
342  copy_params.plain_text = true;
343  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
344  if (has_header.has_value()) {
345  if (has_header.value()) {
346  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
347  } else {
348  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
349  }
350  }
351  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
352  it != foreign_table->options.end()) {
353  copy_params.buffer_size = std::stoi(it->second);
354  }
355  if (auto it = foreign_table->options.find(THREADS_KEY);
356  it != foreign_table->options.end()) {
357  copy_params.threads = std::stoi(it->second);
358  }
359  return copy_params;
360 }
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)

+ Here is the call graph for this function:

void foreign_storage::RegexFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
override, virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 424 of file RegexFileBufferParser.cpp.

References CHECK, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_line_start_regex(), foreign_storage::FileReader::getFirstLineForEachFile(), parse_ast::line, line_start_regex_, and foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::line_starts_with_regex().

425  {
426  if (line_start_regex_.has_value()) {
427  // When a LINE_START_REGEX option is specified, at least the first line in each file
428  // has to start with the specified regex.
429  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
430  for (const auto& [file_path, line] : first_line_by_file_path) {
431  if (!line_starts_with_regex(
432  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
433  auto line_start_regex = get_line_start_regex(foreign_table);
434  CHECK(line_start_regex.has_value());
435  throw ForeignStorageException{"First line in file \"" + file_path +
436  "\" does not match line start regex \"" +
437  line_start_regex.value() + "\""};
438  }
439  }
440  }
441 }
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
#define CHECK(condition)
Definition: Logger.h:223
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)

+ Here is the call graph for this function:

Member Data Documentation

const std::string foreign_storage::RegexFileBufferParser::HEADER_KEY = "HEADER"
inline, static

Definition at line 50 of file RegexFileBufferParser.h.

Referenced by validateAndGetCopyParams().

boost::regex foreign_storage::RegexFileBufferParser::line_regex_
private

Definition at line 61 of file RegexFileBufferParser.h.

Referenced by parseBuffer().

std::optional<boost::regex> foreign_storage::RegexFileBufferParser::line_start_regex_
private

Definition at line 62 of file RegexFileBufferParser.h.

Referenced by findRowEndPosition(), parseBuffer(), and validateFiles().

const std::string foreign_storage::RegexFileBufferParser::LINE_START_REGEX_KEY = "LINE_START_REGEX"
inline, static
size_t foreign_storage::RegexFileBufferParser::max_buffer_resize_
inline, static, private
bool foreign_storage::RegexFileBufferParser::skip_first_line_ {false}
inline, static, private

Definition at line 59 of file RegexFileBufferParser.h.


The documentation for this class was generated from the following files: