OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
import_export::delimited_parser Namespace Reference

Classes

class  InsufficientBufferSizeException
 

Functions

size_t find_beginning (const char *buffer, size_t begin, size_t end, const CopyParams &copy_params)
 Finds the closest possible row beginning in the given buffer. More...
 
size_t find_end (const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
 
size_t get_max_buffer_resize ()
 Gets the maximum size to which thread buffers should be automatically resized. More...
 
void set_max_buffer_resize (const size_t max_buffer_resize)
 Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing. More...
 
size_t find_row_end_pos (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader=nullptr)
 Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached. More...
 
template<typename T >
const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 Parses the first row in the given buffer and inserts fields into given vector. More...
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string_view > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
 
void parse_string_array (const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
 Parses given string array and inserts into given vector of strings. More...
 

Variables

static size_t max_buffer_resize = 1024 * 1024 * 1024
 

Function Documentation

size_t import_export::delimited_parser::find_beginning ( const char *  buffer,
size_t  begin,
size_t  end,
const CopyParams &  copy_params 
)

Finds the closest possible row beginning in the given buffer.

Parameters
buffer: Given buffer which has the rows in csv format. (NOT OWN)
begin: Start index of buffer to look for the beginning.
end: End index of buffer to look for the beginning.
copy_params: Copy params for the table.
Returns
The position of the closest possible row beginning to the start of the given buffer.

Definition at line 61 of file DelimitedParserUtils.cpp.

References i, and import_export::CopyParams::line_delim.

Referenced by import_export::import_thread_delimited(), and foreign_storage::csv_file_buffer_parser::parse_buffer().

64  {
65  // @TODO(wei) line_delim in quotes is not supported
66  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
67  return 0;
68  }
69  size_t i;
70  const char* buf = buffer + begin;
71  for (i = 0; i < end - begin; i++) {
72  if (buf[i] == copy_params.line_delim) {
73  return i + 1;
74  }
75  }
76  return i;
77 }

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::find_end ( const char *  buffer,
size_t  size,
const import_export::CopyParams &  copy_params,
unsigned int &  num_rows_this_buffer,
size_t  buffer_first_row_index,
bool &  in_quote,
size_t  offset 
)

Definition at line 79 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::escape, import_export::CopyParams::line_delim, import_export::CopyParams::quote, import_export::CopyParams::quoted, and to_string().

Referenced by find_row_end_pos().

85  {
86  size_t last_line_delim_pos = 0;
87  const char* current = buffer + offset;
88  if (copy_params.quoted) {
89  while (current < buffer + size) {
90  while (!in_quote && current < buffer + size) {
91  // We are outside of quotes. We have to find the last possible line delimiter.
92  if (*current == copy_params.line_delim) {
93  last_line_delim_pos = current - buffer;
94  ++num_rows_this_buffer;
95  } else if (*current == copy_params.quote) {
96  in_quote = true;
97  }
98  ++current;
99  }
100 
101  while (in_quote && current < buffer + size) {
102  // We are in a quoted field. We have to find the ending quote.
103  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
104  (*(current + 1) == copy_params.quote)) {
105  ++current;
106  } else if (*current == copy_params.quote) {
107  in_quote = false;
108  }
109  ++current;
110  }
111  }
112  } else {
113  while (current < buffer + size) {
114  if (*current == copy_params.line_delim) {
115  last_line_delim_pos = current - buffer;
116  ++num_rows_this_buffer;
117  }
118  ++current;
119  }
120  }
121 
122  if (last_line_delim_pos <= 0) {
123  size_t excerpt_length = std::min<size_t>(50, size);
124  std::string buffer_excerpt{buffer, buffer + excerpt_length};
125  std::string error_message =
126  "Unable to find an end of line character after reading " + std::to_string(size) +
127  " characters. Please ensure that the correct \"line_delimiter\" option is "
128  "specified or update the \"buffer_size\" option appropriately. Row number: " +
129  std::to_string(buffer_first_row_index + 1) +
130  ". First few characters in row: " + buffer_excerpt;
131  throw InsufficientBufferSizeException{error_message};
132  }
133 
134  return last_line_delim_pos + 1;
135 }
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::find_row_end_pos ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FILE *  file,
foreign_storage::CsvReader *  csv_reader = nullptr 
)

Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached.

Parameters
alloc_size: Allocation size of subsequent buffer. This is adjusted as needed, if the buffer has to be resized.
buffer: Given buffer which has the rows in csv format.
buffer_size: Size of the buffer.
copy_params: Copy params for the table.
buffer_first_row_index: Index of first row in the buffer.
num_rows_in_buffer: Number of rows until the closest possible row ending.
file: Handle to CSV file being parsed. (optional)
csv_reader: Handle to a CsvReader instance; must be valid if file isn't.
Returns
The position of the closest possible row ending to the end of the given buffer.

Definition at line 147 of file DelimitedParserUtils.cpp.

References CHECK, find_end(), get_max_buffer_resize(), logger::INFO, foreign_storage::CsvReader::isScanFinished(), LOG, max_buffer_resize, and foreign_storage::CsvReader::read().

Referenced by foreign_storage::dispatch_metadata_scan_requests(), and import_export::Importer::importDelimited().

154  {
155  bool found_end_pos{false};
156  bool in_quote{false};
157  size_t offset{0};
158  size_t end_pos;
159  CHECK(file != nullptr || csv_reader != nullptr);
161  while (!found_end_pos) {
162  try {
163  end_pos = delimited_parser::find_end(buffer.get(),
164  buffer_size,
165  copy_params,
166  num_rows_in_buffer,
167  buffer_first_row_index,
168  in_quote,
169  offset);
170  found_end_pos = true;
171  } catch (InsufficientBufferSizeException& e) {
172  if (alloc_size >= max_buffer_resize) {
173  throw;
174  }
175  if (file == nullptr && csv_reader->isScanFinished()) {
176  throw;
177  }
178  auto old_buffer = std::move(buffer);
179  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
180  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
181  << " bytes";
182  buffer = std::make_unique<char[]>(alloc_size);
183 
184  memcpy(buffer.get(), old_buffer.get(), buffer_size);
185  size_t fread_size;
186  if (file != nullptr) {
187  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
188  } else {
189  fread_size =
190  csv_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
191  }
192  offset = buffer_size;
193  buffer_size += fread_size;
194  }
195  }
196  return end_pos;
197 }
#define LOG(tag)
Definition: Logger.h:194
virtual size_t read(void *buffer, size_t max_size)=0
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
virtual bool isScanFinished()=0
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t import_export::delimited_parser::get_max_buffer_resize ( )

Gets the maximum size to which thread buffers should be automatically resized.

Definition at line 139 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

Referenced by find_row_end_pos().

139  {
140  return max_buffer_resize;
141 }

+ Here is the caller graph for this function:

template<typename T >
const char * import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< T > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)

Parses the first row in the given buffer and inserts fields into given vector.

Parameters
buf: Given buffer which has the rows in csv format. (NOT OWN)
buf_end: End of the sliced buffer for the thread. (NOT OWN)
entire_buf_end: End of the entire buffer. (NOT OWN)
copy_params: Copy params for the table.
is_array: Array of bools which tells if a column is an array type.
row: Given vector to be populated with parsed fields.
try_single_thread: In case of parse errors, this will tell if parsing should continue with single thread.
filter_empty_lines: Whether to skip empty lines (used when parsing single columns returned by s3 select, as nulls may be encoded as empty lines)
Returns
Pointer to the next row after the first row is parsed.

Definition at line 200 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, logger::ERROR, import_export::CopyParams::escape, field(), i, anonymous_namespace{DelimitedParserUtils.cpp}::is_eol(), generate_TableFunctionsFactory_init::j, LOG, import_export::CopyParams::quote, import_export::CopyParams::quoted, anonymous_namespace{DelimitedParserUtils.cpp}::trim_quotes(), and import_export::trim_space().

Referenced by import_export::import_thread_delimited(), foreign_storage::csv_file_buffer_parser::parse_and_validate_expected_column_count(), foreign_storage::csv_file_buffer_parser::parse_buffer(), parse_string_array(), and import_export::Detector::split_raw_data().

208  {
209  const char* field = buf;
210  const char* p;
211  bool in_quote = false;
212  bool in_array = false;
213  bool has_escape = false;
214  bool strip_quotes = false;
215  try_single_thread = false;
216  for (p = buf; p < entire_buf_end; ++p) {
217  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
218  *(p + 1) == copy_params.quote) {
219  p++;
220  has_escape = true;
221  } else if (copy_params.quoted && *p == copy_params.quote) {
222  in_quote = !in_quote;
223  if (in_quote) {
224  strip_quotes = true;
225  }
226  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
227  is_array[row.size()]) {
228  in_array = true;
229  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
230  ++p;
231  if (*p == copy_params.array_end) {
232  in_array = false;
233  break;
234  }
235  }
236  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
237  if (!in_quote) {
238  if (!has_escape && !strip_quotes) {
239  const char* field_end = p;
240  trim_space(field, field_end);
241  row.emplace_back(field, field_end - field);
242  } else {
243  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
244  auto field_buf = tmp_buffers.back().get();
245  int j = 0, i = 0;
246  for (; i < p - field; i++, j++) {
247  if (has_escape && field[i] == copy_params.escape &&
248  field[i + 1] == copy_params.quote) {
249  field_buf[j] = copy_params.quote;
250  i++;
251  } else {
252  field_buf[j] = field[i];
253  }
254  }
255  const char* field_begin = field_buf;
256  const char* field_end = field_buf + j;
257  trim_space(field_begin, field_end);
258  trim_quotes(field_begin, field_end, copy_params);
259  row.emplace_back(field_begin, field_end - field_begin);
260  }
261  field = p + 1;
262  has_escape = false;
263  strip_quotes = false;
264 
265  if (is_eol(*p, copy_params)) {
266  // We are at the end of the row. Skip the line endings now.
267  if (filter_empty_lines) {
268  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
269  p++;
270  }
271  }
272  break;
273  }
274  }
275  }
276  }
277  /*
278  @TODO(wei) do error handling
279  */
280  if (in_quote) {
281  LOG(ERROR) << "Unmatched quote.";
282  try_single_thread = true;
283  }
284  if (in_array) {
285  LOG(ERROR) << "Unmatched array.";
286  try_single_thread = true;
287  }
288  return p;
289 }
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:194
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
void trim_space(const char *&field_begin, const char *&field_end)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string_view > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread,
bool  filter_empty_lines 
)
void import_export::delimited_parser::parse_string_array ( const std::string &  s,
const import_export::CopyParams &  copy_params,
std::vector< std::string > &  string_vec 
)

Parses given string array and inserts into given vector of strings.

Parameters
s: Given string array
copy_params: Copy params for the table.
string_vec: Given vector to be populated with parsed fields.

Definition at line 311 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_delim, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, get_row(), i, StringDictionary::MAX_STRLEN, import_export::CopyParams::null_str, and to_string().

Referenced by import_export::TypedImportBuffer::add_value(), and RowToColumnLoader::convert_string_to_column().

313  {
314  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
315  // TODO: should not convert NULL, empty arrays to {"NULL"},
316  // need to support NULL, empty properly
317  string_vec.emplace_back("NULL");
318  return;
319  }
320  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
321  throw std::runtime_error("Malformed Array :" + s);
322  }
323 
324  std::string row(s.c_str() + 1, s.length() - 2);
325  row.push_back('\n');
326  bool try_single_thread = false;
327  import_export::CopyParams array_params = copy_params;
328  array_params.delimiter = copy_params.array_delim;
329  std::vector<std::unique_ptr<char[]>> tmp_buffers;
330  get_row(row.c_str(),
331  row.c_str() + row.length(),
332  row.c_str() + row.length(),
333  array_params,
334  nullptr,
335  string_vec,
336  tmp_buffers,
337  try_single_thread,
338  true);
339 
340  for (size_t i = 0; i < string_vec.size(); ++i) {
341  if (string_vec[i].empty()) { // Disallow empty strings for now
342  string_vec.erase(string_vec.begin() + i);
343  --i;
344  } else if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
345  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
346  std::to_string(StringDictionary::MAX_STRLEN));
347  }
348  }
349 }
std::string to_string(char const *&&v)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::delimited_parser::set_max_buffer_resize ( const size_t  max_buffer_resize_param)

Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing.

Definition at line 143 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

143  {
144  max_buffer_resize = max_buffer_resize_param;
145 }

Variable Documentation

size_t import_export::delimited_parser::max_buffer_resize = 1024 * 1024 * 1024
static