25 #include <string_view>
32 return c == copy_params.
line_delim || c ==
'\n' || c ==
'\r';
35 inline void trim_space(
const char*& field_begin,
const char*& field_end) {
36 while (field_begin < field_end && (*field_begin ==
' ' || *field_begin ==
'\r')) {
39 while (field_begin < field_end &&
40 (*(field_end - 1) ==
' ' || *(field_end - 1) ==
'\r')) {
46 const char*& field_end,
48 if (copy_params.
quoted && field_end - field_begin > 0 &&
49 *field_begin == copy_params.
quote) {
52 if (copy_params.
quoted && field_end - field_begin > 0 &&
53 *(field_end - 1) == copy_params.
quote) {
59 namespace import_export {
60 namespace delimited_parser {
66 if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.
line_delim)) {
70 const char* buf = buffer + begin;
71 for (i = 0; i < end - begin; i++) {
82 unsigned int& num_rows_this_buffer,
83 size_t buffer_first_row_index,
86 size_t last_line_delim_pos = 0;
87 const char* current = buffer + offset;
89 while (current < buffer + size) {
90 while (!in_quote && current < buffer + size) {
93 last_line_delim_pos = current - buffer;
94 ++num_rows_this_buffer;
95 }
else if (*current == copy_params.
quote) {
101 while (in_quote && current < buffer + size) {
103 if ((*current == copy_params.
escape) && (current < buffer + size - 1) &&
104 (*(current + 1) == copy_params.
quote)) {
106 }
else if (*current == copy_params.
quote) {
113 while (current < buffer + size) {
115 last_line_delim_pos = current - buffer;
116 ++num_rows_this_buffer;
122 if (last_line_delim_pos <= 0) {
123 size_t excerpt_length = std::min<size_t>(50, size);
124 std::string buffer_excerpt{buffer, buffer + excerpt_length};
125 std::string error_message =
126 "Unable to find an end of line character after reading " +
std::to_string(size) +
127 " characters. Please ensure that the correct \"line_delimiter\" option is "
128 "specified or update the \"buffer_size\" option appropriately. Row number: " +
130 ". First few characters in row: " + buffer_excerpt;
134 return last_line_delim_pos + 1;
148 std::unique_ptr<
char[]>& buffer,
151 const size_t buffer_first_row_index,
152 unsigned int& num_rows_in_buffer,
155 bool found_end_pos{
false};
156 bool in_quote{
false};
159 CHECK(file !=
nullptr || csv_reader !=
nullptr);
161 while (!found_end_pos) {
167 buffer_first_row_index,
170 found_end_pos =
true;
178 auto old_buffer = std::move(buffer);
180 LOG(
INFO) <<
"Setting import thread buffer allocation size to " << alloc_size
182 buffer = std::make_unique<char[]>(alloc_size);
184 memcpy(buffer.get(), old_buffer.get(), buffer_size);
186 if (file !=
nullptr) {
187 fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
190 csv_reader->
read(buffer.get() + buffer_size, alloc_size - buffer_size);
192 offset = buffer_size;
193 buffer_size += fread_size;
199 template <
typename T>
202 const char* entire_buf_end,
204 const bool* is_array,
206 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
207 bool& try_single_thread) {
208 const char*
field = buf;
210 bool in_quote =
false;
211 bool in_array =
false;
212 bool has_escape =
false;
213 bool strip_quotes =
false;
214 try_single_thread =
false;
215 for (p = buf; p < entire_buf_end; ++p) {
216 if (*p == copy_params.
escape && p < entire_buf_end - 1 &&
217 *(p + 1) == copy_params.
quote) {
220 }
else if (copy_params.
quoted && *p == copy_params.
quote) {
221 in_quote = !in_quote;
225 }
else if (!in_quote && is_array !=
nullptr && *p == copy_params.
array_begin &&
226 is_array[row.size()]) {
228 while (p < entire_buf_end - 1) {
237 if (!has_escape && !strip_quotes) {
238 const char* field_end = p;
240 row.emplace_back(field, field_end - field);
242 tmp_buffers.emplace_back(std::make_unique<
char[]>(p - field + 1));
243 auto field_buf = tmp_buffers.back().get();
245 for (; i < p -
field; i++, j++) {
246 if (has_escape && field[i] == copy_params.
escape &&
247 field[i + 1] == copy_params.
quote) {
248 field_buf[j] = copy_params.
quote;
251 field_buf[j] = field[i];
254 const char* field_begin = field_buf;
255 const char* field_end = field_buf + j;
258 row.emplace_back(field_begin, field_end - field_begin);
262 strip_quotes =
false;
264 if (
is_eol(*p, copy_params)) {
266 while (p + 1 < buf_end &&
is_eol(*(p + 1), copy_params)) {
279 try_single_thread =
true;
283 try_single_thread =
true;
288 template const char*
get_row(
const char* buf,
290 const char* entire_buf_end,
292 const bool* is_array,
293 std::vector<std::string>& row,
294 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
295 bool& try_single_thread);
297 template const char*
get_row(
const char* buf,
299 const char* entire_buf_end,
301 const bool* is_array,
302 std::vector<std::string_view>& row,
303 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
304 bool& try_single_thread);
308 std::vector<std::string>& string_vec) {
309 if (s == copy_params.
null_str || s ==
"NULL" || s.size() < 1 || s.empty()) {
312 string_vec.emplace_back(
"NULL");
316 throw std::runtime_error(
"Malformed Array :" + s);
319 std::string row(s.c_str() + 1, s.length() - 2);
321 bool try_single_thread =
false;
324 std::vector<std::unique_ptr<char[]>> tmp_buffers;
326 row.c_str() + row.length(),
327 row.c_str() + row.length(),
334 for (
size_t i = 0; i < string_vec.size(); ++i) {
335 if (string_vec[i].empty()) {
336 string_vec.erase(string_vec.begin() + i);
339 throw std::runtime_error(
"Array String too long : " + string_vec[i] +
" max is " +
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
virtual size_t read(void *buffer, size_t max_size)=0
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void trim_space(const char *&field_begin, const char *&field_end)
virtual bool isScanFinished()=0
static constexpr size_t MAX_STRLEN
static size_t max_buffer_resize
static const std::string trim_space(const char *field, const size_t len)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts fields into given vector.
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.