OmniSciDB  2e3a973ef4
import_export::delimited_parser Namespace Reference

Classes

class  InsufficientBufferSizeException
 

Functions

size_t find_beginning (const char *buffer, size_t begin, size_t end, const CopyParams &copy_params)
 Finds the closest possible row beginning in the given buffer. More...
 
size_t find_end (const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
 
size_t get_max_buffer_resize ()
 Gets the maximum size to which thread buffers should be automatically resized. More...
 
void set_max_buffer_resize (const size_t max_buffer_resize)
 Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing. More...
 
size_t find_row_end_pos (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader=nullptr)
 Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached. More...
 
template<typename T >
const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
 Parses the first row in the given buffer and inserts fields into given vector. More...
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
 
template const char * get_row (const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string_view > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
 
void parse_string_array (const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
 Parses given string array and inserts into given vector of strings. More...
 

Variables

static size_t max_buffer_resize = 1024 * 1024 * 1024
 

Function Documentation

◆ find_beginning()

size_t import_export::delimited_parser::find_beginning ( const char *  buffer,
size_t  begin,
size_t  end,
const CopyParams &  copy_params 
)

Finds the closest possible row beginning in the given buffer.

Parameters
buffer: Given buffer which has the rows in CSV format. (Not owned.)
begin: Start index in the buffer from which to look for the row beginning.
end: End index in the buffer up to which to look for the row beginning.
copy_params: Copy params for the table.
Returns
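The offset, relative to begin, of the closest possible row beginning: 0 if begin is already at the start of a row, otherwise the offset just past the first line delimiter found (or end - begin if no line delimiter is found).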

Definition at line 61 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::line_delim.

Referenced by import_export::import_thread_delimited(), import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException(), and foreign_storage::csv_file_buffer_parser::parse_buffer().

64  {
65  // @TODO(wei) line_delim in quotes is not supported
66  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
67  return 0;
68  }
69  size_t i;
70  const char* buf = buffer + begin;
71  for (i = 0; i < end - begin; i++) {
72  if (buf[i] == copy_params.line_delim) {
73  return i + 1;
74  }
75  }
76  return i;
77 }
+ Here is the caller graph for this function:

◆ find_end()

size_t import_export::delimited_parser::find_end ( const char *  buffer,
size_t  size,
const import_export::CopyParams &  copy_params,
unsigned int &  num_rows_this_buffer,
size_t  buffer_first_row_index,
bool &  in_quote,
size_t  offset 
)

Definition at line 79 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::escape, import_export::CopyParams::line_delim, import_export::CopyParams::quote, import_export::CopyParams::quoted, and to_string().

Referenced by find_row_end_pos().

85  {
86  size_t last_line_delim_pos = 0;
87  const char* current = buffer + offset;
88  if (copy_params.quoted) {
89  while (current < buffer + size) {
90  while (!in_quote && current < buffer + size) {
91  // We are outside of quotes. We have to find the last possible line delimiter.
92  if (*current == copy_params.line_delim) {
93  last_line_delim_pos = current - buffer;
94  ++num_rows_this_buffer;
95  } else if (*current == copy_params.quote) {
96  in_quote = true;
97  }
98  ++current;
99  }
100 
101  while (in_quote && current < buffer + size) {
102  // We are in a quoted field. We have to find the ending quote.
103  if ((*current == copy_params.escape) && (current < buffer + size - 1) &&
104  (*(current + 1) == copy_params.quote)) {
105  ++current;
106  } else if (*current == copy_params.quote) {
107  in_quote = false;
108  }
109  ++current;
110  }
111  }
112  } else {
113  while (current < buffer + size) {
114  if (*current == copy_params.line_delim) {
115  last_line_delim_pos = current - buffer;
116  ++num_rows_this_buffer;
117  }
118  ++current;
119  }
120  }
121 
122  if (last_line_delim_pos <= 0) {
123  size_t excerpt_length = std::min<size_t>(50, size);
124  std::string buffer_excerpt{buffer, buffer + excerpt_length};
125  std::string error_message =
126  "Unable to find an end of line character after reading " + std::to_string(size) +
127  " characters. Please ensure that the correct \"line_delimiter\" option is "
128  "specified or update the \"buffer_size\" option appropriately. Row number: " +
129  std::to_string(buffer_first_row_index + 1) +
130  ". First few characters in row: " + buffer_excerpt;
131  throw InsufficientBufferSizeException{error_message};
132  }
133 
134  return last_line_delim_pos + 1;
135 }
std::string to_string(char const *&&v)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ find_row_end_pos()

size_t import_export::delimited_parser::find_row_end_pos ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FILE *  file,
foreign_storage::CsvReader *  csv_reader = nullptr 
)

Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed, with more content read from the file, until an end of row is found or a configured max buffer limit is reached.

Parameters
alloc_size: Allocation size of subsequent buffer. This is adjusted as needed if the buffer has to be resized.
buffer: Given buffer which has the rows in CSV format.
buffer_size: Size of the buffer.
copy_params: Copy params for the table.
buffer_first_row_index: Index of the first row in the buffer.
num_rows_in_buffer: Number of rows until the closest possible row ending.
file: Handle to the CSV file being parsed. (Optional.)
csv_reader: Handle to a CsvReader instance; must be valid if file is not provided.
Returns
The position of the closest possible row ending to the end of the given buffer.

Definition at line 147 of file DelimitedParserUtils.cpp.

References CHECK, find_end(), get_max_buffer_resize(), logger::INFO, foreign_storage::CsvReader::isScanFinished(), LOG, and foreign_storage::CsvReader::read().

Referenced by foreign_storage::dispatch_metadata_scan_requests(), import_export::Importer::importDelimited(), and import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException().

154  {
155  bool found_end_pos{false};
156  bool in_quote{false};
157  size_t offset{0};
158  size_t end_pos;
159  CHECK(file != nullptr || csv_reader != nullptr);
161  while (!found_end_pos) {
162  try {
163  end_pos = delimited_parser::find_end(buffer.get(),
164  buffer_size,
165  copy_params,
166  num_rows_in_buffer,
167  buffer_first_row_index,
168  in_quote,
169  offset);
170  found_end_pos = true;
171  } catch (InsufficientBufferSizeException& e) {
172  if (alloc_size >= max_buffer_resize) {
173  throw;
174  }
175  if (file == nullptr && csv_reader->isScanFinished()) {
176  throw;
177  }
178  auto old_buffer = std::move(buffer);
179  alloc_size = std::min(max_buffer_resize, alloc_size * 2);
180  LOG(INFO) << "Setting import thread buffer allocation size to " << alloc_size
181  << " bytes";
182  buffer = std::make_unique<char[]>(alloc_size);
183 
184  memcpy(buffer.get(), old_buffer.get(), buffer_size);
185  size_t fread_size;
186  if (file != nullptr) {
187  fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
188  } else {
189  fread_size =
190  csv_reader->read(buffer.get() + buffer_size, alloc_size - buffer_size);
191  }
192  offset = buffer_size;
193  buffer_size += fread_size;
194  }
195  }
196  return end_pos;
197 }
#define LOG(tag)
Definition: Logger.h:188
virtual size_t read(void *buffer, size_t max_size)=0
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
virtual bool isScanFinished()=0
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ get_max_buffer_resize()

size_t import_export::delimited_parser::get_max_buffer_resize ( )

Gets the maximum size to which thread buffers should be automatically resized.

Definition at line 139 of file DelimitedParserUtils.cpp.

References max_buffer_resize.

Referenced by find_row_end_pos(), and import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException().

139  {
140  return max_buffer_resize;
141 }
+ Here is the caller graph for this function:

◆ get_row() [1/3]

template<typename T >
const char * import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< T > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread 
)

Parses the first row in the given buffer and inserts fields into given vector.

Parameters
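buf: Given buffer which has the rows in CSV format. (Not owned.)
buf_end: End of the sliced buffer for the thread. (Not owned.)
entire_buf_end: End of the entire buffer. (Not owned.)
copy_params: Copy params for the table.
is_array: Array of bools which tells if a column is an array type.
row: Given vector to be populated with parsed fields.
tmp_buffers: Vector that receives the temporary buffers allocated for fields that contain escaped quotes or are quoted; such fields are copied into a new buffer before being added to row.
try_single_thread: In case of parse errors, this will tell if parsing should continue with a single thread.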
Returns
Pointer to the next row after the first row is parsed.

Definition at line 200 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, logger::ERROR, import_export::CopyParams::escape, field(), anonymous_namespace{DelimitedParserUtils.cpp}::is_eol(), LOG, import_export::CopyParams::quote, import_export::CopyParams::quoted, anonymous_namespace{DelimitedParserUtils.cpp}::trim_quotes(), and import_export::trim_space().

Referenced by import_export::import_thread_delimited(), import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException(), foreign_storage::csv_file_buffer_parser::parse_buffer(), parse_string_array(), and import_export::Detector::split_raw_data().

207  {
208  const char* field = buf;
209  const char* p;
210  bool in_quote = false;
211  bool in_array = false;
212  bool has_escape = false;
213  bool strip_quotes = false;
214  try_single_thread = false;
215  for (p = buf; p < entire_buf_end; ++p) {
216  if (*p == copy_params.escape && p < entire_buf_end - 1 &&
217  *(p + 1) == copy_params.quote) {
218  p++;
219  has_escape = true;
220  } else if (copy_params.quoted && *p == copy_params.quote) {
221  in_quote = !in_quote;
222  if (in_quote) {
223  strip_quotes = true;
224  }
225  } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
226  is_array[row.size()]) {
227  in_array = true;
228  while (p < entire_buf_end - 1) { // Array type will be parsed separately.
229  ++p;
230  if (*p == copy_params.array_end) {
231  in_array = false;
232  break;
233  }
234  }
235  } else if (*p == copy_params.delimiter || is_eol(*p, copy_params)) {
236  if (!in_quote) {
237  if (!has_escape && !strip_quotes) {
238  const char* field_end = p;
239  trim_space(field, field_end);
240  row.emplace_back(field, field_end - field);
241  } else {
242  tmp_buffers.emplace_back(std::make_unique<char[]>(p - field + 1));
243  auto field_buf = tmp_buffers.back().get();
244  int j = 0, i = 0;
245  for (; i < p - field; i++, j++) {
246  if (has_escape && field[i] == copy_params.escape &&
247  field[i + 1] == copy_params.quote) {
248  field_buf[j] = copy_params.quote;
249  i++;
250  } else {
251  field_buf[j] = field[i];
252  }
253  }
254  const char* field_begin = field_buf;
255  const char* field_end = field_buf + j;
256  trim_space(field_begin, field_end);
257  trim_quotes(field_begin, field_end, copy_params);
258  row.emplace_back(field_begin, field_end - field_begin);
259  }
260  field = p + 1;
261  has_escape = false;
262  strip_quotes = false;
263 
264  if (is_eol(*p, copy_params)) {
265  // We are at the end of the row. Skip the line endings now.
266  while (p + 1 < buf_end && is_eol(*(p + 1), copy_params)) {
267  p++;
268  }
269  break;
270  }
271  }
272  }
273  }
274  /*
275  @TODO(wei) do error handling
276  */
277  if (in_quote) {
278  LOG(ERROR) << "Unmatched quote.";
279  try_single_thread = true;
280  }
281  if (in_array) {
282  LOG(ERROR) << "Unmatched array.";
283  try_single_thread = true;
284  }
285  return p;
286 }
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:188
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
void trim_space(const char *&field_begin, const char *&field_end)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ get_row() [2/3]

template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread 
)

◆ get_row() [3/3]

template const char* import_export::delimited_parser::get_row ( const char *  buf,
const char *  buf_end,
const char *  entire_buf_end,
const import_export::CopyParams &  copy_params,
const bool *  is_array,
std::vector< std::string_view > &  row,
std::vector< std::unique_ptr< char[]>> &  tmp_buffers,
bool &  try_single_thread 
)

◆ parse_string_array()

void import_export::delimited_parser::parse_string_array ( const std::string &  s,
const import_export::CopyParams &  copy_params,
std::vector< std::string > &  string_vec 
)

Parses given string array and inserts into given vector of strings.

Parameters
s: Given string array.
copy_params: Copy params for the table.
string_vec: Given vector to be populated with parsed fields.

Definition at line 306 of file DelimitedParserUtils.cpp.

References import_export::CopyParams::array_begin, import_export::CopyParams::array_delim, import_export::CopyParams::array_end, import_export::CopyParams::delimiter, get_row(), StringDictionary::MAX_STRLEN, import_export::CopyParams::null_str, and to_string().

Referenced by import_export::TypedImportBuffer::add_value(), RowToColumnLoader::convert_string_to_column(), and import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException().

308  {
309  if (s == copy_params.null_str || s == "NULL" || s.size() < 1 || s.empty()) {
310  // TODO: should not convert NULL, empty arrays to {"NULL"},
311  // need to support NULL, empty properly
312  string_vec.emplace_back("NULL");
313  return;
314  }
315  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
316  throw std::runtime_error("Malformed Array :" + s);
317  }
318 
319  std::string row(s.c_str() + 1, s.length() - 2);
320  row.push_back('\n');
321  bool try_single_thread = false;
322  import_export::CopyParams array_params = copy_params;
323  array_params.delimiter = copy_params.array_delim;
324  std::vector<std::unique_ptr<char[]>> tmp_buffers;
325  get_row(row.c_str(),
326  row.c_str() + row.length(),
327  row.c_str() + row.length(),
328  array_params,
329  nullptr,
330  string_vec,
331  tmp_buffers,
332  try_single_thread);
333 
334  for (size_t i = 0; i < string_vec.size(); ++i) {
335  if (string_vec[i].empty()) { // Disallow empty strings for now
336  string_vec.erase(string_vec.begin() + i);
337  --i;
338  } else if (string_vec[i].size() > StringDictionary::MAX_STRLEN) {
339  throw std::runtime_error("Array String too long : " + string_vec[i] + " max is " +
340  std::to_string(StringDictionary::MAX_STRLEN));
341  }
342  }
343 }
std::string to_string(char const *&&v)
template const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< std::string_view > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
static constexpr size_t MAX_STRLEN
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_max_buffer_resize()

void import_export::delimited_parser::set_max_buffer_resize ( const size_t  max_buffer_resize_param)

Sets the maximum size to which thread buffers should be automatically resized. This function is only used for testing.

Definition at line 143 of file DelimitedParserUtils.cpp.

Referenced by import_export::delimited_parser::InsufficientBufferSizeException::InsufficientBufferSizeException().

143  {
144  max_buffer_resize = max_buffer_resize_param;
145 }
+ Here is the caller graph for this function:

Variable Documentation

◆ max_buffer_resize

size_t import_export::delimited_parser::max_buffer_resize = 1024 * 1024 * 1024
static