OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::delimited_parser Namespace Reference

Classes

class  InsufficientBufferSizeException
 
class  DelimitedParserException
 

Functions

size_t find_beginning (const char *buffer, size_t begin, size_t end, const CopyParams &copy_params)
 Finds the closest possible row beginning in the given buffer. More...
 
size_t find_end (const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
 
size_t get_max_buffer_resize ()
 Gets the maximum size to which thread buffers should be automatically resized. More...
 
void set_max_buffer_resize (const size_t max_buffer_resize)
 Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing. More...
 
size_t find_row_end_pos (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader=nullptr)
 Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached. More...
 
template<typename T >
const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 Parses the first row in the given buffer and inserts fields into given vector. More...
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string_view > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
void parse_string_array (const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values=false)
 Parses given string array and inserts into given vector of strings. More...
 
void extend_buffer (std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
 

Variables

static size_t max_buffer_resize = max_import_buffer_resize_byte_size
 

Function Documentation

void import_export::delimited_parser::extend_buffer ( std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
size_t &  alloc_size,
FILE *  file,
foreign_storage::FileReader file_reader,
size_t  max_buffer_resize 
)

Extends the given buffer to the lesser of max_buffer_resize or twice the given allocation size and reads new content from the file into the newly allocated buffer.

Parameters
buffer- buffer that will be extended
buffer_size- current buffer size
alloc_size- current allocation size
file- handle for file to be read from (one of file or file_reader must be present)
file_reader- reader for file to be read from (one of file or file_reader must be present)
max_buffer_resize- maximum size that the buffer can be extended to

Definition at line 376 of file DelimitedParserUtils.cpp.

References CHECK, logger::INFO, LOG, and foreign_storage::FileReader::read().

Referenced by find_row_end_pos(), and foreign_storage::RegexFileBufferParser::findRowEndPosition().

381  {
382  auto old_buffer = std::move(buffer);
383  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
384  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
385  << " bytes";
386  buffer = std::make_unique<char[]>(alloc_size);
387 
388  memcpy(buffer.get(), old_buffer.get(), buffer_size);
389  size_t fread_size;
390  CHECK(file != nullptr || file_reader != nullptr);
391  if (file != nullptr) {
392  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
393  } else {
394  fread_size = file_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
395  }
396  buffer_size += fread_size;
397 }
virtual size_t read(void *buffer, size_t max_size)=0
#define LOG(tag)
Definition: Logger.h:285
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::find_beginning ( const char *  buffer,
size_t  begin,
size_t  end,
const CopyParams &  copy_params 
)

Finds the closest possible row beginning in the given buffer.

Parameters
bufferGiven buffer which has the rows in csv format. (NOT OWN)
beginStart index of buffer to look for the beginning.
endEnd index of buffer to look for the beginning.
copy_paramsCopy params for the table.
Returns
The position of the closest possible row beginning to the start of the given buffer.

Definition at line 67 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::line_delim.

Referenced by import_export::import_thread_delimited(), and foreign_storage::CsvFileBufferParser::parseBuffer().

70  {
71  // @TODO(wei) line_delim is in quotes note supported
72  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
73  return 0;
74  }
75  size_t i;
76  const char* buf = buffer + begin;
77  for (i = 0; i < end - begin; i++) {
78  if (buf[i] == copy_params.line_delim) {
79  return i + 1;
80  }
81  }
82  return i;
83 }

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::find_end ( const char *  buffer,
size_t  size,
const import_export::CopyParams copy_params,
unsigned int &  num_rows_this_buffer,
size_t  buffer_first_row_index,
bool &  in_quote,
size_t  offset 
)

Definition at line 85 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::escape, import_export::CopyParams::line_delim, import_export::CopyParams::quote, import_export::CopyParams::quoted, and to_string().

Referenced by find_row_end_pos().

91  {
92  size_t last_line_delim_pos = 0;
93  const char* current = buffer + offset;
94  if (copy_params.quoted) {
95  while (current < buffer + size) {
96  while (!in_quote && current < buffer + size) {
97  // We are outside of quotes. We have to find the last possible line delimiter.
98  if (*current == copy_params.line_delim) {
99  last_line_delim_pos = current - buffer;
100  ++num_rows_this_buffer;
101  } else if (*current == copy_params.quote) {
102  in_quote = true;
103  }
104  ++current;
105  }
106 
107  while (in_quote && current < buffer + size) {
108  // We are in a quoted field. We have to find the ending quote.
109  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
110  (*(current + 1) == copy_params.quote)) {
111  ++current;
112  } else if (*current == copy_params.quote) {
113  in_quote = false;
114  }
115  ++current;
116  }
117  }
118  } else {
119  while (current < buffer + size) {
120  if (*current == copy_params.line_delim) {
121  last_line_delim_pos = current - buffer;
122  ++num_rows_this_buffer;
123  }
124  ++current;
125  }
126  }
127 
128  if (last_line_delim_pos <= 0) {
129  size_t excerpt_length = std::min<size_t>(50, size);
130  std::string buffer_excerpt{buffer, buffer + excerpt_length};
131  if (in_quote) {
132  std::string quote(1, copy_params.quote);
133  std::string error_message =
134  "Unable to find a matching end quote for the quote character '" + quote +
135  "' after reading " + std::to_string(size) +
136  " characters. Please ensure that all data fields are correctly formatted "
137  "or update the \"buffer_size\" option appropriately. Row number: " +
138  std::to_string(buffer_first_row_index + 1) +
139  ". First few characters in row: " + buffer_excerpt;
140  throw InsufficientBufferSizeException{error_message};
141  } else {
142  std::string error_message =
143  "Unable to find an end of line character after reading " +
144  std::to_string(size) +
145  " characters. Please ensure that the correct \"line_delimiter\" option is "
146  "specified or update the \"buffer_size\" option appropriately. Row number: " +
147  std::to_string(buffer_first_row_index + 1) +
148  ". First few characters in row: " + buffer_excerpt;
149  throw InsufficientBufferSizeException{error_message};
150  }
151  }
152 
153  return last_line_delim_pos + 1;
154 }
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::find_row_end_pos ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FILE *  file,
foreign_storage::FileReader file_reader = nullptr 
)

Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached.

Parameters
alloc_sizeAllocation size of subsequent buffer. This is adjusted as needed, if the buffer has to be resized.
bufferGiven buffer which has the rows in csv format.
buffer_sizeSize of the buffer.
copy_paramsCopy params for the table.
buffer_first_row_indexIndex of first row in the buffer.
num_rows_in_bufferNumber of rows until the closest possible row ending.
fileHandle to CSV file being parsed. (optional)
file_readerHandle to a FileReader class, must be valid if file isnt
Returns
The position of the closest possible row ending to the end of the given buffer.

Definition at line 166 of file DelimitedParserUtils.cpp.

References CHECK, extend_buffer(), find_end(), get_max_buffer_resize(), foreign_storage::FileReader::isScanFinished(), and max_buffer_resize.

Referenced by foreign_storage::CsvFileBufferParser::findRowEndPosition(), and import_export::Importer::importDelimited().

173  {
174  bool found_end_pos{false};
175  bool in_quote{false};
176  size_t offset{0};
177  size_t end_pos;
178  CHECK(file != nullptr || file_reader != nullptr);
180  while (!found_end_pos) {
181  try {
182  end_pos = delimited_parser::find_end(buffer.get(),
183  buffer_size,
184  copy_params,
185  num_rows_in_buffer,
186  buffer_first_row_index,
187  in_quote,
188  offset);
189  found_end_pos = true;
190  } catch (InsufficientBufferSizeException& e) {
191  if (alloc_size >= max_buffer_resize) {
192  throw;
193  }
194  if (file == nullptr && file_reader->isScanFinished()) {
195  throw;
196  }
197  offset = buffer_size;
199  buffer, buffer_size, alloc_size, file, file_reader, max_buffer_resize);
200  }
201  }
202  return end_pos;
203 }
virtual bool isScanFinished() const =0
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::get_max_buffer_resize ( )

Gets the maximum size to which thread buffers should be automatically resized.

Definition at line 158 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

Referenced by find_row_end_pos().

158  {
159  return max_buffer_resize;
160 }

+ Here is the caller graph for this function:

template<typename T >
const char * import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams copy_params,
const bool *  is_array,
std::vector< T > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)

Parses the first row in the given buffer and inserts fields into given vector.

Parameters
bufGiven buffer which has the rows in csv format. (NOT OWN)
buf_endEnd of the sliced buffer for the thread. (NOT OWN)
entire_buf_endEnd of the entire buffer. (NOT OWN)
copy_paramsCopy params for the table.
is_arrayArray of bools which tells if a column is an array type.
rowGiven vector to be populated with parsed fields.
try_single_threadIn case of parse errors, this will tell if parsing should continue with single thread.
filter_empty_linesWhether to skip empty lines (used when parsing single columns returned by s3 select, as nulls may be encoded as empty lines)
Returns
Pointer to the next row after the first row is parsed.

Definition at line 206 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, logger::ERROR, import_export::CopyParams::escape, field(), anonymous_namespace{DelimitedParserUtils.cpp}::is_eol(), LOG, import_export::CopyParams::quote, import_export::CopyParams::quoted, anonymous_namespace{DelimitedParserUtils.cpp}::trim_quotes(), import_export::trim_space(), and import_export::CopyParams::trim_spaces.

Referenced by import_export::import_thread_delimited(), parse_string_array(), foreign_storage::CsvFileBufferParser::parseBuffer(), import_export::Detector::split_raw_data(), and foreign_storage::CsvFileBufferParser::validateExpectedColumnCount().

214  {
215  const char* field = buf;
216  const char* p;
217  bool in_quote = false;
218  bool in_array = false;
219  bool has_escape = false;
220  bool strip_quotes = false;
221  try_single_thread = false;
222  for (p = buf; p < entire_buf_end; ++p) {
223  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
224  *(p + 1) == copy_params.quote) {
225  p++;
226  has_escape = true;
227  } else if (copy_params.quoted && *p == copy_params.quote) {
228  in_quote = !in_quote;
229  if (in_quote) {
230  strip_quotes = true;
231  }
232  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
233  is_array[row.size()]) {
234  in_array = true;
235  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
236  ++p;
237  if (*p == copy_params.array_end) {
238  in_array = false;
239  break;
240  }
241  }
242  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
243  if (!in_quote) {
244  if (!has_escape && !strip_quotes) {
245  const char* field_end = p;
246  if (copy_params.trim_spaces) {
247  trim_space(field, field_end);
248  }
249  row.emplace_back(field, field_end - field);
250  } else {
251  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
252  auto field_buf = tmp_buffers.back().get();
253  int j = 0, i = 0;
254  for (; i < p - field; i++, j++) {
255  if (has_escape && field[i] == copy_params.escape &&
256  field[i + 1] == copy_params.quote) {
257  field_buf[j] = copy_params.quote;
258  i++;
259  } else {
260  field_buf[j] = field[i];
261  }
262  }
263  const char* field_begin = field_buf;
264  const char* field_end = field_buf + j;
265  trim_quotes(field_begin, field_end, copy_params);
266  if (copy_params.trim_spaces) {
267  trim_space(field_begin, field_end);
268  }
269  row.emplace_back(field_begin, field_end - field_begin);
270  }
271  field = p + 1;
272  has_escape = false;
273  strip_quotes = false;
274 
275  if (is_eol(*p, copy_params)) {
276  // We are at the end of the row. Skip the line endings now.
277  if (filter_empty_lines) {
278  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
279  p++;
280  }
281  } else {
282  // skip DOS carriage return line feed only
283  if (p + 1 < buf_end && *p == '\r' && *(p + 1) == '\n') {
284  p++;
285  }
286  }
287  break;
288  }
289  }
290  }
291  }
292  /*
293  @TODO(wei) do error handling
294  */
295  if (in_quote) {
296  LOG(ERROR) << "Unmatched quote.";
297  try_single_thread = true;
298  }
299  if (in_array) {
300  LOG(ERROR) << "Unmatched array.";
301  try_single_thread = true;
302  }
303  return p;
304 }
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:285
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
void trim_space(const char *&field_begin, const char *&field_end)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams copy_params,
const bool *  is_array,
std::vector< std::string > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams copy_params,
const bool *  is_array,
std::vector< std::string_view > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
void import_export::delimited_parser::parse_string_array ( const std::string &  s,
const import_export::CopyParams copy_params,
std::vector< std::string > &  string_vec,
bool  truncate_values = false 
)

Parses given string array and inserts into given vector of strings.

Parameters
sGiven string array
copy_paramsCopy params for the table.
string_vecGiven vector to be populated with parsed fields.

Definition at line 326 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_delim, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, get_row(), StringDictionary::MAX_STRLEN, import_export::CopyParams::null_str, and to_string().

Referenced by import_export::TypedImportBuffer::add_value(), import_export::TypedImportBuffer::addDefaultValues(), RowToColumnLoader::convert_string_to_column(), foreign_storage::anonymous_namespace{LogFileBufferParser.cpp}::create_map_from_arrays(), and data_conversion::StringViewToArrayEncoder< ScalarEncoderType >::encodeScalarData().

329  {
330  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
331  return;
332  }
333  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
334  throw std::runtime_error("Malformed Array :" + s);
335  }
336 
337  std::string row(s.c_str() + 1, s.length() - 2);
338  if (row.empty()) { // allow empty arrays
339  return;
340  }
341  row.push_back('\n');
342 
343  bool try_single_thread = false;
344  import_export::CopyParams array_params = copy_params;
345  array_params.delimiter = copy_params.array_delim;
346  std::vector<std::unique_ptr<char[]>> tmp_buffers;
347  get_row(row.c_str(),
348  row.c_str() + row.length(),
349  row.c_str() + row.length(),
350  array_params,
351  nullptr,
352  string_vec,
353  tmp_buffers,
354  try_single_thread,
355  true);
356 
357  for (size_t i = 0; i < string_vec.size(); ++i) {
358  if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
359  if (truncate_values) {
360  string_vec[i] = string_vec[i].substr(0, StringDictionary::MAX_STRLEN);
361  } else {
362  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
364  }
365  }
366  }
367 
368  // use empty string to mark nulls
369  for (auto& value : string_vec) {
370  if (value == copy_params.null_str || value == "NULL" || value.empty()) {
371  value.clear();
372  }
373  }
374 }
std::string to_string(char const *&&v)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::delimited_parser::set_max_buffer_resize ( const size_t  max_buffer_resize_param)

Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing.

Definition at line 162 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

162  {
163  max_buffer_resize = max_buffer_resize_param;
164 }

Variable Documentation