OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DelimitedParserUtils.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file DelimitedParserUtils.cpp
19  * @brief Implementation of delimited parser utils.
20  *
21  */
22 
24 
25 #include <string_view>
26 
28 #include "Logger/Logger.h"
30 
31 namespace {
32 inline bool is_eol(const char& c, const import_export::CopyParams& copy_params) {
33  return c == copy_params.line_delim || c == '\n' || c == '\r';
34 }
35 
/**
 * Narrows [field_begin, field_end) in place by dropping leading and trailing
 * ' ' and '\r' characters. If the range is entirely blank, both pointers end
 * up at the original field_end.
 */
inline void trim_space(const char*& field_begin, const char*& field_end) {
  const auto is_blank = [](const char ch) { return ch == ' ' || ch == '\r'; };
  while (field_begin < field_end && is_blank(*field_begin)) {
    ++field_begin;
  }
  while (field_end > field_begin && is_blank(field_end[-1])) {
    --field_end;
  }
}
45 
46 inline void trim_quotes(const char*& field_begin,
47  const char*& field_end,
48  const import_export::CopyParams& copy_params) {
49  if (copy_params.quoted && field_end - field_begin > 0 &&
50  *field_begin == copy_params.quote) {
51  ++field_begin;
52  }
53  if (copy_params.quoted && field_end - field_begin > 0 &&
54  *(field_end - 1) == copy_params.quote) {
55  --field_end;
56  }
57 }
58 } // namespace
59 
60 namespace import_export {
61 namespace delimited_parser {
62 size_t find_beginning(const char* buffer,
63  size_t begin,
64  size_t end,
65  const import_export::CopyParams& copy_params) {
66  // @TODO(wei) line_delim is in quotes note supported
67  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
68  return 0;
69  }
70  size_t i;
71  const char* buf = buffer + begin;
72  for (i = 0; i < end - begin; i++) {
73  if (buf[i] == copy_params.line_delim) {
74  return i + 1;
75  }
76  }
77  return i;
78 }
79 
80 size_t find_end(const char* buffer,
81  size_t size,
82  const import_export::CopyParams& copy_params,
83  unsigned int& num_rows_this_buffer,
84  size_t buffer_first_row_index,
85  bool& in_quote,
86  size_t offset) {
87  size_t last_line_delim_pos = 0;
88  const char* current = buffer + offset;
89  if (copy_params.quoted) {
90  while (current < buffer + size) {
91  while (!in_quote && current < buffer + size) {
92  // We are outside of quotes. We have to find the last possible line delimiter.
93  if (*current == copy_params.line_delim) {
94  last_line_delim_pos = current - buffer;
95  ++num_rows_this_buffer;
96  } else if (*current == copy_params.quote) {
97  in_quote = true;
98  }
99  ++current;
100  }
101 
102  while (in_quote && current < buffer + size) {
103  // We are in a quoted field. We have to find the ending quote.
104  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
105  (*(current + 1) == copy_params.quote)) {
106  ++current;
107  } else if (*current == copy_params.quote) {
108  in_quote = false;
109  }
110  ++current;
111  }
112  }
113  } else {
114  while (current < buffer + size) {
115  if (*current == copy_params.line_delim) {
116  last_line_delim_pos = current - buffer;
117  ++num_rows_this_buffer;
118  }
119  ++current;
120  }
121  }
122 
123  if (last_line_delim_pos <= 0) {
124  size_t excerpt_length = std::min<size_t>(50, size);
125  std::string buffer_excerpt{buffer, buffer + excerpt_length};
126  std::string error_message =
127  "Unable to find an end of line character after reading " + std::to_string(size) +
128  " characters. Please ensure that the correct \"line_delimiter\" option is "
129  "specified or update the \"buffer_size\" option appropriately. Row number: " +
130  std::to_string(buffer_first_row_index + 1) +
131  ". First few characters in row: " + buffer_excerpt;
132  throw InsufficientBufferSizeException{error_message};
133  }
134 
135  return last_line_delim_pos + 1;
136 }
137 
139 
141  return max_buffer_resize;
142 }
143 
144 void set_max_buffer_resize(const size_t max_buffer_resize_param) {
145  max_buffer_resize = max_buffer_resize_param;
146 }
147 
148 size_t find_row_end_pos(size_t& alloc_size,
149  std::unique_ptr<char[]>& buffer,
150  size_t& buffer_size,
151  const CopyParams& copy_params,
152  const size_t buffer_first_row_index,
153  unsigned int& num_rows_in_buffer,
154  FILE* file,
155  foreign_storage::FileReader* file_reader) {
156  bool found_end_pos{false};
157  bool in_quote{false};
158  size_t offset{0};
159  size_t end_pos;
160  CHECK(file != nullptr || file_reader != nullptr);
162  while (!found_end_pos) {
163  try {
164  end_pos = delimited_parser::find_end(buffer.get(),
165  buffer_size,
166  copy_params,
167  num_rows_in_buffer,
168  buffer_first_row_index,
169  in_quote,
170  offset);
171  found_end_pos = true;
172  } catch (InsufficientBufferSizeException& e) {
173  if (alloc_size >= max_buffer_resize) {
174  throw;
175  }
176  if (file == nullptr && file_reader->isScanFinished()) {
177  throw;
178  }
179  offset = buffer_size;
181  buffer, buffer_size, alloc_size, file, file_reader, max_buffer_resize);
182  }
183  }
184  return end_pos;
185 }
186 
187 template <typename T>
188 const char* get_row(const char* buf,
189  const char* buf_end,
190  const char* entire_buf_end,
191  const import_export::CopyParams& copy_params,
192  const bool* is_array,
193  std::vector<T>& row,
194  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
195  bool& try_single_thread,
196  bool filter_empty_lines) {
197  const char* field = buf;
198  const char* p;
199  bool in_quote = false;
200  bool in_array = false;
201  bool has_escape = false;
202  bool strip_quotes = false;
203  try_single_thread = false;
204  for (p = buf; p < entire_buf_end; ++p) {
205  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
206  *(p + 1) == copy_params.quote) {
207  p++;
208  has_escape = true;
209  } else if (copy_params.quoted && *p == copy_params.quote) {
210  in_quote = !in_quote;
211  if (in_quote) {
212  strip_quotes = true;
213  }
214  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
215  is_array[row.size()]) {
216  in_array = true;
217  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
218  ++p;
219  if (*p == copy_params.array_end) {
220  in_array = false;
221  break;
222  }
223  }
224  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
225  if (!in_quote) {
226  if (!has_escape && !strip_quotes) {
227  const char* field_end = p;
228  if (copy_params.trim_spaces) {
229  trim_space(field, field_end);
230  }
231  row.emplace_back(field, field_end - field);
232  } else {
233  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
234  auto field_buf = tmp_buffers.back().get();
235  int j = 0, i = 0;
236  for (; i < p - field; i++, j++) {
237  if (has_escape && field[i] == copy_params.escape &&
238  field[i + 1] == copy_params.quote) {
239  field_buf[j] = copy_params.quote;
240  i++;
241  } else {
242  field_buf[j] = field[i];
243  }
244  }
245  const char* field_begin = field_buf;
246  const char* field_end = field_buf + j;
247  if (copy_params.trim_spaces) {
248  trim_space(field_begin, field_end);
249  }
250  trim_quotes(field_begin, field_end, copy_params);
251  row.emplace_back(field_begin, field_end - field_begin);
252  }
253  field = p + 1;
254  has_escape = false;
255  strip_quotes = false;
256 
257  if (is_eol(*p, copy_params)) {
258  // We are at the end of the row. Skip the line endings now.
259  if (filter_empty_lines) {
260  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
261  p++;
262  }
263  } else {
264  // skip DOS carriage return line feed only
265  if (p + 1 < buf_end && *p == '\r' && *(p + 1) == '\n') {
266  p++;
267  }
268  }
269  break;
270  }
271  }
272  }
273  }
274  /*
275  @TODO(wei) do error handling
276  */
277  if (in_quote) {
278  LOG(ERROR) << "Unmatched quote.";
279  try_single_thread = true;
280  }
281  if (in_array) {
282  LOG(ERROR) << "Unmatched array.";
283  try_single_thread = true;
284  }
285  return p;
286 }
287 
288 template const char* get_row(const char* buf,
289  const char* buf_end,
290  const char* entire_buf_end,
291  const import_export::CopyParams& copy_params,
292  const bool* is_array,
293  std::vector<std::string>& row,
294  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
295  bool& try_single_thread,
296  bool filter_empty_lines);
297 
298 template const char* get_row(const char* buf,
299  const char* buf_end,
300  const char* entire_buf_end,
301  const import_export::CopyParams& copy_params,
302  const bool* is_array,
303  std::vector<std::string_view>& row,
304  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
305  bool& try_single_thread,
306  bool filter_empty_lines);
307 
308 void parse_string_array(const std::string& s,
309  const import_export::CopyParams& copy_params,
310  std::vector<std::string>& string_vec,
311  bool truncate_values) {
312  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
313  return;
314  }
315  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
316  throw std::runtime_error("Malformed Array :" + s);
317  }
318 
319  std::string row(s.c_str() + 1, s.length() - 2);
320  if (row.empty()) { // allow empty arrays
321  return;
322  }
323  row.push_back('\n');
324 
325  bool try_single_thread = false;
326  import_export::CopyParams array_params = copy_params;
327  array_params.delimiter = copy_params.array_delim;
328  std::vector<std::unique_ptr<char[]>> tmp_buffers;
329  get_row(row.c_str(),
330  row.c_str() + row.length(),
331  row.c_str() + row.length(),
332  array_params,
333  nullptr,
334  string_vec,
335  tmp_buffers,
336  try_single_thread,
337  true);
338 
339  for (size_t i = 0; i < string_vec.size(); ++i) {
340  if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
341  if (truncate_values) {
342  string_vec[i] = string_vec[i].substr(0, StringDictionary::MAX_STRLEN);
343  } else {
344  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
346  }
347  }
348  }
349 
350  // use empty string to mark nulls
351  for (auto& value : string_vec) {
352  if (value == copy_params.null_str || value == "NULL" || value.empty()) {
353  value.clear();
354  }
355  }
356 }
357 
358 void extend_buffer(std::unique_ptr<char[]>& buffer,
359  size_t& buffer_size,
360  size_t& alloc_size,
361  FILE* file,
362  foreign_storage::FileReader* file_reader,
363  size_t max_buffer_resize) {
364  auto old_buffer = std::move(buffer);
365  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
366  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
367  << " bytes";
368  buffer = std::make_unique<char[]>(alloc_size);
369 
370  memcpy(buffer.get(), old_buffer.get(), buffer_size);
371  size_t fread_size;
372  CHECK(file != nullptr || file_reader != nullptr);
373  if (file != nullptr) {
374  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
375  } else {
376  fread_size = file_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
377  }
378  buffer_size += fread_size;
379 }
380 } // namespace delimited_parser
381 } // namespace import_export
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
virtual bool isScanFinished() const =0
virtual size_t read(void *buffer, size_t max_size)=0
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:285
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
std::string to_string(char const *&&v)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
void trim_space(const char *&field_begin, const char *&field_end)
#define CHECK(condition)
Definition: Logger.h:291
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
static constexpr size_t MAX_STRLEN
static constexpr size_t max_import_buffer_resize_byte_size
Definition: CopyParams.h:37
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:246