OmniSciDB  dfae7c3b14
DelimitedParserUtils.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file DelimitedParserUtils.cpp
19  * @author Mehmet Sariyuce <mehmet.sariyuce@omnisci.com>
20  * @brief Implementation of delimited parser utils.
21  */
22 
24 
25 #include <string_view>
26 
27 #include "Logger/Logger.h"
29 
30 namespace {
31 inline bool is_eol(const char& c, const import_export::CopyParams& copy_params) {
32  return c == copy_params.line_delim || c == '\n' || c == '\r';
33 }
34 
// Narrows the half-open range [field_begin, field_end) in place so that it no
// longer starts or ends with a space (' ') or carriage return ('\r').
// Both pointers are updated through their references; a range made up only of
// such characters collapses to an empty range at the original field_end.
inline void trim_space(const char*& field_begin, const char*& field_end) {
  const auto is_padding = [](const char ch) { return ch == ' ' || ch == '\r'; };

  // Drop leading padding.
  for (; field_begin < field_end && is_padding(*field_begin); ++field_begin) {
  }
  // Drop trailing padding.
  for (; field_begin < field_end && is_padding(field_end[-1]); --field_end) {
  }
}
44 
45 inline void trim_quotes(const char*& field_begin,
46  const char*& field_end,
47  const import_export::CopyParams& copy_params) {
48  if (copy_params.quoted && field_end - field_begin > 0 &&
49  *field_begin == copy_params.quote) {
50  ++field_begin;
51  }
52  if (copy_params.quoted && field_end - field_begin > 0 &&
53  *(field_end - 1) == copy_params.quote) {
54  --field_end;
55  }
56 }
57 } // namespace
58 
59 namespace import_export {
60 namespace delimited_parser {
61 size_t find_beginning(const char* buffer,
62  size_t begin,
63  size_t end,
64  const import_export::CopyParams& copy_params) {
65  // @TODO(wei) line_delim is in quotes note supported
66  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
67  return 0;
68  }
69  size_t i;
70  const char* buf = buffer + begin;
71  for (i = 0; i < end - begin; i++) {
72  if (buf[i] == copy_params.line_delim) {
73  return i + 1;
74  }
75  }
76  return i;
77 }
78 
79 size_t find_end(const char* buffer,
80  size_t size,
81  const import_export::CopyParams& copy_params,
82  unsigned int& num_rows_this_buffer,
83  size_t buffer_first_row_index,
84  bool& in_quote,
85  size_t offset) {
86  size_t last_line_delim_pos = 0;
87  const char* current = buffer + offset;
88  if (copy_params.quoted) {
89  while (current < buffer + size) {
90  while (!in_quote && current < buffer + size) {
91  // We are outside of quotes. We have to find the last possible line delimiter.
92  if (*current == copy_params.line_delim) {
93  last_line_delim_pos = current - buffer;
94  ++num_rows_this_buffer;
95  } else if (*current == copy_params.quote) {
96  in_quote = true;
97  }
98  ++current;
99  }
100 
101  while (in_quote && current < buffer + size) {
102  // We are in a quoted field. We have to find the ending quote.
103  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
104  (*(current + 1) == copy_params.quote)) {
105  ++current;
106  } else if (*current == copy_params.quote) {
107  in_quote = false;
108  }
109  ++current;
110  }
111  }
112  } else {
113  while (current < buffer + size) {
114  if (*current == copy_params.line_delim) {
115  last_line_delim_pos = current - buffer;
116  ++num_rows_this_buffer;
117  }
118  ++current;
119  }
120  }
121 
122  if (last_line_delim_pos <= 0) {
123  size_t excerpt_length = std::min<size_t>(50, size);
124  std::string buffer_excerpt{buffer, buffer + excerpt_length};
125  std::string error_message =
126  "Unable to find an end of line character after reading " + std::to_string(size) +
127  " characters. Please ensure that the correct \"line_delimiter\" option is "
128  "specified or update the \"buffer_size\" option appropriately. Row number: " +
129  std::to_string(buffer_first_row_index + 1) +
130  ". First few characters in row: " + buffer_excerpt;
131  throw InsufficientBufferSizeException{error_message};
132  }
133 
134  return last_line_delim_pos + 1;
135 }
136 
// Maximum size, in bytes, to which thread buffers may be automatically
// resized. Defaults to 1 GiB (1024 * 1024 * 1024).
static size_t max_buffer_resize = 1024 * 1024 * 1024;

// Gets the maximum size to which thread buffers should be automatically
// resized.
size_t get_max_buffer_resize() {
  return max_buffer_resize;
}

// Sets the maximum size to which thread buffers should be automatically
// resized. The new value is stored in a file-level static with no
// synchronization in this file.
void set_max_buffer_resize(const size_t max_buffer_resize_param) {
  max_buffer_resize = max_buffer_resize_param;
}
146 
147 size_t find_row_end_pos(size_t& alloc_size,
148  std::unique_ptr<char[]>& buffer,
149  size_t& buffer_size,
150  const CopyParams& copy_params,
151  const size_t buffer_first_row_index,
152  unsigned int& num_rows_in_buffer,
153  FILE* file,
154  foreign_storage::CsvReader* csv_reader) {
155  bool found_end_pos{false};
156  bool in_quote{false};
157  size_t offset{0};
158  size_t end_pos;
159  CHECK(file != nullptr || csv_reader != nullptr);
160  const auto max_buffer_resize = get_max_buffer_resize();
161  while (!found_end_pos) {
162  try {
163  end_pos = delimited_parser::find_end(buffer.get(),
164  buffer_size,
165  copy_params,
166  num_rows_in_buffer,
167  buffer_first_row_index,
168  in_quote,
169  offset);
170  found_end_pos = true;
171  } catch (InsufficientBufferSizeException& e) {
172  if (alloc_size >= max_buffer_resize) {
173  throw;
174  }
175  if (file == nullptr && csv_reader->isScanFinished()) {
176  throw;
177  }
178  auto old_buffer = std::move(buffer);
179  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
180  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
181  << " bytes";
182  buffer = std::make_unique<char[]>(alloc_size);
183 
184  memcpy(buffer.get(), old_buffer.get(), buffer_size);
185  size_t fread_size;
186  if (file != nullptr) {
187  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
188  } else {
189  fread_size =
190  csv_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
191  }
192  offset = buffer_size;
193  buffer_size += fread_size;
194  }
195  }
196  return end_pos;
197 }
198 
199 template <typename T>
200 const char* get_row(const char* buf,
201  const char* buf_end,
202  const char* entire_buf_end,
203  const import_export::CopyParams& copy_params,
204  const bool* is_array,
205  std::vector<T>& row,
206  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
207  bool& try_single_thread) {
208  const char* field = buf;
209  const char* p;
210  bool in_quote = false;
211  bool in_array = false;
212  bool has_escape = false;
213  bool strip_quotes = false;
214  try_single_thread = false;
215  for (p = buf; p < entire_buf_end; ++p) {
216  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
217  *(p + 1) == copy_params.quote) {
218  p++;
219  has_escape = true;
220  } else if (copy_params.quoted && *p == copy_params.quote) {
221  in_quote = !in_quote;
222  if (in_quote) {
223  strip_quotes = true;
224  }
225  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
226  is_array[row.size()]) {
227  in_array = true;
228  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
229  ++p;
230  if (*p == copy_params.array_end) {
231  in_array = false;
232  break;
233  }
234  }
235  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
236  if (!in_quote) {
237  if (!has_escape && !strip_quotes) {
238  const char* field_end = p;
239  trim_space(field, field_end);
240  row.emplace_back(field, field_end - field);
241  } else {
242  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
243  auto field_buf = tmp_buffers.back().get();
244  int j = 0, i = 0;
245  for (; i < p - field; i++, j++) {
246  if (has_escape && field[i] == copy_params.escape &&
247  field[i + 1] == copy_params.quote) {
248  field_buf[j] = copy_params.quote;
249  i++;
250  } else {
251  field_buf[j] = field[i];
252  }
253  }
254  const char* field_begin = field_buf;
255  const char* field_end = field_buf + j;
256  trim_space(field_begin, field_end);
257  trim_quotes(field_begin, field_end, copy_params);
258  row.emplace_back(field_begin, field_end - field_begin);
259  }
260  field = p + 1;
261  has_escape = false;
262  strip_quotes = false;
263 
264  if (is_eol(*p, copy_params)) {
265  // We are at the end of the row. Skip the line endings now.
266  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
267  p++;
268  }
269  break;
270  }
271  }
272  }
273  }
274  /*
275  @TODO(wei) do error handling
276  */
277  if (in_quote) {
278  LOG(ERROR) << "Unmatched quote.";
279  try_single_thread = true;
280  }
281  if (in_array) {
282  LOG(ERROR) << "Unmatched array.";
283  try_single_thread = true;
284  }
285  return p;
286 }
287 
288 template const char* get_row(const char* buf,
289  const char* buf_end,
290  const char* entire_buf_end,
291  const import_export::CopyParams& copy_params,
292  const bool* is_array,
293  std::vector<std::string>& row,
294  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
295  bool& try_single_thread);
296 
297 template const char* get_row(const char* buf,
298  const char* buf_end,
299  const char* entire_buf_end,
300  const import_export::CopyParams& copy_params,
301  const bool* is_array,
302  std::vector<std::string_view>& row,
303  std::vector<std::unique_ptr<char[]>>& tmp_buffers,
304  bool& try_single_thread);
305 
306 void parse_string_array(const std::string& s,
307  const import_export::CopyParams& copy_params,
308  std::vector<std::string>& string_vec) {
309  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
310  // TODO: should not convert NULL, empty arrays to {"NULL"},
311  // need to support NULL, empty properly
312  string_vec.emplace_back("NULL");
313  return;
314  }
315  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
316  throw std::runtime_error("Malformed Array :" + s);
317  }
318 
319  std::string row(s.c_str() + 1, s.length() - 2);
320  row.push_back('\n');
321  bool try_single_thread = false;
322  import_export::CopyParams array_params = copy_params;
323  array_params.delimiter = copy_params.array_delim;
324  std::vector<std::unique_ptr<char[]>> tmp_buffers;
325  get_row(row.c_str(),
326  row.c_str() + row.length(),
327  row.c_str() + row.length(),
328  array_params,
329  nullptr,
330  string_vec,
331  tmp_buffers,
332  try_single_thread);
333 
334  for (size_t i = 0; i < string_vec.size(); ++i) {
335  if (string_vec[i].empty()) { // Disallow empty strings for now
336  string_vec.erase(string_vec.begin() + i);
337  --i;
338  } else if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
339  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
341  }
342  }
343 }
344 
345 } // namespace delimited_parser
346 } // namespace import_export
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:188
virtual size_t read(void *buffer, size_t max_size)=0
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
std::string to_string(char const *&&v)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void trim_space(const char *&field_begin, const char *&field_end)
virtual bool isScanFinished()=0
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:220
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts fields into given vector.
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.