25 #include <string_view>
33 return c == copy_params.
line_delim || c ==
'\n' || c ==
'\r';
36 inline void trim_space(
const char*& field_begin,
const char*& field_end) {
37 while (field_begin < field_end && (*field_begin ==
' ' || *field_begin ==
'\r')) {
40 while (field_begin < field_end &&
41 (*(field_end - 1) ==
' ' || *(field_end - 1) ==
'\r')) {
47 const char*& field_end,
49 if (copy_params.
quoted && field_end - field_begin > 0 &&
50 *field_begin == copy_params.
quote) {
53 if (copy_params.
quoted && field_end - field_begin > 0 &&
54 *(field_end - 1) == copy_params.
quote) {
60 namespace import_export {
61 namespace delimited_parser {
67 if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.
line_delim)) {
71 const char* buf = buffer + begin;
72 for (i = 0; i < end - begin; i++) {
83 unsigned int& num_rows_this_buffer,
84 size_t buffer_first_row_index,
87 size_t last_line_delim_pos = 0;
88 const char* current = buffer + offset;
90 while (current < buffer + size) {
91 while (!in_quote && current < buffer + size) {
94 last_line_delim_pos = current - buffer;
95 ++num_rows_this_buffer;
96 }
else if (*current == copy_params.
quote) {
102 while (in_quote && current < buffer + size) {
104 if ((*current == copy_params.
escape) && (current < buffer + size - 1) &&
105 (*(current + 1) == copy_params.
quote)) {
107 }
else if (*current == copy_params.
quote) {
114 while (current < buffer + size) {
116 last_line_delim_pos = current - buffer;
117 ++num_rows_this_buffer;
123 if (last_line_delim_pos <= 0) {
124 size_t excerpt_length = std::min<size_t>(50, size);
125 std::string buffer_excerpt{buffer, buffer + excerpt_length};
126 std::string error_message =
127 "Unable to find an end of line character after reading " +
std::to_string(size) +
128 " characters. Please ensure that the correct \"line_delimiter\" option is "
129 "specified or update the \"buffer_size\" option appropriately. Row number: " +
131 ". First few characters in row: " + buffer_excerpt;
135 return last_line_delim_pos + 1;
149 std::unique_ptr<
char[]>& buffer,
152 const size_t buffer_first_row_index,
153 unsigned int& num_rows_in_buffer,
156 bool found_end_pos{
false};
157 bool in_quote{
false};
160 CHECK(file !=
nullptr || file_reader !=
nullptr);
162 while (!found_end_pos) {
168 buffer_first_row_index,
171 found_end_pos =
true;
179 offset = buffer_size;
187 template <
typename T>
190 const char* entire_buf_end,
192 const bool* is_array,
194 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
195 bool& try_single_thread,
196 bool filter_empty_lines) {
197 const char*
field = buf;
199 bool in_quote =
false;
200 bool in_array =
false;
201 bool has_escape =
false;
202 bool strip_quotes =
false;
203 try_single_thread =
false;
204 for (p = buf; p < entire_buf_end; ++p) {
205 if (*p == copy_params.
escape && p < entire_buf_end - 1 &&
206 *(p + 1) == copy_params.
quote) {
209 }
else if (copy_params.
quoted && *p == copy_params.
quote) {
210 in_quote = !in_quote;
214 }
else if (!in_quote && is_array !=
nullptr && *p == copy_params.
array_begin &&
215 is_array[row.size()]) {
217 while (p < entire_buf_end - 1) {
226 if (!has_escape && !strip_quotes) {
227 const char* field_end = p;
231 row.emplace_back(field, field_end - field);
233 tmp_buffers.emplace_back(std::make_unique<
char[]>(p - field + 1));
234 auto field_buf = tmp_buffers.back().get();
236 for (; i < p -
field; i++, j++) {
237 if (has_escape && field[i] == copy_params.
escape &&
238 field[i + 1] == copy_params.
quote) {
239 field_buf[j] = copy_params.
quote;
242 field_buf[j] = field[i];
245 const char* field_begin = field_buf;
246 const char* field_end = field_buf + j;
251 row.emplace_back(field_begin, field_end - field_begin);
255 strip_quotes =
false;
257 if (
is_eol(*p, copy_params)) {
259 if (filter_empty_lines) {
260 while (p + 1 < buf_end &&
is_eol(*(p + 1), copy_params)) {
265 if (p + 1 < buf_end && *p ==
'\r' && *(p + 1) ==
'\n') {
279 try_single_thread =
true;
283 try_single_thread =
true;
288 template const char*
get_row(
const char* buf,
290 const char* entire_buf_end,
292 const bool* is_array,
293 std::vector<std::string>& row,
294 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
295 bool& try_single_thread,
296 bool filter_empty_lines);
298 template const char*
get_row(
const char* buf,
300 const char* entire_buf_end,
302 const bool* is_array,
303 std::vector<std::string_view>& row,
304 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
305 bool& try_single_thread,
306 bool filter_empty_lines);
310 std::vector<std::string>& string_vec,
311 bool truncate_values) {
312 if (s == copy_params.
null_str || s ==
"NULL" || s.size() < 1 || s.empty()) {
316 throw std::runtime_error(
"Malformed Array :" + s);
319 std::string row(s.c_str() + 1, s.length() - 2);
325 bool try_single_thread =
false;
328 std::vector<std::unique_ptr<char[]>> tmp_buffers;
330 row.c_str() + row.length(),
331 row.c_str() + row.length(),
339 for (
size_t i = 0; i < string_vec.size(); ++i) {
341 if (truncate_values) {
344 throw std::runtime_error(
"Array String too long : " + string_vec[i] +
" max is " +
351 for (
auto& value : string_vec) {
352 if (value == copy_params.
null_str || value ==
"NULL" || value.empty()) {
364 auto old_buffer = std::move(buffer);
365 alloc_size = std::min(max_buffer_resize, alloc_size * 2);
366 LOG(
INFO) <<
"Setting import thread buffer allocation size to " << alloc_size
368 buffer = std::make_unique<char[]>(alloc_size);
370 memcpy(buffer.get(), old_buffer.get(), buffer_size);
372 CHECK(file !=
nullptr || file_reader !=
nullptr);
373 if (file !=
nullptr) {
374 fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
376 fread_size = file_reader->
read(buffer.get() + buffer_size, alloc_size - buffer_size);
378 buffer_size += fread_size;
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
virtual bool isScanFinished() const =0
virtual size_t read(void *buffer, size_t max_size)=0
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
void trim_space(const char *&field_begin, const char *&field_end)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
static constexpr size_t MAX_STRLEN
static size_t max_buffer_resize
static constexpr size_t max_import_buffer_resize_byte_size
static const std::string trim_space(const char *field, const size_t len)