OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::RegexFileBufferParser Class Reference

#include <RegexFileBufferParser.h>

+ Inheritance diagram for foreign_storage::RegexFileBufferParser:
+ Collaboration diagram for foreign_storage::RegexFileBufferParser:

Public Member Functions

 RegexFileBufferParser (const ForeignTable *foreign_table)
 
ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Member Functions

static void setSkipFirstLineForTesting (bool skip)
 
static void setMaxBufferResize (size_t max_buffer_resize)
 
- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Static Public Attributes

static const std::string LINE_REGEX_KEY = "LINE_REGEX"
 
static const std::string LINE_START_REGEX_KEY = "LINE_START_REGEX"
 
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Static Private Member Functions

static size_t getMaxBufferResize ()
 

Private Attributes

boost::regex line_regex_
 
std::optional< boost::regex > line_start_regex_
 

Static Private Attributes

static size_t max_buffer_resize_
 
static bool skip_first_line_ {false}
 

Detailed Description

Definition at line 34 of file RegexFileBufferParser.h.

Constructor & Destructor Documentation

foreign_storage::RegexFileBufferParser::RegexFileBufferParser ( const ForeignTable *  foreign_table)

Definition at line 155 of file RegexFileBufferParser.cpp.

156  : line_regex_(get_line_regex(foreign_table))
157  , line_start_regex_(get_line_start_regex(foreign_table)) {}
std::string get_line_regex(const ForeignTable *foreign_table)
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
std::optional< boost::regex > line_start_regex_

Member Function Documentation

size_t foreign_storage::RegexFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FileReader *  file_reader 
) const
overridevirtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 287 of file RegexFileBufferParser.cpp.

References CHECK, CHECK_EQ, CHECK_GT, import_export::delimited_parser::extend_buffer(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::find_last_end_of_line(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_row_count(), getMaxBufferResize(), foreign_storage::FileReader::isEndOfLastFile(), foreign_storage::FileReader::isScanFinished(), import_export::CopyParams::line_delim, line_start_regex_, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::line_starts_with_regex(), and import_export::delimited_parser::max_buffer_resize.

294  {
295  CHECK_GT(buffer_size, static_cast<size_t>(0));
296  size_t start_pos{0};
297  size_t end_pos = buffer_size - 1;
298  bool found_end_pos{false};
299  while (!found_end_pos) {
300  try {
301  end_pos = find_last_end_of_line(
302  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
303  if (file_reader->isEndOfLastFile()) {
304  CHECK_EQ(end_pos, buffer_size - 1);
305  found_end_pos = true;
306  } else if (line_start_regex_.has_value()) {
307  // When a LINE_START_REGEX option is present and the file reader is not at the end
308  // of file, return the position of the end of line before the last line that
309  // matches the line start regex, since the last line that matches the line start
310  // regex in this buffer may still have to include/concatenate lines beyond this
311  // buffer.
312  CHECK_GT(end_pos, static_cast<size_t>(0));
313  auto old_end_pos = end_pos;
314  end_pos = find_last_end_of_line(buffer.get(),
315  buffer_size,
316  start_pos,
317  old_end_pos - 1,
318  copy_params.line_delim);
319  while (!line_starts_with_regex(
320  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
321  old_end_pos = end_pos;
322  end_pos = find_last_end_of_line(buffer.get(),
323  buffer_size,
324  start_pos,
325  old_end_pos - 1,
326  copy_params.line_delim);
327  }
328  found_end_pos = true;
329  } else {
330  found_end_pos = true;
331  }
332  } catch (InsufficientBufferSizeException& e) {
333  auto max_buffer_resize = getMaxBufferResize();
334  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
335  throw;
336  }
337  start_pos = buffer_size;
338  import_export::delimited_parser::extend_buffer(
339  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
340  end_pos = buffer_size - 1;
341  }
342  }
343  CHECK(found_end_pos);
344  num_rows_in_buffer =
345  get_row_count(buffer.get(), 0, end_pos, copy_params.line_delim, line_start_regex_);
346  return end_pos + 1;
347 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:221
virtual bool isScanFinished()=0
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::optional< boost::regex > line_start_regex_
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
#define CHECK(condition)
Definition: Logger.h:209
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex)
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)

+ Here is the call graph for this function:

size_t foreign_storage::RegexFileBufferParser::getMaxBufferResize ( )
staticprivate

Definition at line 372 of file RegexFileBufferParser.cpp.

References max_buffer_resize_.

Referenced by findRowEndPosition().

372  {
373  return max_buffer_resize_;
374 }

+ Here is the caller graph for this function:

ParseBufferResult foreign_storage::RegexFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
overridevirtual

Parses a given file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 163 of file RegexFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::ParseBufferRequest::first_row_index, foreign_storage::ParseBufferRequest::foreign_table_schema, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_next_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), i, foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isNullDatum(), import_export::CopyParams::line_delim, line_regex_, line_start_regex_, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::regex_match_columns(), and run_benchmark_import::result.

166  {
167  CHECK(request.buffer);
168  char* buffer_start = request.buffer.get() + request.begin_pos;
169  const char* buffer_end = request.buffer.get() + request.end_pos;
170 
171  std::vector<size_t> row_offsets;
172  row_offsets.emplace_back(request.file_offset + request.begin_pos);
173 
174  size_t row_count = 0;
175  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
176  std::vector<std::string> parsed_columns_str;
177  parsed_columns_str.reserve(logical_column_count);
178  std::vector<std::string_view> parsed_columns_sv;
179  parsed_columns_sv.reserve(logical_column_count);
180 
181  std::string row_str;
182  size_t remaining_row_count = request.process_row_count;
183  auto curr = buffer_start;
184  while (curr < buffer_end && remaining_row_count > 0) {
185  try {
186  row_str = get_next_row(
187  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
188  curr += row_str.length() + 1;
189  row_count++;
190  remaining_row_count--;
191 
192  bool skip_all_columns =
193  std::all_of(request.import_buffers.begin(),
194  request.import_buffers.end(),
195  [](const auto& import_buffer) { return !import_buffer; });
196  if (!skip_all_columns) {
197  bool set_all_nulls = regex_match_columns(row_str,
198  line_regex_,
199  logical_column_count,
200  parsed_columns_str,
201  parsed_columns_sv,
202  request.getFilePath());
203 
204  size_t parsed_column_index = 0;
205  size_t import_buffer_index = 0;
206  auto columns = request.getColumns();
207  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
208  auto cd = *cd_it;
209  const auto& column_type = cd->columnType;
210  if (request.import_buffers[import_buffer_index]) {
211  bool is_null =
212  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
213  cd,
214  request.copy_params.null_str));
215  if (column_type.is_geometry()) {
216  processGeoColumn(request.import_buffers,
217  import_buffer_index,
218  request.copy_params,
219  cd_it,
220  parsed_columns_sv,
221  parsed_column_index,
222  is_null,
223  request.first_row_index,
224  row_count,
225  request.getCatalog());
226  // Skip remaining physical columns
227  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
228  ++cd_it;
229  }
230  } else {
231  request.import_buffers[import_buffer_index]->add_value(
232  cd,
233  parsed_columns_sv[parsed_column_index],
234  is_null,
235  request.copy_params);
236  parsed_column_index++;
237  import_buffer_index++;
238  }
239  } else {
240  // Skip column
241  for (int i = 0; i < column_type.get_physical_cols(); i++) {
242  import_buffer_index++;
243  cd_it++;
244  }
245  parsed_column_index++;
246  import_buffer_index++;
247  }
248  }
249  }
250  } catch (const ForeignStorageException& e) {
251  throw;
252  } catch (const std::exception& e) {
253  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
254  "\" in row \"" + row_str + "\" in file \"" +
255  request.getFilePath() + "\"");
256  }
257  }
258  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
259 
260  ParseBufferResult result{};
261  result.row_offsets = row_offsets;
262  result.row_count = row_count;
263  if (convert_data_blocks) {
264  result.column_id_to_data_blocks_map =
265  convertImportBuffersToDataBlocks(request.import_buffers);
266  }
267  return result;
268 }
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
CONSTEXPR DEVICE bool is_null(const T &value)
bool regex_match_columns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
#define CHECK(condition)
Definition: Logger.h:209
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)

+ Here is the call graph for this function:

void foreign_storage::RegexFileBufferParser::setMaxBufferResize ( size_t  max_buffer_resize)
static
void foreign_storage::RegexFileBufferParser::setSkipFirstLineForTesting ( bool  skip)
static

Definition at line 376 of file RegexFileBufferParser.cpp.

References skip_first_line_.

376  {
377  skip_first_line_ = skip;
378 }
import_export::CopyParams foreign_storage::RegexFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
overridevirtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 270 of file RegexFileBufferParser.cpp.

References BUFFER_SIZE_KEY, import_export::HAS_HEADER, import_export::NO_HEADER, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, and skip_first_line_.

271  {
272  import_export::CopyParams copy_params{};
273  copy_params.plain_text = true;
274  if (skip_first_line_) {
275  // This branch should only be executed in tests
276  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
277  } else {
278  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
279  }
280  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
281  it != foreign_table->options.end()) {
282  copy_params.buffer_size = std::stoi(it->second);
283  }
284  return copy_params;
285 }
void foreign_storage::RegexFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
overridevirtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 349 of file RegexFileBufferParser.cpp.

References CHECK, foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::get_line_start_regex(), foreign_storage::FileReader::getFirstLineForEachFile(), parse_ast::line, line_start_regex_, and foreign_storage::anonymous_namespace{RegexFileBufferParser.cpp}::line_starts_with_regex().

350  {
351  if (line_start_regex_.has_value()) {
352  // When a LINE_START_REGEX option is specified, at least the first line in each file
353  // has to start with the specified regex.
354  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
355  for (const auto& [file_path, line] : first_line_by_file_path) {
356  if (!line_starts_with_regex(
357  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
358  auto line_start_regex = get_line_start_regex(foreign_table);
359  CHECK(line_start_regex.has_value());
360  throw ForeignStorageException{"First line in file \"" + file_path +
361  "\" does not match line start regex \"" +
362  line_start_regex.value() + "\""};
363  }
364  }
365  }
366 }
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
#define CHECK(condition)
Definition: Logger.h:209
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)

+ Here is the call graph for this function:

Member Data Documentation

const std::string foreign_storage::RegexFileBufferParser::BUFFER_SIZE_KEY = "BUFFER_SIZE"
inlinestatic

Definition at line 62 of file RegexFileBufferParser.h.

Referenced by validateAndGetCopyParams().

boost::regex foreign_storage::RegexFileBufferParser::line_regex_
private

Definition at line 73 of file RegexFileBufferParser.h.

Referenced by parseBuffer().

const std::string foreign_storage::RegexFileBufferParser::LINE_REGEX_KEY = "LINE_REGEX"
inlinestatic
std::optional<boost::regex> foreign_storage::RegexFileBufferParser::line_start_regex_
private

Definition at line 74 of file RegexFileBufferParser.h.

Referenced by findRowEndPosition(), parseBuffer(), and validateFiles().

const std::string foreign_storage::RegexFileBufferParser::LINE_START_REGEX_KEY = "LINE_START_REGEX"
inlinestatic
size_t foreign_storage::RegexFileBufferParser::max_buffer_resize_
inlinestaticprivate
bool foreign_storage::RegexFileBufferParser::skip_first_line_ {false}
inlinestaticprivate

Definition at line 71 of file RegexFileBufferParser.h.

Referenced by setSkipFirstLineForTesting(), and validateAndGetCopyParams().


The documentation for this class was generated from the following files: