OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RegexFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
21 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 namespace {
27 
28 size_t find_last_end_of_line(const char* buffer,
29  size_t buffer_size,
30  size_t start,
31  size_t end,
32  char line_delim) {
33  int64_t i = end;
34  while (i >= static_cast<int64_t>(start)) {
35  if (buffer[i] == line_delim) {
36  return i;
37  } else {
38  i--;
39  }
40  }
42  "Unable to find an end of line character after reading " +
43  std::to_string(buffer_size) + " characters."};
44 }
45 
46 bool line_starts_with_regex(const char* buffer,
47  size_t start,
48  size_t end,
49  const boost::regex& line_start_regex) {
50  return boost::regex_search(std::string{buffer + start, end - start + 1},
51  line_start_regex,
52  boost::regex_constants::match_continuous);
53 }
54 
55 std::optional<std::string> get_line_start_regex(const ForeignTable* foreign_table) {
56  if (foreign_table) {
57  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_START_REGEX_KEY);
58  if (it != foreign_table->options.end()) {
59  return it->second;
60  }
61  }
62  return {};
63 }
64 
65 std::string get_line_regex(const ForeignTable* foreign_table) {
66  if (foreign_table) {
67  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_REGEX_KEY);
68  CHECK(it != foreign_table->options.end());
69  return it->second;
70  }
71  return {};
72 }
73 
74 std::string get_next_row(const char* curr,
75  const char* buffer_end,
76  char line_delim,
77  const std::optional<boost::regex>& line_start_regex) {
78  auto row_end = curr;
79  bool row_found{false};
80  while (!row_found && row_end <= buffer_end) {
81  if (*row_end == line_delim) {
82  if (row_end == buffer_end) {
83  row_found = true;
84  } else if (line_start_regex.has_value()) {
85  // When a LINE_START_REGEX option is present, concatenate the following lines
86  // until a line that starts with the specified regex is found.
87  CHECK(line_starts_with_regex(curr, 0, row_end - curr, line_start_regex.value()))
88  << "'" << line_start_regex.value() << "' not found in: '"
89  << std::string{curr, row_end - curr + 1ULL} << "'";
90  auto row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
91  while (!line_starts_with_regex(
92  row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
93  row_end += row_str.length() + 1;
94  if (row_end == buffer_end) {
95  break;
96  }
97  row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
98  }
99  row_found = true;
100  } else {
101  row_found = true;
102  }
103  }
104  row_end++;
105  }
106  CHECK(row_found);
107  return std::string{curr, static_cast<size_t>(row_end - curr - 1)};
108 }
109 
110 size_t get_row_count(const char* buffer,
111  size_t start,
112  size_t end,
113  char line_delim,
114  const std::optional<boost::regex>& line_start_regex,
115  const boost::regex& line_regex,
116  bool remove_non_matches) {
117  size_t row_count{0};
118  auto buffer_end = buffer + end;
119  auto curr = buffer + start;
120  while (curr <= buffer_end) {
121  auto row_str = get_next_row(curr, buffer_end, line_delim, line_start_regex);
122  curr += row_str.length() + 1;
123  if (remove_non_matches) {
124  if (boost::regex_match(row_str, line_regex)) {
125  row_count++;
126  }
127  } else {
128  row_count++;
129  }
130  }
131  return row_count;
132 }
133 
134 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
135  const std::string& option_name) {
136  if (auto it = foreign_table->options.find(option_name);
137  it != foreign_table->options.end()) {
138  if (boost::iequals(it->second, "TRUE")) {
139  return true;
140  } else if (boost::iequals(it->second, "FALSE")) {
141  return false;
142  } else {
143  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
144  "\" foreign table option. "
145  "Value must be either 'true' or 'false'."};
146  }
147  }
148  return std::nullopt;
149 }
150 } // namespace
151 
153  : line_regex_(get_line_regex(foreign_table))
154  , line_start_regex_(get_line_start_regex(foreign_table)) {}
155 
// Body of RegexFileBufferParser::parseBuffer: reads rows from request.buffer between
// request.begin_pos and request.end_pos, splits each row into columns with the line
// regex and adds the values to request.import_buffers. Returns a result holding the
// row offsets, the row count, the rejected row ids and (when convert_data_blocks is
// set) the converted data blocks.
// NOTE(review): the first lines of the signature (return type, function name and the
// `request` parameter) are missing from this listing - restore them from the
// parseBuffer declaration before compiling.
161  bool convert_data_blocks,
162  bool columns_are_pre_filtered,
163  bool skip_dict_encoding) const {
164  CHECK(request.buffer);
165  char* buffer_start = request.buffer.get() + request.begin_pos;
166  const char* buffer_end = request.buffer.get() + request.end_pos;
167 
 // Offsets are relative to the file: the first entry is where this chunk begins.
168  std::vector<size_t> row_offsets;
169  row_offsets.emplace_back(request.file_offset + request.begin_pos);
170 
171  size_t current_row_id = 0;
172  size_t row_count = 0;
173  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
 // Scratch vectors that are cleared and refilled for every row. Both are reserved to
 // the logical column count up front.
174  std::vector<std::string> parsed_columns_str;
175  parsed_columns_str.reserve(logical_column_count);
176  std::vector<std::string_view> parsed_columns_sv;
177  parsed_columns_sv.reserve(logical_column_count);
178 
 // NOTE(review): `result` is used below but its declaration is not visible in this
 // listing (the line before the following blank line is missing).
180 
181  std::string row_str;
182  size_t remaining_row_count = request.process_row_count;
183  auto curr = buffer_start;
184  while (curr < buffer_end && remaining_row_count > 0) {
185  try {
 // buffer_end is one past the last character of the chunk, get_next_row expects
 // the last valid character.
186  row_str = get_next_row(
187  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
 // Skip the row and its terminating line delimiter.
188  curr += row_str.length() + 1;
189  current_row_id = row_count++;
190  remaining_row_count--;
191 
 // When no import buffer is set, rows are only counted and not parsed.
192  bool skip_all_columns =
193  std::all_of(request.import_buffers.begin(),
194  request.import_buffers.end(),
195  [](const auto& import_buffer) { return !import_buffer; });
196  if (!skip_all_columns) {
197  auto columns = request.getColumns();
198 
199  bool set_all_nulls = false;
200  try {
201  parsed_columns_str.clear();
202  parsed_columns_sv.clear();
203  set_all_nulls = regexMatchColumns(row_str,
204  line_regex_,
205  logical_column_count,
206  parsed_columns_str,
207  parsed_columns_sv,
208  request.getFilePath());
 // A non-matching row that is to be removed is un-counted again, so that it
 // neither takes a row id nor consumes the requested row budget.
209  if (set_all_nulls && shouldRemoveNonMatches()) {
210  current_row_id = row_count--;
211  remaining_row_count++;
212  continue;
213  }
214  } catch (const ForeignStorageException& e) {
215  if (request.track_rejected_rows) {
216  result.rejected_rows.insert(current_row_id);
217  auto cd_it = columns.begin();
218  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
219  continue;
220  } else {
221  throw;
222  }
223  }
224 
 // parsed_column_index walks the regex capture groups, import_buffer_index walks
 // request.import_buffers.
225  size_t parsed_column_index = 0;
226  size_t import_buffer_index = 0;
227 
228  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
229  auto cd = *cd_it;
230  const auto& column_type = cd->columnType;
231  if (request.import_buffers[import_buffer_index]) {
232  bool is_null = false;
233  try {
234  is_null =
235  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
236  cd,
237  request.copy_params.null_str));
238  } catch (const std::exception& e) {
239  if (request.track_rejected_rows) {
240  result.rejected_rows.insert(current_row_id);
 // NOTE(review): the callee name of the following call is missing from this
 // listing; the arguments match fillRejectedRowWithInvalidData - confirm.
242  columns, cd_it, import_buffer_index, request);
243  break; // skip rest of row
244  } else {
245  throw;
246  }
247  }
248  if (column_type.is_geometry()) {
 // Remembered so that a rejected row can be filled starting at the first buffer
 // of this geo column.
249  auto starting_import_buffer_index = import_buffer_index;
250  try {
 // NOTE(review): the callee name and first argument of the following call are
 // missing from this listing; presumably processGeoColumn - confirm.
252  import_buffer_index,
253  request.copy_params,
254  cd_it,
255  parsed_columns_sv,
256  parsed_column_index,
257  is_null,
258  request.first_row_index,
259  row_count,
260  request.getCatalog(),
261  request.render_group_analyzer_map);
262  } catch (const std::exception& e) {
263  if (request.track_rejected_rows) {
264  result.rejected_rows.insert(current_row_id);
 // NOTE(review): callee name missing from this listing, see the note above.
266  columns, cd_it, starting_import_buffer_index, request);
267  break; // skip rest of row
268  } else {
269  throw;
270  }
271  }
272  // Skip remaining physical columns
273  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
274  ++cd_it;
275  }
276  } else {
277  try {
278  auto& column_sv = parsed_columns_sv[parsed_column_index];
 // Only the view is shortened; the underlying string is left untouched.
279  if (column_type.is_string() && shouldTruncateStringValues() &&
280  column_sv.length() > StringDictionary::MAX_STRLEN) {
281  column_sv = column_sv.substr(0, StringDictionary::MAX_STRLEN);
282  }
283  request.import_buffers[import_buffer_index]->add_value(
284  cd,
285  parsed_columns_sv[parsed_column_index],
286  is_null,
287  request.copy_params);
288  } catch (const std::exception& e) {
289  if (request.track_rejected_rows) {
290  result.rejected_rows.insert(current_row_id);
 // NOTE(review): callee name missing from this listing, see the note above.
292  columns, cd_it, import_buffer_index, request);
293  break; // skip rest of row
294  } else {
295  throw;
296  }
297  }
298  parsed_column_index++;
299  import_buffer_index++;
300  }
301  } else {
302  // Skip column
303  for (int i = 0; i < column_type.get_physical_cols(); i++) {
304  import_buffer_index++;
305  cd_it++;
306  }
307  parsed_column_index++;
308  import_buffer_index++;
309  }
310  }
311  }
 // ForeignStorageExceptions are passed on unchanged; any other exception is wrapped
 // into one that names the offending row and file.
312  } catch (const ForeignStorageException& e) {
313  throw;
314  } catch (const std::exception& e) {
315  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
316  "\" in row \"" + row_str + "\" in file \"" +
317  request.getFilePath() + "\"");
318  }
319  }
 // Closing offset: file position right after the last row that was consumed.
320  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
321 
322  result.row_offsets = row_offsets;
323  result.row_count = row_count;
324  if (convert_data_blocks) {
325  result.column_id_to_data_blocks_map =
326  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
327  }
328  return result;
329 }
330 
332  const std::string& row_str,
333  const boost::regex& line_regex,
334  size_t logical_column_count,
335  std::vector<std::string>& parsed_columns_str,
336  std::vector<std::string_view>& parsed_columns_sv,
337  const std::string& file_path) const {
338  boost::smatch match;
339  bool set_all_nulls{false};
340  if (boost::regex_match(row_str, match, line_regex)) {
341  auto matched_column_count = match.size() - 1 + parsed_columns_sv.size();
342  if (logical_column_count != matched_column_count) {
344  logical_column_count, matched_column_count, file_path);
345  }
346  CHECK_GT(match.size(), static_cast<size_t>(1));
347  for (size_t i = 1; i < match.size(); i++) {
348  parsed_columns_str.emplace_back(match[i].str());
349  parsed_columns_sv.emplace_back(parsed_columns_str.back());
350  }
351  } else {
352  parsed_columns_str.clear();
353  parsed_columns_sv =
354  std::vector<std::string_view>(logical_column_count, std::string_view{});
355  set_all_nulls = true;
356  }
357  return set_all_nulls;
358 }
359 
361  const ForeignTable* foreign_table) const {
362  import_export::CopyParams copy_params{};
363  copy_params.plain_text = true;
364  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
365  if (has_header.has_value()) {
366  if (has_header.value()) {
367  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
368  } else {
369  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
370  }
371  } else {
372  // By default, regex parsed files are not assumed to have headers.
373  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
374  }
375  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
376  it != foreign_table->options.end()) {
377  copy_params.buffer_size = std::stoi(it->second);
378  }
379  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
380  it != foreign_table->options.end()) {
381  copy_params.threads = std::stoi(it->second);
382  }
383  return copy_params;
384 }
385 
// Body of RegexFileBufferParser::findRowEndPosition: determines how much of the
// buffer can be parsed as complete rows. Stores the number of rows in that range in
// num_rows_in_buffer and returns the position just past the chosen end of line
// (end_pos + 1). If no usable end of line is found, the buffer is extended and the
// search is retried.
// NOTE(review): the line holding the return type and function name is missing from
// this listing - restore it from the findRowEndPosition declaration before compiling.
387  size_t& alloc_size,
388  std::unique_ptr<char[]>& buffer,
389  size_t& buffer_size,
390  const import_export::CopyParams& copy_params,
391  const size_t buffer_first_row_index,
392  unsigned int& num_rows_in_buffer,
393  foreign_storage::FileReader* file_reader) const {
394  CHECK_GT(buffer_size, static_cast<size_t>(0));
 // start_pos and end_pos are inclusive bounds of the range that is searched
 // (backwards) for a line delimiter.
395  size_t start_pos{0};
396  size_t end_pos = buffer_size - 1;
397  bool found_end_pos{false};
398  while (!found_end_pos) {
399  try {
400  end_pos = find_last_end_of_line(
401  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
402  if (file_reader->isEndOfLastFile()) {
 // At the end of the last file, the buffer is expected to end with a line
 // delimiter.
403  CHECK_EQ(end_pos, buffer_size - 1);
404  found_end_pos = true;
405  } else if (line_start_regex_.has_value()) {
406  // When a LINE_START_REGEX option is present and the file reader is not at the end
407  // of file, return the position of the end of line before the last line that
408  // matches the line start regex, since the last line that matches the line start
409  // regex in this buffer may still have to include/concatenate lines beyond this
410  // buffer.
411  CHECK_GT(end_pos, static_cast<size_t>(0));
412  auto old_end_pos = end_pos;
413  end_pos = find_last_end_of_line(buffer.get(),
414  buffer_size,
415  start_pos,
416  old_end_pos - 1,
417  copy_params.line_delim);
 // Walk backwards line by line: (end_pos, old_end_pos] is the line under test.
 // When no earlier delimiter exists, find_last_end_of_line throws and the
 // handler below extends the buffer.
418  while (!line_starts_with_regex(
419  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
420  old_end_pos = end_pos;
421  end_pos = find_last_end_of_line(buffer.get(),
422  buffer_size,
423  start_pos,
424  old_end_pos - 1,
425  copy_params.line_delim);
426  }
427  found_end_pos = true;
428  } else {
429  found_end_pos = true;
430  }
431  } catch (InsufficientBufferSizeException& e) {
 // NOTE(review): the line that declares `max_buffer_resize` is missing from this
 // listing - confirm where the value comes from.
433  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
434  throw;
435  }
 // After the extension only the newly read part of the buffer has to be searched.
436  start_pos = buffer_size;
 // NOTE(review): the callee name of the following call is missing from this
 // listing; presumably extend_buffer - confirm.
438  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
439  end_pos = buffer_size - 1;
440  }
441  }
442  CHECK(found_end_pos);
 // NOTE(review): two arguments of the following call (before and after
 // line_regex_) are missing from this listing; get_row_count takes the optional line
 // start regex and the remove_non_matches flag in these positions.
443  num_rows_in_buffer = get_row_count(buffer.get(),
444  0,
445  end_pos,
446  copy_params.line_delim,
448  line_regex_,
450  return end_pos + 1;
451 }
452 
454  const ForeignTable* foreign_table) const {
455  if (line_start_regex_.has_value()) {
456  // When a LINE_START_REGEX option is specified, at least the first line in each file
457  // has to start with the specified regex.
458  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
459  for (const auto& [file_path, line] : first_line_by_file_path) {
460  if (!line.empty() &&
462  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
463  auto line_start_regex = get_line_start_regex(foreign_table);
464  CHECK(line_start_regex.has_value());
465  throw ForeignStorageException{"First line in file \"" + file_path +
466  "\" does not match line start regex \"" +
467  line_start_regex.value() + "\""};
468  }
469  }
470  }
471 }
472 
// NOTE(review): only the closing brace of this definition is present in this listing;
// presumably it belongs to setMaxBufferResize(size_t) - restore the missing lines
// from the original file.
475 }
476 
// Returns the max_buffer_resize_ member.
// NOTE(review): the signature line of this definition is missing from this listing.
478  return max_buffer_resize_;
479 }
480 
// Always returns false.
// NOTE(review): the signature line is missing from this listing; presumably one of
// shouldRemoveNonMatches / shouldTruncateStringValues - confirm.
482  return false;
483 }
484 
// Always returns false.
// NOTE(review): the signature line is missing from this listing; presumably one of
// shouldRemoveNonMatches / shouldTruncateStringValues - confirm.
486  return false;
487 }
488 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:297
virtual bool isScanFinished() const =0
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
RegexFileBufferParser(const ForeignTable *foreign_table)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:301
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
virtual bool regexMatchColumns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path) const
#define CHECK(condition)
Definition: Logger.h:289
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex, const boost::regex &line_regex, bool remove_non_matches)
static constexpr size_t MAX_STRLEN
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override