OmniSciDB  a987f07e93
import_export::delimited_parser Namespace Reference

Classes

class  InsufficientBufferSizeException
 

Functions

size_t find_beginning (const char *buffer, size_t begin, size_t end, const CopyParams &copy_params)
 Finds the closest possible row beginning in the given buffer.
 
size_t find_end (const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
 
size_t get_max_buffer_resize ()
 Gets the maximum size to which thread buffers should be automatically resized.
 
void set_max_buffer_resize (const size_t max_buffer_resize)
 Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing.
 
size_t find_row_end_pos (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader=nullptr)
 Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached.
 
template<typename T >
const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 Parses the first row in the given buffer and inserts fields into given vector.
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string_view > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
void parse_string_array (const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values=false)
 Parses given string array and inserts into given vector of strings.
 
void extend_buffer (std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
 

Variables

static size_t max_buffer_resize = max_import_buffer_resize_byte_size
 

Function Documentation

void import_export::delimited_parser::extend_buffer ( std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
size_t &  alloc_size,
FILE *  file,
foreign_storage::FileReader *  file_reader,
size_t  max_buffer_resize 
)

Extends the given buffer to the lesser of max_buffer_resize or twice the given allocation size and reads new content from the file into the newly allocated buffer.

Parameters
buffer - buffer that will be extended
buffer_size - current buffer size
alloc_size - current allocation size
file - handle for file to be read from (one of file or file_reader must be present)
file_reader - reader for file to be read from (one of file or file_reader must be present)
max_buffer_resize - maximum size that the buffer can be extended to

Definition at line 358 of file DelimitedParserUtils.cpp.

References CHECK, logger::INFO, LOG, and foreign_storage::FileReader::read().

Referenced by find_row_end_pos(), and foreign_storage::RegexFileBufferParser::findRowEndPosition().

363  {
364  auto old_buffer = std::move(buffer);
365  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
366  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
367  << " bytes";
368  buffer = std::make_unique<char[]>(alloc_size);
369 
370  memcpy(buffer.get(), old_buffer.get(), buffer_size);
371  size_t fread_size;
372  CHECK(file != nullptr || file_reader != nullptr);
373  if (file != nullptr) {
374  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
375  } else {
376  fread_size = file_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
377  }
378  buffer_size += fread_size;
379 }
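The growth policy described above can be illustrated with a minimal sketch. It assumes a plain `FILE*` source only; `grow_and_refill` is a hypothetical stand-in written for this example, not the library function, and it omits the `FileReader` path and the logging.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <memory>

// Hypothetical stand-in for extend_buffer() that reads from a FILE* only.
// Grows the allocation to min(max_resize, 2 * alloc_size), keeps the bytes
// already held in the buffer, and appends whatever can be read into the
// newly available space.
void grow_and_refill(std::unique_ptr<char[]>& buffer,
                     std::size_t& buffer_size,
                     std::size_t& alloc_size,
                     std::FILE* file,
                     std::size_t max_resize) {
  assert(file != nullptr);
  auto old_buffer = std::move(buffer);
  alloc_size = std::min(max_resize, alloc_size * 2);
  buffer = std::make_unique<char[]>(alloc_size);
  // Preserve the content that was already read.
  std::memcpy(buffer.get(), old_buffer.get(), buffer_size);
  // Fill the remainder of the new allocation from the file.
  buffer_size +=
      std::fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
}
```

Because the allocation is capped at the maximum, a caller that has already reached the cap gains no additional space from another call and should treat that as a terminal condition.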

size_t import_export::delimited_parser::find_beginning ( const char *  buffer,
size_t  begin,
size_t  end,
const CopyParams &  copy_params 
)

Finds the closest possible row beginning in the given buffer.

Parameters
buffer - Given buffer which has the rows in CSV format. (NOT OWN)
begin - Start index of buffer to look for the beginning.
end - End index of buffer to look for the beginning.
copy_params - Copy params for the table.
Returns
The offset, relative to begin, of the closest possible row beginning (0 if begin is already at the start of a row).

Definition at line 62 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::line_delim.

Referenced by import_export::import_thread_delimited(), and foreign_storage::CsvFileBufferParser::parseBuffer().

65  {
66  // @TODO(wei) line_delim is in quotes not supported
67  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
68  return 0;
69  }
70  size_t i;
71  const char* buf = buffer + begin;
72  for (i = 0; i < end - begin; i++) {
73  if (buf[i] == copy_params.line_delim) {
74  return i + 1;
75  }
76  }
77  return i;
78 }
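The listing returns an offset measured from `begin`, not from the start of the buffer, so a caller would add it to `begin` to get an absolute position. The sketch below restates that logic under simplifying assumptions: `row_start_offset` is a hypothetical name, and the line delimiter is passed directly instead of through `CopyParams`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical simplification of find_beginning(): the line delimiter is
// passed directly. The result is an offset relative to `begin`.
std::size_t row_start_offset(const char* buffer,
                             std::size_t begin,
                             std::size_t end,
                             char line_delim) {
  assert(begin <= end);
  if (begin == 0 || buffer[begin - 1] == line_delim) {
    return 0;  // `begin` already sits on a row boundary
  }
  for (std::size_t i = begin; i < end; ++i) {
    if (buffer[i] == line_delim) {
      return i - begin + 1;  // first byte after the delimiter
    }
  }
  return end - begin;  // no delimiter found in [begin, end)
}
```

For a buffer holding `"a,b\nc,d\ne,f\n"` and a slice starting at index 5 (in the middle of the second row), the offset is 3, which places the slice at index 8, the first character of the third row. As the TODO in the listing notes, a line delimiter inside a quoted field is not handled by this scan.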

size_t import_export::delimited_parser::find_end ( const char *  buffer,
size_t  size,
const import_export::CopyParams &  copy_params,
unsigned int &  num_rows_this_buffer,
size_t  buffer_first_row_index,
bool &  in_quote,
size_t  offset 
)

Definition at line 80 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::escape, import_export::CopyParams::line_delim, import_export::CopyParams::quote, import_export::CopyParams::quoted, and to_string().

Referenced by find_row_end_pos().

86  {
87  size_t last_line_delim_pos = 0;
88  const char* current = buffer + offset;
89  if (copy_params.quoted) {
90  while (current < buffer + size) {
91  while (!in_quote && current < buffer + size) {
92  // We are outside of quotes. We have to find the last possible line delimiter.
93  if (*current == copy_params.line_delim) {
94  last_line_delim_pos = current - buffer;
95  ++num_rows_this_buffer;
96  } else if (*current == copy_params.quote) {
97  in_quote = true;
98  }
99  ++current;
100  }
101 
102  while (in_quote && current < buffer + size) {
103  // We are in a quoted field. We have to find the ending quote.
104  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
105  (*(current + 1) == copy_params.quote)) {
106  ++current;
107  } else if (*current == copy_params.quote) {
108  in_quote = false;
109  }
110  ++current;
111  }
112  }
113  } else {
114  while (current < buffer + size) {
115  if (*current == copy_params.line_delim) {
116  last_line_delim_pos = current - buffer;
117  ++num_rows_this_buffer;
118  }
119  ++current;
120  }
121  }
122 
123  if (last_line_delim_pos <= 0) {
124  size_t excerpt_length = std::min<size_t>(50, size);
125  std::string buffer_excerpt{buffer, buffer + excerpt_length};
126  std::string error_message =
127  "Unable to find an end of line character after reading " + std::to_string(size) +
128  " characters. Please ensure that the correct \"line_delimiter\" option is "
129  "specified or update the \"buffer_size\" option appropriately. Row number: " +
130  std::to_string(buffer_first_row_index + 1) +
131  ". First few characters in row: " + buffer_excerpt;
132  throw InsufficientBufferSizeException{error_message};
133  }
134 
135  return last_line_delim_pos + 1;
136 }
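The quoted branch of the listing can be read as a two-state scan: outside quotes, line delimiters are counted; inside quotes, they are ignored until the closing quote. The following sketch shows that idea in isolation. It is an assumption-laden simplification: `last_row_end` is a hypothetical name, the characters are passed directly rather than through `CopyParams`, and it returns 0 where the real function throws `InsufficientBufferSizeException`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical, simplified version of the quoted branch of find_end().
// Returns the index one past the last line delimiter found outside quotes,
// or 0 if there is none. `num_rows` is incremented once per delimiter found.
std::size_t last_row_end(const std::string& data,
                         char line_delim,
                         char quote,
                         char escape,
                         unsigned& num_rows) {
  assert(line_delim != quote);
  std::size_t last_delim = 0;
  bool found = false;
  bool in_quote = false;
  for (std::size_t i = 0; i < data.size(); ++i) {
    const char c = data[i];
    if (!in_quote) {
      if (c == line_delim) {
        last_delim = i;
        found = true;
        ++num_rows;
      } else if (c == quote) {
        in_quote = true;
      }
    } else if (c == escape && i + 1 < data.size() && data[i + 1] == quote) {
      ++i;  // escaped quote: skip both characters, stay inside the quotes
    } else if (c == quote) {
      in_quote = false;
    }
  }
  return found ? last_delim + 1 : 0;
}
```

Everything after the returned index is an incomplete row (here, possibly an unterminated quoted field) and would be carried over to the next buffer.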

size_t import_export::delimited_parser::find_row_end_pos ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FILE *  file,
foreign_storage::FileReader *  file_reader = nullptr 
)

Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached.

Parameters
alloc_size - Allocation size of subsequent buffer. This is adjusted as needed, if the buffer has to be resized.
buffer - Given buffer which has the rows in CSV format.
buffer_size - Size of the buffer.
copy_params - Copy params for the table.
buffer_first_row_index - Index of first row in the buffer.
num_rows_in_buffer - Number of rows until the closest possible row ending.
file - Handle to CSV file being parsed. (optional)
file_reader - Handle to a FileReader instance; must be valid if file is not.
Returns
The position of the closest possible row ending to the end of the given buffer.

Definition at line 148 of file DelimitedParserUtils.cpp.

References CHECK, extend_buffer(), find_end(), get_max_buffer_resize(), foreign_storage::FileReader::isScanFinished(), and max_buffer_resize.

Referenced by foreign_storage::CsvFileBufferParser::findRowEndPosition(), and import_export::Importer::importDelimited().

155  {
156  bool found_end_pos{false};
157  bool in_quote{false};
158  size_t offset{0};
159  size_t end_pos;
160  CHECK(file != nullptr || file_reader != nullptr);
161  const auto max_buffer_resize = get_max_buffer_resize();
162  while (!found_end_pos) {
163  try {
164  end_pos = delimited_parser::find_end(buffer.get(),
165  buffer_size,
166  copy_params,
167  num_rows_in_buffer,
168  buffer_first_row_index,
169  in_quote,
170  offset);
171  found_end_pos = true;
172  } catch (InsufficientBufferSizeException& e) {
173  if (alloc_size >= max_buffer_resize) {
174  throw;
175  }
176  if (file == nullptr && file_reader->isScanFinished()) {
177  throw;
178  }
179  offset = buffer_size;
180  extend_buffer(
181  buffer, buffer_size, alloc_size, file, file_reader, max_buffer_resize);
182  }
183  }
184  return end_pos;
185 }
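The retry loop in the listing (scan, and on failure grow the buffer and scan only the newly read bytes) can be sketched end to end as follows. All names here are hypothetical: `NoRowEnd` stands in for `InsufficientBufferSizeException`, the scan is the unquoted case with a fixed `'\n'` delimiter, and only the `FILE*` path is modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for InsufficientBufferSizeException.
struct NoRowEnd : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Hypothetical unquoted scan: index one past the last '\n' in [offset, size).
// Throws NoRowEnd if that range contains no '\n'.
std::size_t scan_for_row_end(const char* buf, std::size_t size, std::size_t offset) {
  for (std::size_t i = size; i > offset; --i) {
    if (buf[i - 1] == '\n') {
      return i;
    }
  }
  throw NoRowEnd{"no end of line found"};
}

// Scans for a row end, doubling the buffer (up to max_resize) and reading
// more of the file each time the scan fails.
std::size_t row_end_with_growth(std::unique_ptr<char[]>& buf,
                                std::size_t& size,
                                std::size_t& alloc,
                                std::FILE* file,
                                std::size_t max_resize) {
  assert(file != nullptr);
  std::size_t offset = 0;
  for (;;) {
    try {
      return scan_for_row_end(buf.get(), size, offset);
    } catch (const NoRowEnd&) {
      if (alloc >= max_resize) {
        throw;  // cap reached: propagate the failure to the caller
      }
      offset = size;  // bytes before this point were already scanned
      auto old = std::move(buf);
      alloc = std::min(max_resize, alloc * 2);
      buf = std::make_unique<char[]>(alloc);
      std::memcpy(buf.get(), old.get(), size);
      size += std::fread(buf.get() + size, 1, alloc - size, file);
    }
  }
}
```

Advancing the scan offset to the old buffer size avoids rescanning bytes that are already known to contain no row end, which matches the `offset = buffer_size` step in the listing.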

size_t import_export::delimited_parser::get_max_buffer_resize ( )

Gets the maximum size to which thread buffers should be automatically resized.

Definition at line 140 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

Referenced by find_row_end_pos().

140  {
141  return max_buffer_resize;
142 }

template<typename T >
const char * import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< T > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)

Parses the first row in the given buffer and inserts fields into given vector.

Parameters
buf - Given buffer which has the rows in CSV format. (NOT OWN)
buf_end - End of the sliced buffer for the thread. (NOT OWN)
entire_buf_end - End of the entire buffer. (NOT OWN)
copy_params - Copy params for the table.
is_array - Array of bools which tells if a column is an array type.
row - Given vector to be populated with parsed fields.
tmp_buffers - Vector that receives the temporary buffers allocated for fields that contain quotes or escaped quotes.
try_single_thread - In case of parse errors, this will tell if parsing should continue with single thread.
filter_empty_lines - Whether to skip empty lines (used when parsing single columns returned by S3 Select, as nulls may be encoded as empty lines)
Returns
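Pointer to the last line-ending character consumed for the parsed row, or entire_buf_end if no line ending was found; the next row starts one past this position.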

Definition at line 188 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, logger::ERROR, import_export::CopyParams::escape, field(), anonymous_namespace{DelimitedParserUtils.cpp}::is_eol(), LOG, import_export::CopyParams::quote, import_export::CopyParams::quoted, anonymous_namespace{DelimitedParserUtils.cpp}::trim_quotes(), import_export::trim_space(), and import_export::CopyParams::trim_spaces.

Referenced by import_export::import_thread_delimited(), parse_string_array(), foreign_storage::CsvFileBufferParser::parseBuffer(), import_export::Detector::split_raw_data(), and foreign_storage::CsvFileBufferParser::validateExpectedColumnCount().

196  {
197  const char* field = buf;
198  const char* p;
199  bool in_quote = false;
200  bool in_array = false;
201  bool has_escape = false;
202  bool strip_quotes = false;
203  try_single_thread = false;
204  for (p = buf; p < entire_buf_end; ++p) {
205  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
206  *(p + 1) == copy_params.quote) {
207  p++;
208  has_escape = true;
209  } else if (copy_params.quoted && *p == copy_params.quote) {
210  in_quote = !in_quote;
211  if (in_quote) {
212  strip_quotes = true;
213  }
214  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
215  is_array[row.size()]) {
216  in_array = true;
217  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
218  ++p;
219  if (*p == copy_params.array_end) {
220  in_array = false;
221  break;
222  }
223  }
224  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
225  if (!in_quote) {
226  if (!has_escape && !strip_quotes) {
227  const char* field_end = p;
228  if (copy_params.trim_spaces) {
229  trim_space(field, field_end);
230  }
231  row.emplace_back(field, field_end - field);
232  } else {
233  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
234  auto field_buf = tmp_buffers.back().get();
235  int j = 0, i = 0;
236  for (; i < p - field; i++, j++) {
237  if (has_escape && field[i] == copy_params.escape &&
238  field[i + 1] == copy_params.quote) {
239  field_buf[j] = copy_params.quote;
240  i++;
241  } else {
242  field_buf[j] = field[i];
243  }
244  }
245  const char* field_begin = field_buf;
246  const char* field_end = field_buf + j;
247  if (copy_params.trim_spaces) {
248  trim_space(field_begin, field_end);
249  }
250  trim_quotes(field_begin, field_end, copy_params);
251  row.emplace_back(field_begin, field_end - field_begin);
252  }
253  field = p + 1;
254  has_escape = false;
255  strip_quotes = false;
256 
257  if (is_eol(*p, copy_params)) {
258  // We are at the end of the row. Skip the line endings now.
259  if (filter_empty_lines) {
260  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
261  p++;
262  }
263  } else {
264  // skip DOS carriage return line feed only
265  if (p + 1 < buf_end && *p == '\r' && *(p + 1) == '\n') {
266  p++;
267  }
268  }
269  break;
270  }
271  }
272  }
273  }
274  /*
275  @TODO(wei) do error handling
276  */
277  if (in_quote) {
278  LOG(ERROR) << "Unmatched quote.";
279  try_single_thread = true;
280  }
281  if (in_array) {
282  LOG(ERROR) << "Unmatched array.";
283  try_single_thread = true;
284  }
285  return p;
286 }
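The core of the listing is a single pass that toggles a quote state and emits a field at each delimiter or line ending seen outside quotes. The sketch below shows that pass under stated assumptions: `parse_row` is a hypothetical name, the delimiter is fixed to `','`, the row terminator to `'\n'`, and `'"'` serves as both quote and escape character. Array columns, space trimming, and the temporary-buffer optimization are omitted. Unlike the listing, it returns the index just past the row terminator.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical simplification of get_row(). Parses one row starting at `pos`,
// appends its fields to `row`, and returns the index after the terminator.
// A trailing field with no terminator after it is not emitted.
std::size_t parse_row(const std::string& data,
                      std::size_t pos,
                      std::vector<std::string>& row,
                      bool& unmatched_quote) {
  assert(pos <= data.size());
  std::string field;
  bool in_quote = false;
  std::size_t i = pos;
  for (; i < data.size(); ++i) {
    const char c = data[i];
    if (c == '"' && in_quote && i + 1 < data.size() && data[i + 1] == '"') {
      field.push_back('"');  // doubled quote inside a quoted field
      ++i;
    } else if (c == '"') {
      in_quote = !in_quote;  // the quote characters themselves are dropped
    } else if (!in_quote && (c == ',' || c == '\n')) {
      row.push_back(field);
      field.clear();
      if (c == '\n') {
        ++i;  // step past the terminator before returning
        break;
      }
    } else {
      field.push_back(c);  // includes delimiters and newlines inside quotes
    }
  }
  unmatched_quote = in_quote;
  return i;
}
```

As in the listing, a field is only emitted when a delimiter or line ending follows it, which is why a caller parsing a bare fragment would append a line ending first. The `unmatched_quote` flag plays the role of `try_single_thread`: it signals that the row ended inside a quoted field.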

template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string_view > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
void import_export::delimited_parser::parse_string_array ( const std::string &  s,
const import_export::CopyParams &  copy_params,
std::vector< std::string > &  string_vec,
bool  truncate_values = false 
)

Parses given string array and inserts into given vector of strings.

Parameters
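s - Given string array
copy_params - Copy params for the table.
string_vec - Given vector to be populated with parsed fields.
truncate_values - If true, elements longer than StringDictionary::MAX_STRLEN are truncated to that length; otherwise such an element causes a std::runtime_error to be thrown.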

Definition at line 308 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_delim, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, get_row(), StringDictionary::MAX_STRLEN, import_export::CopyParams::null_str, and to_string().

Referenced by import_export::TypedImportBuffer::add_value(), import_export::TypedImportBuffer::addDefaultValues(), RowToColumnLoader::convert_string_to_column(), and foreign_storage::anonymous_namespace{LogFileBufferParser.cpp}::create_map_from_arrays().

311  {
312  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
313  return;
314  }
315  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
316  throw std::runtime_error("Malformed Array :" + s);
317  }
318 
319  std::string row(s.c_str() + 1, s.length() - 2);
320  if (row.empty()) { // allow empty arrays
321  return;
322  }
323  row.push_back('\n');
324 
325  bool try_single_thread = false;
326  import_export::CopyParams array_params = copy_params;
327  array_params.delimiter = copy_params.array_delim;
328  std::vector<std::unique_ptr<char[]>> tmp_buffers;
329  get_row(row.c_str(),
330  row.c_str() + row.length(),
331  row.c_str() + row.length(),
332  array_params,
333  nullptr,
334  string_vec,
335  tmp_buffers,
336  try_single_thread,
337  true);
338 
339  for (size_t i = 0; i < string_vec.size(); ++i) {
340  if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
341  if (truncate_values) {
342  string_vec[i] = string_vec[i].substr(0, StringDictionary::MAX_STRLEN);
343  } else {
344  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
345  std::to_string(StringDictionary::MAX_STRLEN));
346  }
347  }
348  }
349 
350  // use empty string to mark nulls
351  for (auto& value : string_vec) {
352  if (value == copy_params.null_str || value == "NULL" || value.empty()) {
353  value.clear();
354  }
355  }
356 }
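The overall shape of the listing (reject malformed input, strip the array markers, split on the array delimiter, map nulls to empty strings) can be sketched as below. The names and values are assumptions made for this example: `split_array_literal` is hypothetical, the markers are fixed to `'{'` and `'}'`, the delimiter to `','`, and the null string to `"NULL"`. Quoting and the length check against `StringDictionary::MAX_STRLEN` are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical simplification of parse_string_array() with fixed markers.
std::vector<std::string> split_array_literal(const std::string& s) {
  std::vector<std::string> out;
  if (s.empty() || s == "NULL") {
    return out;  // null array: no elements
  }
  if (s.front() != '{' || s.back() != '}') {
    throw std::runtime_error("Malformed Array :" + s);
  }
  assert(s.size() >= 2);  // holds because first and last characters differ
  const std::string body = s.substr(1, s.size() - 2);
  if (body.empty()) {
    return out;  // allow empty arrays
  }
  std::size_t start = 0;
  while (true) {
    const std::size_t comma = body.find(',', start);
    const std::size_t len =
        comma == std::string::npos ? std::string::npos : comma - start;
    std::string item = body.substr(start, len);
    if (item == "NULL") {
      item.clear();  // an empty string marks a null element
    }
    out.push_back(item);
    if (comma == std::string::npos) {
      break;
    }
    start = comma + 1;
  }
  return out;
}
```

With these assumptions, `"{a,NULL,c}"` yields three elements with an empty string in the middle, while `"{}"` and `"NULL"` both yield an empty vector. Note that, as in the listing, a null element and a genuinely empty element are indistinguishable after parsing.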

void import_export::delimited_parser::set_max_buffer_resize ( const size_t  max_buffer_resize_param)

Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing.

Definition at line 144 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

144  {
145  max_buffer_resize = max_buffer_resize_param;
146 }

Variable Documentation