OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RegexFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 #include "Shared/StringTransform.h"
22 
23 namespace foreign_storage {
24 namespace {
26 
27 size_t find_last_end_of_line(const char* buffer,
28  size_t buffer_size,
29  size_t start,
30  size_t end,
31  char line_delim) {
32  int64_t i = end;
33  while (i >= static_cast<int64_t>(start)) {
34  if (buffer[i] == line_delim) {
35  return i;
36  } else {
37  i--;
38  }
39  }
41  "Unable to find an end of line character after reading " +
42  std::to_string(buffer_size) + " characters."};
43 }
44 
45 bool line_starts_with_regex(const char* buffer,
46  size_t start,
47  size_t end,
48  const boost::regex& line_start_regex) {
49  return boost::regex_search(std::string{buffer + start, end - start + 1},
50  line_start_regex,
51  boost::regex_constants::match_continuous);
52 }
53 
54 std::optional<std::string> get_line_start_regex(const ForeignTable* foreign_table) {
55  if (foreign_table) {
56  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_START_REGEX_KEY);
57  if (it != foreign_table->options.end()) {
58  return it->second;
59  }
60  }
61  return {};
62 }
63 
64 std::string get_line_regex(const ForeignTable* foreign_table) {
65  if (foreign_table) {
66  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_REGEX_KEY);
67  CHECK(it != foreign_table->options.end());
68  return it->second;
69  }
70  return {};
71 }
72 
73 std::string get_next_row(const char* curr,
74  const char* buffer_end,
75  char line_delim,
76  const std::optional<boost::regex>& line_start_regex) {
77  auto row_end = curr;
78  bool row_found{false};
79  while (!row_found && row_end <= buffer_end) {
80  if (*row_end == line_delim) {
81  if (row_end == buffer_end) {
82  row_found = true;
83  } else if (line_start_regex.has_value()) {
84  // When a LINE_START_REGEX option is present, concatenate the following lines
85  // until a line that starts with the specified regex is found.
86  CHECK(line_starts_with_regex(curr, 0, row_end - curr, line_start_regex.value()))
87  << "'" << line_start_regex.value() << "' not found in: '"
88  << std::string{curr, row_end - curr + 1ULL} << "'";
89  auto row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
90  while (!line_starts_with_regex(
91  row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
92  row_end += row_str.length() + 1;
93  if (row_end == buffer_end) {
94  break;
95  }
96  row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
97  }
98  row_found = true;
99  } else {
100  row_found = true;
101  }
102  }
103  row_end++;
104  }
105  CHECK(row_found);
106  return std::string{curr, static_cast<size_t>(row_end - curr - 1)};
107 }
108 
109 size_t get_row_count(const char* buffer,
110  size_t start,
111  size_t end,
112  char line_delim,
113  const std::optional<boost::regex>& line_start_regex) {
114  size_t row_count{0};
115  auto buffer_end = buffer + end;
116  auto curr = buffer + start;
117  while (curr <= buffer_end) {
118  auto row_str = get_next_row(curr, buffer_end, line_delim, line_start_regex);
119  curr += row_str.length() + 1;
120  row_count++;
121  }
122  return row_count;
123 }
124 
125 bool regex_match_columns(const std::string& row_str,
126  const boost::regex& line_regex,
127  size_t logical_column_count,
128  std::vector<std::string>& parsed_columns_str,
129  std::vector<std::string_view>& parsed_columns_sv,
130  const std::string& file_path) {
131  parsed_columns_str.clear();
132  parsed_columns_sv.clear();
133  boost::smatch match;
134  bool set_all_nulls{false};
135  if (boost::regex_match(row_str, match, line_regex)) {
136  auto matched_column_count = match.size() - 1;
137  if (logical_column_count != matched_column_count) {
139  logical_column_count, matched_column_count, file_path);
140  }
141  CHECK_GT(match.size(), static_cast<size_t>(1));
142  for (size_t i = 1; i < match.size(); i++) {
143  parsed_columns_str.emplace_back(match[i].str());
144  parsed_columns_sv.emplace_back(parsed_columns_str.back());
145  }
146  } else {
147  parsed_columns_sv =
148  std::vector<std::string_view>(logical_column_count, std::string_view{});
149  set_all_nulls = true;
150  }
151  return set_all_nulls;
152 }
153 
154 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
155  const std::string& option_name) {
156  if (auto it = foreign_table->options.find(option_name);
157  it != foreign_table->options.end()) {
158  if (boost::iequals(it->second, "TRUE")) {
159  return true;
160  } else if (boost::iequals(it->second, "FALSE")) {
161  return false;
162  } else {
163  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
164  "\" foreign table option. "
165  "Value must be either 'true' or 'false'."};
166  }
167  }
168  return std::nullopt;
169 }
170 } // namespace
171 
// Member initializer list of a constructor whose signature line is absent from this
// listing (NOTE(review): presumably RegexFileBufferParser(const ForeignTable*), as named
// in the cross-reference list at the end of the file - confirm against the real source).
// line_regex_ is built from the result of get_line_regex() and line_start_regex_ from
// the result of get_line_start_regex(); the constructor body itself is empty.
173  : line_regex_(get_line_regex(foreign_table))
174  , line_start_regex_(get_line_start_regex(foreign_table)) {}
175 
// Parameter list and body of the buffer parsing routine.
// NOTE(review): the signature line and several call-opening lines (embedded listing
// numbers 180, 199, 254, 264, 278, 299 and 334) are absent from this listing; the
// signature is presumably the `parseBuffer` entry of the cross-reference list at the end
// of the file, and `result` is presumably declared on the missing line 199 - confirm
// against the real source.
//
// Walks the range [request.begin_pos, request.end_pos) of request.buffer row by row,
// for at most request.process_row_count rows. Each row is matched against line_regex_
// and the resulting column values are added to the non-null entries of
// request.import_buffers. The row offsets and the row count are stored in `result`.
// NOTE(review): columns_are_pre_filtered is not referenced in the visible lines.
181  ParseBufferRequest& request,
182  bool convert_data_blocks,
183  bool columns_are_pre_filtered) const {
184  CHECK(request.buffer);
185  char* buffer_start = request.buffer.get() + request.begin_pos;
186  const char* buffer_end = request.buffer.get() + request.end_pos;
187 
188  std::vector<size_t> row_offsets;
// The first offset is the file position at which this buffer range starts.
189  row_offsets.emplace_back(request.file_offset + request.begin_pos);
190 
191  size_t current_row_id = 0;
192  size_t row_count = 0;
193  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
194  std::vector<std::string> parsed_columns_str;
195  parsed_columns_str.reserve(logical_column_count);
196  std::vector<std::string_view> parsed_columns_sv;
197  parsed_columns_sv.reserve(logical_column_count);
198 
200 
201  std::string row_str;
202  size_t remaining_row_count = request.process_row_count;
203  auto curr = buffer_start;
// Stop at the end of the range or once the requested number of rows has been consumed.
204  while (curr < buffer_end && remaining_row_count > 0) {
205  try {
// get_next_row() takes an inclusive end pointer, hence buffer_end - 1.
206  row_str = get_next_row(
207  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
// Advance past the row content and its line delimiter.
208  curr += row_str.length() + 1;
209  current_row_id = row_count++;
210  remaining_row_count--;
211 
// When every import buffer is null the row is only counted and not parsed.
212  bool skip_all_columns =
213  std::all_of(request.import_buffers.begin(),
214  request.import_buffers.end(),
215  [](const auto& import_buffer) { return !import_buffer; });
216  if (!skip_all_columns) {
217  auto columns = request.getColumns();
218 
219  bool set_all_nulls = false;
220  try {
221  set_all_nulls = regex_match_columns(row_str,
222  line_regex_,
223  logical_column_count,
224  parsed_columns_str,
225  parsed_columns_sv,
226  request.getFilePath());
227  } catch (const ForeignStorageException& e) {
// A ForeignStorageException raised while matching either marks the whole row as
// rejected (when rejected rows are tracked) or is propagated to the caller.
228  if (request.track_rejected_rows) {
229  result.rejected_rows.insert(current_row_id);
230  auto cd_it = columns.begin();
231  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
232  continue;
233  } else {
234  throw;
235  }
236  }
237 
238  size_t parsed_column_index = 0;
239  size_t import_buffer_index = 0;
240 
241  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
242  auto cd = *cd_it;
243  const auto& column_type = cd->columnType;
// A null import buffer means that this column is skipped (see the else branch below).
244  if (request.import_buffers[import_buffer_index]) {
245  bool is_null = false;
246  try {
247  is_null =
248  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
249  cd,
250  request.copy_params.null_str));
251  } catch (const std::exception& e) {
252  if (request.track_rejected_rows) {
253  result.rejected_rows.insert(current_row_id);
// NOTE(review): the opening line of this call (254) is missing from the listing.
255  columns, cd_it, import_buffer_index, request);
256  break; // skip rest of row
257  } else {
258  throw;
259  }
260  }
261  if (column_type.is_geometry()) {
// Remember where the geo column starts; this index is what gets passed on when the row
// is rejected below.
262  auto starting_import_buffer_index = import_buffer_index;
263  try {
// NOTE(review): the opening line of this call (264) is missing from the listing;
// presumably processGeoColumn - confirm.
265  import_buffer_index,
266  request.copy_params,
267  cd_it,
268  parsed_columns_sv,
269  parsed_column_index,
270  is_null,
271  request.first_row_index,
272  row_count,
273  request.getCatalog(),
274  request.render_group_analyzer_map);
275  } catch (const std::exception& e) {
276  if (request.track_rejected_rows) {
277  result.rejected_rows.insert(current_row_id);
// NOTE(review): the opening line of this call (278) is missing from the listing.
279  columns, cd_it, starting_import_buffer_index, request);
280  break; // skip rest of row
281  } else {
282  throw;
283  }
284  }
285  // Skip remaining physical columns
286  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
287  ++cd_it;
288  }
289  } else {
290  try {
291  request.import_buffers[import_buffer_index]->add_value(
292  cd,
293  parsed_columns_sv[parsed_column_index],
294  is_null,
295  request.copy_params);
296  } catch (const std::exception& e) {
297  if (request.track_rejected_rows) {
298  result.rejected_rows.insert(current_row_id);
// NOTE(review): the opening line of this call (299) is missing from the listing.
300  columns, cd_it, import_buffer_index, request);
301  break; // skip rest of row
302  } else {
303  throw;
304  }
305  }
306  parsed_column_index++;
307  import_buffer_index++;
308  }
309  } else {
310  // Skip column
// Step over the physical columns of the skipped column, then over the column itself.
311  for (int i = 0; i < column_type.get_physical_cols(); i++) {
312  import_buffer_index++;
313  cd_it++;
314  }
315  parsed_column_index++;
316  import_buffer_index++;
317  }
318  }
319  }
320  } catch (const ForeignStorageException& e) {
// ForeignStorageException passes through unchanged; any other exception is wrapped
// below with the offending row and the file path.
321  throw;
322  } catch (const std::exception& e) {
323  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
324  "\" in row \"" + row_str + "\" in file \"" +
325  request.getFilePath() + "\"");
326  }
327  }
// The last offset is the file position just past the last row that was consumed.
328  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
329 
330  result.row_offsets = row_offsets;
331  result.row_count = row_count;
332  if (convert_data_blocks) {
// NOTE(review): the right-hand side of this assignment (334) is missing from the
// listing.
333  result.column_id_to_data_blocks_map =
335  }
336  return result;
337 }
338 
// Parameter list and body of the copy parameter validation routine.
// NOTE(review): the signature line (339) is absent from this listing; presumably this is
// the `validateAndGetCopyParams` entry of the cross-reference list at the end of the
// file - confirm against the real source.
//
// Builds a CopyParams object from the foreign table options: plain_text is always set to
// true, has_header is set only when the HEADER_KEY option is present, and buffer_size /
// threads are set only when the BUFFER_SIZE_KEY / THREADS_KEY options are present.
// validate_and_get_bool_value() throws std::runtime_error for a HEADER_KEY value that is
// neither "true" nor "false".
340  const ForeignTable* foreign_table) const {
341  import_export::CopyParams copy_params{};
342  copy_params.plain_text = true;
343  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
// An absent option leaves copy_params.has_header at its default value.
344  if (has_header.has_value()) {
345  if (has_header.value()) {
346  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
347  } else {
348  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
349  }
350  }
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range for
// non-numeric or too large values and accepts negative numbers; no further validation
// of either numeric option is visible here.
351  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
352  it != foreign_table->options.end()) {
353  copy_params.buffer_size = std::stoi(it->second);
354  }
355  if (auto it = foreign_table->options.find(THREADS_KEY);
356  it != foreign_table->options.end()) {
357  copy_params.threads = std::stoi(it->second);
358  }
359  return copy_params;
360 }
361 
// Parameter list and body of the row end search.
// NOTE(review): the signature line (362) and two statements (408 and 413) are absent
// from this listing; presumably this is the `findRowEndPosition` entry of the
// cross-reference list at the end of the file - confirm against the real source.
//
// Determines the position of the last line delimiter in `buffer` at which a complete
// row ends, stores the number of rows up to that position in num_rows_in_buffer and
// returns that position + 1. When no line delimiter can be found the search is retried
// after the step on the missing line 413 (presumably a buffer extension), unless
// alloc_size has reached max_buffer_resize or the file reader reports that the scan is
// finished; in that case the InsufficientBufferSizeException is rethrown.
// NOTE(review): buffer_first_row_index is not referenced in the visible lines.
363  size_t& alloc_size,
364  std::unique_ptr<char[]>& buffer,
365  size_t& buffer_size,
366  const import_export::CopyParams& copy_params,
367  const size_t buffer_first_row_index,
368  unsigned int& num_rows_in_buffer,
369  foreign_storage::FileReader* file_reader) const {
370  CHECK_GT(buffer_size, static_cast<size_t>(0));
371  size_t start_pos{0};
372  size_t end_pos = buffer_size - 1;
373  bool found_end_pos{false};
374  while (!found_end_pos) {
375  try {
376  end_pos = find_last_end_of_line(
377  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
// At the end of the last file the line delimiter is required to be the last character
// of the buffer.
378  if (file_reader->isEndOfLastFile()) {
379  CHECK_EQ(end_pos, buffer_size - 1);
380  found_end_pos = true;
381  } else if (line_start_regex_.has_value()) {
382  // When a LINE_START_REGEX option is present and the file reader is not at the end
383  // of file, return the position of the end of line before the last line that
384  // matches the line start regex, since the last line that matches the line start
385  // regex in this buffer may still have to include/concatenate lines beyond this
386  // buffer.
387  CHECK_GT(end_pos, static_cast<size_t>(0));
388  auto old_end_pos = end_pos;
389  end_pos = find_last_end_of_line(buffer.get(),
390  buffer_size,
391  start_pos,
392  old_end_pos - 1,
393  copy_params.line_delim);
// Walk backwards one line at a time until the line between end_pos and old_end_pos
// starts with the regex. find_last_end_of_line() throws when no earlier delimiter
// exists, which is handled by the catch block below.
394  while (!line_starts_with_regex(
395  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
396  old_end_pos = end_pos;
397  end_pos = find_last_end_of_line(buffer.get(),
398  buffer_size,
399  start_pos,
400  old_end_pos - 1,
401  copy_params.line_delim);
402  }
403  found_end_pos = true;
404  } else {
405  found_end_pos = true;
406  }
407  } catch (InsufficientBufferSizeException& e) {
// NOTE(review): the line that defines max_buffer_resize (408) is missing from the
// listing.
409  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
410  throw;
411  }
// The next search starts at the old end of the buffer.
412  start_pos = buffer_size;
// NOTE(review): the opening line of this call (413) is missing from the listing;
// presumably extend_buffer - confirm.
414  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
415  end_pos = buffer_size - 1;
416  }
417  }
418  CHECK(found_end_pos);
419  num_rows_in_buffer =
420  get_row_count(buffer.get(), 0, end_pos, copy_params.line_delim, line_start_regex_);
421  return end_pos + 1;
422 }
423 
// Parameter list and body of the file validation routine.
// NOTE(review): the signature line (424) and the opening line of the condition (431)
// are absent from this listing; presumably this is the `validateFiles` entry of the
// cross-reference list at the end of the file - confirm against the real source.
//
// Does nothing unless a line start regex is configured. Otherwise the first line of
// every file returned by file_reader->getFirstLineForEachFile() is inspected, and a
// ForeignStorageException naming the file and the regex is thrown from the guarded
// block below.
425  const ForeignTable* foreign_table) const {
426  if (line_start_regex_.has_value()) {
427  // When a LINE_START_REGEX option is specified, at least the first line in each file
428  // has to start with the specified regex.
429  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
430  for (const auto& [file_path, line] : first_line_by_file_path) {
// NOTE(review): the opening line of this condition (431) is missing from the listing;
// presumably a negated line_starts_with_regex() call - confirm.
432  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
// The regex text for the error message is read again from the table options.
433  auto line_start_regex = get_line_start_regex(foreign_table);
434  CHECK(line_start_regex.has_value());
435  throw ForeignStorageException{"First line in file \"" + file_path +
436  "\" does not match line start regex \"" +
437  line_start_regex.value() + "\""};
438  }
439  }
440  }
441 }
442 
// Closing brace of a definition whose other lines (443-444) are absent from this
// listing (NOTE(review): presumably setMaxBufferResize, which appears in the
// cross-reference list at the end of the file - confirm against the real source).
445 }
446 
// Body of an accessor whose signature line (447) is absent from this listing; it
// returns the current value of max_buffer_resize_.
448  return max_buffer_resize_;
449 }
450 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
RegexFileBufferParser(const ForeignTable *foreign_table)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
bool regex_match_columns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path)
virtual bool isScanFinished()=0
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
#define CHECK(condition)
Definition: Logger.h:223
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex)
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override