OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DelimitedParserUtils.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file DelimitedParserUtils.cpp
19  * @brief Implementation of delimited parser utils.
20  *
21  */
22 
24 
25 #include <string_view>
26 
28 #include "Logger/Logger.h"
30 
31 namespace {
32 inline bool is_eol(const char& c, const import_export::CopyParams& copy_params) {
33  return c == copy_params.line_delim || c == '\n' || c == '\r';
34 }
35 
/**
 * Narrows [field_begin, field_end) in place by dropping leading and trailing
 * spaces and carriage returns. An empty or all-padding range collapses to
 * field_begin == field_end.
 */
inline void trim_space(const char*& field_begin, const char*& field_end) {
  // Only plain spaces and '\r' count as trimmable padding.
  const auto is_padding = [](const char ch) { return ch == ' ' || ch == '\r'; };
  while (field_begin < field_end && is_padding(*field_begin)) {
    ++field_begin;
  }
  while (field_begin < field_end && is_padding(field_end[-1])) {
    --field_end;
  }
}
45 
/**
 * Strips one pair of enclosing quote characters from [field_begin, field_end)
 * when copy_params.quoted is set; otherwise the range is left untouched.
 *
 * Spaces and '\r' around the quotes are skipped when locating them (via
 * trim_space on a copy of the range), but the returned range is only narrowed
 * to the inside of the quotes. A field consisting of a single quote character
 * becomes empty.
 *
 * A non-empty quoted-mode field that does not both start and end with
 * copy_params.quote reaches the "Unable to trim quotes." error path.
 */
inline void trim_quotes(const char*& field_begin,
                        const char*& field_end,
                        const import_export::CopyParams& copy_params) {
  auto quote_begin = field_begin, quote_end = field_end;
  if (copy_params.quoted) {
    trim_space(quote_begin, quote_end);
  }
  if (copy_params.quoted && quote_end - quote_begin > 0) {
    if (*quote_begin == copy_params.quote && *(quote_end - 1) == copy_params.quote) {
      field_begin = ++quote_begin;
      // If the opening and closing quote are the same character, the field is empty.
      field_end = (quote_begin == quote_end) ? quote_end : --quote_end;
    } else {
      // NOTE(review): the line that opens this statement (presumably a `throw`
      // of an exception constructed with the message below) is missing from
      // this copy of the file — restore it from the upstream source.
          "Unable to trim quotes.");
    }
  }
}
63 } // namespace
64 
65 namespace import_export {
66 namespace delimited_parser {
67 size_t find_beginning(const char* buffer,
68  size_t begin,
69  size_t end,
70  const import_export::CopyParams& copy_params) {
71  // @TODO(wei) line_delim is in quotes note supported
72  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
73  return 0;
74  }
75  size_t i;
76  const char* buf = buffer + begin;
77  for (i = 0; i < end - begin; i++) {
78  if (buf[i] == copy_params.line_delim) {
79  return i + 1;
80  }
81  }
82  return i;
83 }
84 
85 size_t find_end(const char* buffer,
86  size_t size,
87  const import_export::CopyParams& copy_params,
88  unsigned int& num_rows_this_buffer,
89  size_t buffer_first_row_index,
90  bool& in_quote,
91  size_t offset) {
92  size_t last_line_delim_pos = 0;
93  const char* current = buffer + offset;
94  if (copy_params.quoted) {
95  while (current < buffer + size) {
96  while (!in_quote && current < buffer + size) {
97  // We are outside of quotes. We have to find the last possible line delimiter.
98  if (*current == copy_params.line_delim) {
99  last_line_delim_pos = current - buffer;
100  ++num_rows_this_buffer;
101  } else if (*current == copy_params.quote) {
102  in_quote = true;
103  }
104  ++current;
105  }
106 
107  while (in_quote && current < buffer + size) {
108  // We are in a quoted field. We have to find the ending quote.
109  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
110  (*(current + 1) == copy_params.quote)) {
111  ++current;
112  } else if (*current == copy_params.quote) {
113  in_quote = false;
114  }
115  ++current;
116  }
117  }
118  } else {
119  while (current < buffer + size) {
120  if (*current == copy_params.line_delim) {
121  last_line_delim_pos = current - buffer;
122  ++num_rows_this_buffer;
123  }
124  ++current;
125  }
126  }
127 
128  if (last_line_delim_pos <= 0) {
129  size_t excerpt_length = std::min<size_t>(50, size);
130  std::string buffer_excerpt{buffer, buffer + excerpt_length};
131  if (in_quote) {
132  std::string quote(1, copy_params.quote);
133  std::string error_message =
134  "Unable to find a matching end quote for the quote character '" + quote +
135  "' after reading " + std::to_string(size) +
136  " characters. Please ensure that all data fields are correctly formatted "
137  "or update the \"buffer_size\" option appropriately. Row number: " +
138  std::to_string(buffer_first_row_index + 1) +
139  ". First few characters in row: " + buffer_excerpt;
140  throw InsufficientBufferSizeException{error_message};
141  } else {
142  std::string error_message =
143  "Unable to find an end of line character after reading " +
144  std::to_string(size) +
145  " characters. Please ensure that the correct \"line_delimiter\" option is "
146  "specified or update the \"buffer_size\" option appropriately. Row number: " +
147  std::to_string(buffer_first_row_index + 1) +
148  ". First few characters in row: " + buffer_excerpt;
149  throw InsufficientBufferSizeException{error_message};
150  }
151  }
152 
153  return last_line_delim_pos + 1;
154 }
155 
157 
  // Returns the current cap on automatic thread-buffer growth: the file-level
  // `max_buffer_resize`, which set_max_buffer_resize() overwrites.
  // NOTE(review): this function's signature line and the declaration of
  // `max_buffer_resize` are missing from this copy of the file.
  return max_buffer_resize;
}
161 
162 void set_max_buffer_resize(const size_t max_buffer_resize_param) {
163  max_buffer_resize = max_buffer_resize_param;
164 }
165 
/**
 * Finds the end of the last complete row in `buffer` (via find_end).
 *
 * When find_end reports that no row end fits, the buffer is grown and refilled
 * from `file` or `file_reader`, and the search resumes at the start of the new
 * data. The quote state and row count carry over between attempts. The
 * exception is rethrown once `alloc_size` reaches `max_buffer_resize`, or when
 * reading through `file_reader` and its scan is finished.
 *
 * @param alloc_size             current allocation size of `buffer`; updated on growth
 * @param buffer                 data buffer; may be replaced by a larger one
 * @param buffer_size            number of valid bytes in `buffer`; updated on growth
 * @param copy_params            parsing configuration forwarded to find_end
 * @param buffer_first_row_index forwarded to find_end for error messages
 * @param num_rows_in_buffer     incremented by find_end for each row end found
 * @param file                   input source, or nullptr to use `file_reader`
 * @param file_reader            input source used when `file` is nullptr
 * @return the position returned by find_end
 * @throws InsufficientBufferSizeException when no row end can be found
 */
size_t find_row_end_pos(size_t& alloc_size,
                        std::unique_ptr<char[]>& buffer,
                        size_t& buffer_size,
                        const CopyParams& copy_params,
                        const size_t buffer_first_row_index,
                        unsigned int& num_rows_in_buffer,
                        FILE* file,
                        foreign_storage::FileReader* file_reader) {
  bool found_end_pos{false};
  bool in_quote{false};
  size_t offset{0};
  size_t end_pos;
  CHECK(file != nullptr || file_reader != nullptr);
  // NOTE(review): a source line appears to be missing here in this copy of the
  // file (possibly a local binding of `max_buffer_resize` via
  // get_max_buffer_resize() — confirm against upstream).
  while (!found_end_pos) {
    try {
      end_pos = delimited_parser::find_end(buffer.get(),
                                           buffer_size,
                                           copy_params,
                                           num_rows_in_buffer,
                                           buffer_first_row_index,
                                           in_quote,
                                           offset);
      found_end_pos = true;
    } catch (InsufficientBufferSizeException& e) {
      // Buffer already at its maximum size: give up.
      if (alloc_size >= max_buffer_resize) {
        throw;
      }
      // Reading through a FileReader that has no more data: give up.
      if (file == nullptr && file_reader->isScanFinished()) {
        throw;
      }
      // Bytes before `offset` were already scanned; `in_quote` and
      // `num_rows_in_buffer` keep the state find_end left behind.
      offset = buffer_size;
      // NOTE(review): the line that opens the call below is missing from this
      // copy of the file; its arguments match extend_buffer(buffer, buffer_size,
      // alloc_size, file, file_reader, max_buffer_resize).
          buffer, buffer_size, alloc_size, file, file_reader, max_buffer_resize);
    }
  }
  return end_pos;
}
204 
/**
 * Parses the first row of `buf`, appending one element per field to `row`.
 *
 * Fields are separated by copy_params.delimiter. The row ends at the first
 * end-of-line character (see is_eol) that is not inside quotes. Fields without
 * quotes or escapes are appended as ranges of `buf`, optionally space-trimmed.
 * Fields that contain quotes or escaped quotes are first copied into a new
 * buffer owned by `tmp_buffers`; escaped quotes are unescaped and enclosing
 * quotes are stripped there. When T is a non-owning view, `row` may therefore
 * point into `buf` and into `tmp_buffers`, and both must outlive it.
 *
 * @param buf                start of the row to parse
 * @param buf_end            limit used only when skipping line endings after the row
 * @param entire_buf_end     end of the whole buffer; field scanning may run up to here
 * @param copy_params        delimiter / quote / escape / array / trim settings
 * @param is_array           optional per-column flags; when non-null and the current
 *                           column is flagged, text from array_begin to array_end is
 *                           skipped over so array contents do not split the field
 * @param row                receives the parsed fields (appended, not cleared)
 * @param tmp_buffers        receives any buffers allocated for rewritten fields
 * @param try_single_thread  reset to false, then set to true if parsing ended
 *                           inside an unmatched quote or array
 * @param filter_empty_lines if true, skip all consecutive end-of-line characters
 *                           after the row; otherwise skip only a "\r\n" pair
 * @return pointer to the last character consumed (the row's final end-of-line
 *         character), or entire_buf_end if no row end was reached
 */
template <typename T>
const char* get_row(const char* buf,
                    const char* buf_end,
                    const char* entire_buf_end,
                    const import_export::CopyParams& copy_params,
                    const bool* is_array,
                    std::vector<T>& row,
                    std::vector<std::unique_ptr<char[]>>& tmp_buffers,
                    bool& try_single_thread,
                    bool filter_empty_lines) {
  const char* field = buf;  // start of the field currently being scanned
  const char* p;
  bool in_quote = false;
  bool in_array = false;
  bool has_escape = false;    // current field contains an escape+quote pair
  bool strip_quotes = false;  // current field contains an opening quote
  try_single_thread = false;
  for (p = buf; p < entire_buf_end; ++p) {
    // An escape followed by a quote is consumed as a pair and never toggles
    // the quote state.
    if (*p == copy_params.escape && p < entire_buf_end - 1 &&
        *(p + 1) == copy_params.quote) {
      p++;
      has_escape = true;
    } else if (copy_params.quoted && *p == copy_params.quote) {
      in_quote = !in_quote;
      if (in_quote) {
        strip_quotes = true;
      }
    } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
               is_array[row.size()]) {
      in_array = true;
      while (p < entire_buf_end - 1) {  // Array type will be parsed separately.
        ++p;
        if (*p == copy_params.array_end) {
          in_array = false;
          break;
        }
      }
    } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
      if (!in_quote) {
        if (!has_escape && !strip_quotes) {
          // Plain field: reference it directly in `buf`.
          const char* field_end = p;
          if (copy_params.trim_spaces) {
            trim_space(field, field_end);
          }
          row.emplace_back(field, field_end - field);
        } else {
          // Quoted/escaped field: build an unescaped copy, then strip quotes.
          tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
          auto field_buf = tmp_buffers.back().get();
          int j = 0, i = 0;
          for (; i < p - field; i++, j++) {
            if (has_escape && field[i] == copy_params.escape &&
                field[i + 1] == copy_params.quote) {
              field_buf[j] = copy_params.quote;
              i++;
            } else {
              field_buf[j] = field[i];
            }
          }
          const char* field_begin = field_buf;
          const char* field_end = field_buf + j;
          trim_quotes(field_begin, field_end, copy_params);
          if (copy_params.trim_spaces) {
            trim_space(field_begin, field_end);
          }
          row.emplace_back(field_begin, field_end - field_begin);
        }
        field = p + 1;
        has_escape = false;
        strip_quotes = false;

        if (is_eol(*p, copy_params)) {
          // We are at the end of the row. Skip the line endings now.
          if (filter_empty_lines) {
            while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
              p++;
            }
          } else {
            // skip DOS carriage return line feed only
            if (p + 1 < buf_end && *p == '\r' && *(p + 1) == '\n') {
              p++;
            }
          }
          break;
        }
      }
    }
  }
  /*
  @TODO(wei) do error handling
  */
  if (in_quote) {
    LOG(ERROR) << "Unmatched quote.";
    try_single_thread = true;
  }
  if (in_array) {
    LOG(ERROR) << "Unmatched array.";
    try_single_thread = true;
  }
  return p;
}
305 
306 template const char* get_row(const char* buf,
307  const char* buf_end,
308  const char* entire_buf_end,
309  const import_export::CopyParams& copy_params,
310  const bool* is_array,
311  std::vector<std::string>& row,
312  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
313  bool& try_single_thread,
314  bool filter_empty_lines);
315 
316 template const char* get_row(const char* buf,
317  const char* buf_end,
318  const char* entire_buf_end,
319  const import_export::CopyParams& copy_params,
320  const bool* is_array,
321  std::vector<std::string_view>& row,
322  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
323  bool& try_single_thread,
324  bool filter_empty_lines);
325 
/**
 * Splits an array literal enclosed in copy_params.array_begin /
 * copy_params.array_end into its elements and appends them to `string_vec`.
 * Elements are separated by copy_params.array_delim.
 *
 * - Inputs equal to copy_params.null_str or "NULL", and empty inputs or empty
 *   arrays, add nothing.
 * - Elements longer than StringDictionary::MAX_STRLEN are truncated when
 *   `truncate_values` is set, and rejected with std::runtime_error otherwise.
 * - Elements equal to null_str or "NULL" are replaced by the empty string,
 *   which marks a null element.
 * - `string_vec` is not cleared, so the length check and the null marking both
 *   apply to every element in it, including any present before the call.
 *
 * @throws std::runtime_error if `s` is not enclosed in array_begin/array_end,
 *         or if an element is too long and `truncate_values` is false
 */
void parse_string_array(const std::string& s,
                        const import_export::CopyParams& copy_params,
                        std::vector<std::string>& string_vec,
                        bool truncate_values) {
  // NOTE(review): `s.size() < 1` is redundant with `s.empty()`.
  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
    return;
  }
  // NOTE(review): a one-character `s` passes this check only if array_begin ==
  // array_end, in which case `s.length() - 2` below would underflow.
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    throw std::runtime_error("Malformed Array :" + s);
  }

  std::string row(s.c_str() + 1, s.length() - 2);
  if (row.empty()) {  // allow empty arrays
    return;
  }
  // get_row only emits a field when it sees a delimiter or end of line, so
  // terminate the array contents with one to flush the last element.
  row.push_back('\n');

  // Reuse the row parser with the array delimiter as the field delimiter. The
  // resulting try_single_thread flag is not inspected here.
  bool try_single_thread = false;
  import_export::CopyParams array_params = copy_params;
  array_params.delimiter = copy_params.array_delim;
  // string_vec holds owning std::string copies, so these buffers may be
  // released when this function returns.
  std::vector<std::unique_ptr<char[]>> tmp_buffers;
  get_row(row.c_str(),
          row.c_str() + row.length(),
          row.c_str() + row.length(),
          array_params,
          nullptr,
          string_vec,
          tmp_buffers,
          try_single_thread,
          true);

  for (size_t i = 0; i < string_vec.size(); ++i) {
    if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
      if (truncate_values) {
        string_vec[i] = string_vec[i].substr(0, StringDictionary::MAX_STRLEN);
      } else {
        // NOTE(review): the continuation line of this expression (which
        // completes the message and closes the call) is missing from this copy
        // of the file.
        throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
      }
    }
  }

  // use empty string to mark nulls
  for (auto& value : string_vec) {
    if (value == copy_params.null_str || value == "NULL" || value.empty()) {
      value.clear();
    }
  }
}
375 
376 void extend_buffer(std::unique_ptr<char[]>& buffer,
377  size_t& buffer_size,
378  size_t& alloc_size,
379  FILE* file,
380  foreign_storage::FileReader* file_reader,
381  size_t max_buffer_resize) {
382  auto old_buffer = std::move(buffer);
383  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
384  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
385  << " bytes";
386  buffer = std::make_unique<char[]>(alloc_size);
387 
388  memcpy(buffer.get(), old_buffer.get(), buffer_size);
389  size_t fread_size;
390  CHECK(file != nullptr || file_reader != nullptr);
391  if (file != nullptr) {
392  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
393  } else {
394  fread_size = file_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
395  }
396  buffer_size += fread_size;
397 }
398 } // namespace delimited_parser
399 } // namespace import_export
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
virtual bool isScanFinished() const =0
virtual size_t read(void *buffer, size_t max_size)=0
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:285
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
std::string to_string(char const *&&v)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
void trim_space(const char *&field_begin, const char *&field_end)
#define CHECK(condition)
Definition: Logger.h:291
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
static constexpr size_t MAX_STRLEN
static constexpr size_t max_import_buffer_resize_byte_size
Definition: CopyParams.h:37
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:247