OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DelimitedParserUtils.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file DelimitedParserUtils.cpp
19  * @author Mehmet Sariyuce <mehmet.sariyuce@omnisci.com>
20  * @brief Implementation of delimited parser utils.
21  */
22 
24 
25 #include <string_view>
26 
28 #include "Logger/Logger.h"
30 
31 namespace {
32 inline bool is_eol(const char& c, const import_export::CopyParams& copy_params) {
33  return c == copy_params.line_delim || c == '\n' || c == '\r';
34 }
35 
36 inline void trim_space(const char*& field_begin, const char*& field_end) {
37  while (field_begin < field_end && (*field_begin == ' ' || *field_begin == '\r')) {
38  ++field_begin;
39  }
40  while (field_begin < field_end &&
41  (*(field_end - 1) == ' ' || *(field_end - 1) == '\r')) {
42  --field_end;
43  }
44 }
45 
46 inline void trim_quotes(const char*& field_begin,
47  const char*& field_end,
48  const import_export::CopyParams& copy_params) {
49  if (copy_params.quoted && field_end - field_begin > 0 &&
50  *field_begin == copy_params.quote) {
51  ++field_begin;
52  }
53  if (copy_params.quoted && field_end - field_begin > 0 &&
54  *(field_end - 1) == copy_params.quote) {
55  --field_end;
56  }
57 }
58 } // namespace
59 
60 namespace import_export {
61 namespace delimited_parser {
62 size_t find_beginning(const char* buffer,
63  size_t begin,
64  size_t end,
65  const import_export::CopyParams& copy_params) {
66  // @TODO(wei) line_delim is in quotes note supported
67  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
68  return 0;
69  }
70  size_t i;
71  const char* buf = buffer + begin;
72  for (i = 0; i < end - begin; i++) {
73  if (buf[i] == copy_params.line_delim) {
74  return i + 1;
75  }
76  }
77  return i;
78 }
79 
80 size_t find_end(const char* buffer,
81  size_t size,
82  const import_export::CopyParams& copy_params,
83  unsigned int& num_rows_this_buffer,
84  size_t buffer_first_row_index,
85  bool& in_quote,
86  size_t offset) {
87  size_t last_line_delim_pos = 0;
88  const char* current = buffer + offset;
89  if (copy_params.quoted) {
90  while (current < buffer + size) {
91  while (!in_quote && current < buffer + size) {
92  // We are outside of quotes. We have to find the last possible line delimiter.
93  if (*current == copy_params.line_delim) {
94  last_line_delim_pos = current - buffer;
95  ++num_rows_this_buffer;
96  } else if (*current == copy_params.quote) {
97  in_quote = true;
98  }
99  ++current;
100  }
101 
102  while (in_quote && current < buffer + size) {
103  // We are in a quoted field. We have to find the ending quote.
104  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
105  (*(current + 1) == copy_params.quote)) {
106  ++current;
107  } else if (*current == copy_params.quote) {
108  in_quote = false;
109  }
110  ++current;
111  }
112  }
113  } else {
114  while (current < buffer + size) {
115  if (*current == copy_params.line_delim) {
116  last_line_delim_pos = current - buffer;
117  ++num_rows_this_buffer;
118  }
119  ++current;
120  }
121  }
122 
123  if (last_line_delim_pos <= 0) {
124  size_t excerpt_length = std::min<size_t>(50, size);
125  std::string buffer_excerpt{buffer, buffer + excerpt_length};
126  std::string error_message =
127  "Unable to find an end of line character after reading " + std::to_string(size) +
128  " characters. Please ensure that the correct \"line_delimiter\" option is "
129  "specified or update the \"buffer_size\" option appropriately. Row number: " +
130  std::to_string(buffer_first_row_index + 1) +
131  ". First few characters in row: " + buffer_excerpt;
132  throw InsufficientBufferSizeException{error_message};
133  }
134 
135  return last_line_delim_pos + 1;
136 }
137 
139 
141  return max_buffer_resize;
142 }
143 
144 void set_max_buffer_resize(const size_t max_buffer_resize_param) {
145  max_buffer_resize = max_buffer_resize_param;
146 }
147 
148 size_t find_row_end_pos(size_t& alloc_size,
149  std::unique_ptr<char[]>& buffer,
150  size_t& buffer_size,
151  const CopyParams& copy_params,
152  const size_t buffer_first_row_index,
153  unsigned int& num_rows_in_buffer,
154  FILE* file,
155  foreign_storage::FileReader* file_reader) {
156  bool found_end_pos{false};
157  bool in_quote{false};
158  size_t offset{0};
159  size_t end_pos;
160  CHECK(file != nullptr || file_reader != nullptr);
162  while (!found_end_pos) {
163  try {
164  end_pos = delimited_parser::find_end(buffer.get(),
165  buffer_size,
166  copy_params,
167  num_rows_in_buffer,
168  buffer_first_row_index,
169  in_quote,
170  offset);
171  found_end_pos = true;
172  } catch (InsufficientBufferSizeException& e) {
173  if (alloc_size >= max_buffer_resize) {
174  throw;
175  }
176  if (file == nullptr && file_reader->isScanFinished()) {
177  throw;
178  }
179  offset = buffer_size;
181  buffer, buffer_size, alloc_size, file, file_reader, max_buffer_resize);
182  }
183  }
184  return end_pos;
185 }
186 
187 template <typename T>
188 const char* get_row(const char* buf,
189  const char* buf_end,
190  const char* entire_buf_end,
191  const import_export::CopyParams& copy_params,
192  const bool* is_array,
193  std::vector<T>& row,
194  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
195  bool& try_single_thread,
196  bool filter_empty_lines) {
197  const char* field = buf;
198  const char* p;
199  bool in_quote = false;
200  bool in_array = false;
201  bool has_escape = false;
202  bool strip_quotes = false;
203  try_single_thread = false;
204  for (p = buf; p < entire_buf_end; ++p) {
205  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
206  *(p + 1) == copy_params.quote) {
207  p++;
208  has_escape = true;
209  } else if (copy_params.quoted && *p == copy_params.quote) {
210  in_quote = !in_quote;
211  if (in_quote) {
212  strip_quotes = true;
213  }
214  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
215  is_array[row.size()]) {
216  in_array = true;
217  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
218  ++p;
219  if (*p == copy_params.array_end) {
220  in_array = false;
221  break;
222  }
223  }
224  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
225  if (!in_quote) {
226  if (!has_escape && !strip_quotes) {
227  const char* field_end = p;
228  trim_space(field, field_end);
229  row.emplace_back(field, field_end - field);
230  } else {
231  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
232  auto field_buf = tmp_buffers.back().get();
233  int j = 0, i = 0;
234  for (; i < p - field; i++, j++) {
235  if (has_escape && field[i] == copy_params.escape &&
236  field[i + 1] == copy_params.quote) {
237  field_buf[j] = copy_params.quote;
238  i++;
239  } else {
240  field_buf[j] = field[i];
241  }
242  }
243  const char* field_begin = field_buf;
244  const char* field_end = field_buf + j;
245  trim_space(field_begin, field_end);
246  trim_quotes(field_begin, field_end, copy_params);
247  row.emplace_back(field_begin, field_end - field_begin);
248  }
249  field = p + 1;
250  has_escape = false;
251  strip_quotes = false;
252 
253  if (is_eol(*p, copy_params)) {
254  // We are at the end of the row. Skip the line endings now.
255  if (filter_empty_lines) {
256  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
257  p++;
258  }
259  } else {
260  // skip DOS carriage return line feed only
261  if (p + 1 < buf_end && *p == '\r' && *(p + 1) == '\n') {
262  p++;
263  }
264  }
265  break;
266  }
267  }
268  }
269  }
270  /*
271  @TODO(wei) do error handling
272  */
273  if (in_quote) {
274  LOG(ERROR) << "Unmatched quote.";
275  try_single_thread = true;
276  }
277  if (in_array) {
278  LOG(ERROR) << "Unmatched array.";
279  try_single_thread = true;
280  }
281  return p;
282 }
283 
284 template const char* get_row(const char* buf,
285  const char* buf_end,
286  const char* entire_buf_end,
287  const import_export::CopyParams& copy_params,
288  const bool* is_array,
289  std::vector<std::string>& row,
290  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
291  bool& try_single_thread,
292  bool filter_empty_lines);
293 
294 template const char* get_row(const char* buf,
295  const char* buf_end,
296  const char* entire_buf_end,
297  const import_export::CopyParams& copy_params,
298  const bool* is_array,
299  std::vector<std::string_view>& row,
300  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
301  bool& try_single_thread,
302  bool filter_empty_lines);
303 
304 void parse_string_array(const std::string& s,
305  const import_export::CopyParams& copy_params,
306  std::vector<std::string>& string_vec) {
307  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
308  return;
309  }
310  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
311  throw std::runtime_error("Malformed Array :" + s);
312  }
313 
314  std::string row(s.c_str() + 1, s.length() - 2);
315  if (row.empty()) { // allow empty arrays
316  return;
317  }
318  row.push_back('\n');
319 
320  bool try_single_thread = false;
321  import_export::CopyParams array_params = copy_params;
322  array_params.delimiter = copy_params.array_delim;
323  std::vector<std::unique_ptr<char[]>> tmp_buffers;
324  get_row(row.c_str(),
325  row.c_str() + row.length(),
326  row.c_str() + row.length(),
327  array_params,
328  nullptr,
329  string_vec,
330  tmp_buffers,
331  try_single_thread,
332  true);
333 
334  for (size_t i = 0; i < string_vec.size(); ++i) {
335  if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
336  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
338  }
339  }
340 
341  // use empty string to mark nulls
342  for (auto& value : string_vec) {
343  if (value == copy_params.null_str || value == "NULL" || value.empty()) {
344  value.clear();
345  }
346  }
347 }
348 
349 void extend_buffer(std::unique_ptr<char[]>& buffer,
350  size_t& buffer_size,
351  size_t& alloc_size,
352  FILE* file,
353  foreign_storage::FileReader* file_reader,
354  size_t max_buffer_resize) {
355  auto old_buffer = std::move(buffer);
356  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
357  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
358  << " bytes";
359  buffer = std::make_unique<char[]>(alloc_size);
360 
361  memcpy(buffer.get(), old_buffer.get(), buffer_size);
362  size_t fread_size;
363  CHECK(file != nullptr || file_reader != nullptr);
364  if (file != nullptr) {
365  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
366  } else {
367  fread_size = file_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
368  }
369  buffer_size += fread_size;
370 }
371 } // namespace delimited_parser
372 } // namespace import_export
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
virtual size_t read(void *buffer, size_t max_size)=0
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:203
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
std::string to_string(char const *&&v)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
virtual bool isScanFinished()=0
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
void trim_space(const char *&field_begin, const char *&field_end)
#define CHECK(condition)
Definition: Logger.h:209
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
static constexpr size_t MAX_STRLEN
static constexpr size_t max_import_buffer_resize_byte_size
Definition: CopyParams.h:35
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:241
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.