OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RegexFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/regex.hpp>
20 
23 #include "Shared/StringTransform.h"
24 
25 namespace foreign_storage {
26 namespace {
28 
// Scans `buffer` backwards from index `end` down to index `start` (both inclusive) and
// returns the index of the last `line_delim` character found in that range.
// If no delimiter is found, an exception whose message reports `buffer_size` is thrown;
// presumably InsufficientBufferSizeException (the type findRowEndPosition catches in
// order to grow the buffer) -- the line opening the throw is not shown here, confirm.
// NOTE(review): `end` is converted to int64_t, so a caller passing `x - 1` with x == 0
// (size_t wraparound) ends up with i == -1 on two's-complement targets and takes the
// not-found path instead of reading out of bounds.
29 size_t find_last_end_of_line(const char* buffer,
30  size_t buffer_size,
31  size_t start,
32  size_t end,
33  char line_delim) {
34  int64_t i = end;
35  while (i >= static_cast<int64_t>(start)) {
36  if (buffer[i] == line_delim) {
37  return i;
38  } else {
39  i--;
40  }
41  }
43  "Unable to find an end of line character after reading " +
44  std::to_string(buffer_size) + " characters."};
45 }
46 
47 bool line_starts_with_regex(const char* buffer,
48  size_t start,
49  size_t end,
50  const boost::regex& line_start_regex) {
51  return boost::regex_search(std::string{buffer + start, end - start + 1},
52  line_start_regex,
53  boost::regex_constants::match_continuous);
54 }
55 
56 std::optional<std::string> get_line_start_regex(const ForeignTable* foreign_table) {
57  if (foreign_table) {
58  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_START_REGEX_KEY);
59  if (it != foreign_table->options.end()) {
60  return it->second;
61  }
62  }
63  return {};
64 }
65 
66 std::string get_line_regex(const ForeignTable* foreign_table) {
67  if (foreign_table) {
68  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_REGEX_KEY);
69  CHECK(it != foreign_table->options.end());
70  return it->second;
71  }
72  return {};
73 }
74 
75 std::string get_next_row(const char* curr,
76  const char* buffer_end,
77  char line_delim,
78  const std::optional<boost::regex>& line_start_regex) {
79  auto row_end = curr;
80  bool row_found{false};
81  while (!row_found && row_end <= buffer_end) {
82  if (*row_end == line_delim) {
83  if (row_end == buffer_end) {
84  row_found = true;
85  } else if (line_start_regex.has_value()) {
86  // When a LINE_START_REGEX option is present, concatenate the following lines
87  // until a line that starts with the specified regex is found.
88  CHECK(line_starts_with_regex(curr, 0, row_end - curr, line_start_regex.value()));
89  auto row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
90  while (!line_starts_with_regex(
91  row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
92  row_end += row_str.length() + 1;
93  if (row_end == buffer_end) {
94  break;
95  }
96  row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
97  }
98  row_found = true;
99  } else {
100  row_found = true;
101  }
102  }
103  row_end++;
104  }
105  CHECK(row_found);
106  return std::string{curr, static_cast<size_t>(row_end - curr - 1)};
107 }
108 
109 size_t get_row_count(const char* buffer,
110  size_t start,
111  size_t end,
112  char line_delim,
113  const std::optional<boost::regex>& line_start_regex) {
114  size_t row_count{0};
115  auto buffer_end = buffer + end;
116  auto curr = buffer + start;
117  while (curr <= buffer_end) {
118  auto row_str = get_next_row(curr, buffer_end, line_delim, line_start_regex);
119  curr += row_str.length() + 1;
120  row_count++;
121  }
122  return row_count;
123 }
124 
125 bool regex_match_columns(const std::string& row_str,
126  const boost::regex& line_regex,
127  size_t logical_column_count,
128  std::vector<std::string>& parsed_columns_str,
129  std::vector<std::string_view>& parsed_columns_sv,
130  const std::string& file_path) {
131  parsed_columns_str.clear();
132  parsed_columns_sv.clear();
133  boost::smatch match;
134  bool set_all_nulls{false};
135  if (boost::regex_match(row_str, match, line_regex)) {
136  auto matched_column_count = match.size() - 1;
137  if (logical_column_count != matched_column_count) {
139  logical_column_count, matched_column_count, file_path);
140  }
141  CHECK_GT(match.size(), static_cast<size_t>(1));
142  for (size_t i = 1; i < match.size(); i++) {
143  parsed_columns_str.emplace_back(match[i].str());
144  parsed_columns_sv.emplace_back(parsed_columns_str.back());
145  }
146  } else {
147  parsed_columns_sv =
148  std::vector<std::string_view>(logical_column_count, std::string_view{});
149  set_all_nulls = true;
150  }
151  return set_all_nulls;
152 }
153 } // namespace
154 
// line_regex_ is built from the required LINE_REGEX option (get_line_regex CHECKs that
// it is present). line_start_regex_ is built from the optional LINE_START_REGEX option
// and stays empty when that option is absent. Both come from empty/absent values when
// foreign_table is null.
156  : line_regex_(get_line_regex(foreign_table))
157  , line_start_regex_(get_line_start_regex(foreign_table)) {}
158 
164  ParseBufferRequest& request,
165  bool convert_data_blocks,
166  bool columns_are_pre_filtered) const {
// Parses up to request.process_row_count rows from
// request.buffer[begin_pos, end_pos). Each row is split with line_regex_ (and joined
// across physical lines with line_start_regex_ when set), and the values are appended
// to request.import_buffers. A null import buffer marks a column to skip. Also records
// the file offsets of the first parsed row and of the byte after the last parsed row.
// NOTE(review): get_next_row receives buffer_end - 1, so the byte just before end_pos
// is presumably the final line delimiter (findRowEndPosition returns one past it) --
// confirm.
167  CHECK(request.buffer);
168  char* buffer_start = request.buffer.get() + request.begin_pos;
169  const char* buffer_end = request.buffer.get() + request.end_pos;
170 
171  std::vector<size_t> row_offsets;
172  row_offsets.emplace_back(request.file_offset + request.begin_pos);
173 
174  size_t row_count = 0;
175  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
// Reused across rows; regex_match_columns clears them for each row.
176  std::vector<std::string> parsed_columns_str;
177  parsed_columns_str.reserve(logical_column_count);
178  std::vector<std::string_view> parsed_columns_sv;
179  parsed_columns_sv.reserve(logical_column_count);
180 
181  std::string row_str;
182  size_t remaining_row_count = request.process_row_count;
183  auto curr = buffer_start;
184  while (curr < buffer_end && remaining_row_count > 0) {
185  try {
186  row_str = get_next_row(
187  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
188  curr += row_str.length() + 1;
189  row_count++;
190  remaining_row_count--;
191 
// When every import buffer is null, no column is requested: rows are only counted.
// NOTE(review): this does not depend on the row and could be computed once before
// the loop.
192  bool skip_all_columns =
193  std::all_of(request.import_buffers.begin(),
194  request.import_buffers.end(),
195  [](const auto& import_buffer) { return !import_buffer; });
196  if (!skip_all_columns) {
197  bool set_all_nulls = regex_match_columns(row_str,
198  line_regex_,
199  logical_column_count,
200  parsed_columns_str,
201  parsed_columns_sv,
202  request.getFilePath());
203 
// parsed_column_index walks the logical (parsed) values; import_buffer_index walks
// the physical columns, which include the extra physical columns of geo types.
204  size_t parsed_column_index = 0;
205  size_t import_buffer_index = 0;
// NOTE(review): getColumns() is declared to return a std::list by value, so the column
// list is copied for every row; consider hoisting it out of the loop.
206  auto columns = request.getColumns();
207  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
208  auto cd = *cd_it;
209  const auto& column_type = cd->columnType;
210  if (request.import_buffers[import_buffer_index]) {
211  bool is_null =
212  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
213  cd,
214  request.copy_params.null_str));
215  if (column_type.is_geometry()) {
217  import_buffer_index,
218  request.copy_params,
219  cd_it,
220  parsed_columns_sv,
221  parsed_column_index,
222  is_null,
223  request.first_row_index,
224  row_count,
225  request.getCatalog());
226  // Skip remaining physical columns
227  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
228  ++cd_it;
229  }
230  } else {
231  request.import_buffers[import_buffer_index]->add_value(
232  cd,
233  parsed_columns_sv[parsed_column_index],
234  is_null,
235  request.copy_params);
236  parsed_column_index++;
237  import_buffer_index++;
238  }
239  } else {
240  // Skip column
241  for (int i = 0; i < column_type.get_physical_cols(); i++) {
242  import_buffer_index++;
243  cd_it++;
244  }
245  parsed_column_index++;
246  import_buffer_index++;
247  }
248  }
249  }
// ForeignStorageExceptions propagate unchanged; any other exception is rethrown as a
// ForeignStorageException naming the failing row and file.
250  } catch (const ForeignStorageException& e) {
251  throw;
252  } catch (const std::exception& e) {
253  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
254  "\" in row \"" + row_str + "\" in file \"" +
255  request.getFilePath() + "\"");
256  }
257  }
// File offset of the byte following the last row parsed in this call.
258  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
259 
261  result.row_offsets = row_offsets;
262  result.row_count = row_count;
263  if (convert_data_blocks) {
264  result.column_id_to_data_blocks_map =
266  }
267  return result;
268 }
269 
271  const ForeignTable* foreign_table) const {
// Builds plain-text CopyParams for this parser. There is no header row unless
// skip_first_line_ is set, and the buffer size is overridden when the BUFFER_SIZE
// option is present.
272  import_export::CopyParams copy_params{};
273  copy_params.plain_text = true;
274  if (skip_first_line_) {
275  // This branch should only be executed in tests
276  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
277  } else {
278  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
279  }
280  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
281  it != foreign_table->options.end()) {
// NOTE(review): std::stoi parses into an int. It throws std::invalid_argument or
// std::out_of_range for non-numeric input or values beyond int range, and accepts
// negative values. If buffer_size is an unsigned type (not visible here -- confirm),
// a negative value would wrap; consider std::stoull plus validation.
282  copy_params.buffer_size = std::stoi(it->second);
283  }
284  return copy_params;
285 }
286 
288  size_t& alloc_size,
289  std::unique_ptr<char[]>& buffer,
290  size_t& buffer_size,
291  const import_export::CopyParams& copy_params,
292  const size_t buffer_first_row_index,
293  unsigned int& num_rows_in_buffer,
294  foreign_storage::FileReader* file_reader) const {
// Finds the delimiter that ends the last complete row in `buffer`.
// - Normally this is the last line delimiter in the buffer.
// - When LINE_START_REGEX is set and more data remains, it is the delimiter before the
//   last line that starts a new row.
// If no such delimiter exists, the buffer is grown (via the buffer-extension call in the
// catch block, presumably extend_buffer given its matching arguments) until one is found
// or the size cap / end of scan is reached, in which case the exception is rethrown.
// Sets num_rows_in_buffer to the number of rows before that delimiter and returns the
// position just past it.
// NOTE(review): buffer_first_row_index is not used in this body.
295  CHECK_GT(buffer_size, static_cast<size_t>(0));
296  size_t start_pos{0};
297  size_t end_pos = buffer_size - 1;
298  bool found_end_pos{false};
299  while (!found_end_pos) {
300  try {
301  end_pos = find_last_end_of_line(
302  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
303  if (file_reader->isEndOfLastFile()) {
304  CHECK_EQ(end_pos, buffer_size - 1);
305  found_end_pos = true;
306  } else if (line_start_regex_.has_value()) {
307  // When a LINE_START_REGEX option is present and the file reader is not at the end
308  // of file, return the position of the end of line before the last line that
309  // matches the line start regex, since the last line that matches the line start
310  // regex in this buffer may still have to include/concatenate lines beyond this
311  // buffer.
312  CHECK_GT(end_pos, static_cast<size_t>(0));
313  auto old_end_pos = end_pos;
314  end_pos = find_last_end_of_line(buffer.get(),
315  buffer_size,
316  start_pos,
317  old_end_pos - 1,
318  copy_params.line_delim);
// Walk backwards one line at a time. [end_pos + 1, old_end_pos] is the candidate line
// (including its delimiter). Stop at the first line that starts a new row.
319  while (!line_starts_with_regex(
320  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
321  old_end_pos = end_pos;
322  end_pos = find_last_end_of_line(buffer.get(),
323  buffer_size,
324  start_pos,
325  old_end_pos - 1,
326  copy_params.line_delim);
327  }
328  found_end_pos = true;
329  } else {
330  found_end_pos = true;
331  }
332  } catch (InsufficientBufferSizeException& e) {
334  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
335  throw;
336  }
// Restrict the next search to the newly read bytes.
// NOTE(review): in the LINE_START_REGEX case, a row that starts exactly at the old
// buffer_size (its preceding delimiter at old buffer_size - 1) is not found, because
// that delimiter lies before start_pos; this presumably only costs an extra extension
// -- confirm.
337  start_pos = buffer_size;
339  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
340  end_pos = buffer_size - 1;
341  }
342  }
343  CHECK(found_end_pos);
344  num_rows_in_buffer =
345  get_row_count(buffer.get(), 0, end_pos, copy_params.line_delim, line_start_regex_);
346  return end_pos + 1;
347 }
348 
350  const ForeignTable* foreign_table) const {
// Checks that the first line of every file handled by file_reader starts with the
// LINE_START_REGEX pattern (when that option is set). Throws ForeignStorageException
// naming the first offending file.
351  if (line_start_regex_.has_value()) {
352  // When a LINE_START_REGEX option is specified, at least the first line in each file
353  // has to start with the specified regex.
354  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
// NOTE(review): `line.length() - 1` below relies on size_t wraparound to describe an
// empty first line as an empty range.
355  for (const auto& [file_path, line] : first_line_by_file_path) {
357  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
// line_start_regex_ holds the compiled regex. Re-read the raw option string so the
// error message shows the pattern as written in the table options.
358  auto line_start_regex = get_line_start_regex(foreign_table);
359  CHECK(line_start_regex.has_value());
360  throw ForeignStorageException{"First line in file \"" + file_path +
361  "\" does not match line start regex \"" +
362  line_start_regex.value() + "\""};
363  }
364  }
365  }
366 }
367 
370 }
371 
// Returns the current value of max_buffer_resize_.
373  return max_buffer_resize_;
374 }
375 
// Sets skip_first_line_. validateAndGetCopyParams reads it to choose HAS_HEADER vs
// NO_HEADER, and that branch is documented there as test-only.
377  skip_first_line_ = skip;
378 }
379 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
RegexFileBufferParser(const ForeignTable *foreign_table)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string to_string(char const *&&v)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
bool regex_match_columns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path)
virtual bool isScanFinished()=0
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
#define CHECK(condition)
Definition: Logger.h:209
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex)
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override