23 namespace foreign_storage {
33 while (i >= static_cast<int64_t>(start)) {
34 if (buffer[i] == line_delim) {
41 "Unable to find an end of line character after reading " +
48 const boost::regex& line_start_regex) {
49 return boost::regex_search(std::string{buffer + start, end - start + 1},
51 boost::regex_constants::match_continuous);
57 if (it != foreign_table->
options.end()) {
74 const char* buffer_end,
76 const std::optional<boost::regex>& line_start_regex) {
78 bool row_found{
false};
79 while (!row_found && row_end <= buffer_end) {
80 if (*row_end == line_delim) {
81 if (row_end == buffer_end) {
83 }
else if (line_start_regex.has_value()) {
87 <<
"'" << line_start_regex.value() <<
"' not found in: '"
88 << std::string{curr, row_end - curr + 1ULL} <<
"'";
89 auto row_str =
get_next_row(row_end + 1, buffer_end, line_delim, {});
91 row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
92 row_end += row_str.length() + 1;
93 if (row_end == buffer_end) {
96 row_str =
get_next_row(row_end + 1, buffer_end, line_delim, {});
106 return std::string{curr,
static_cast<size_t>(row_end - curr - 1)};
113 const std::optional<boost::regex>& line_start_regex) {
115 auto buffer_end = buffer + end;
116 auto curr = buffer + start;
117 while (curr <= buffer_end) {
118 auto row_str =
get_next_row(curr, buffer_end, line_delim, line_start_regex);
119 curr += row_str.length() + 1;
126 const boost::regex& line_regex,
127 size_t logical_column_count,
128 std::vector<std::string>& parsed_columns_str,
129 std::vector<std::string_view>& parsed_columns_sv,
130 const std::string& file_path) {
131 parsed_columns_str.clear();
132 parsed_columns_sv.clear();
134 bool set_all_nulls{
false};
135 if (boost::regex_match(row_str, match, line_regex)) {
136 auto matched_column_count = match.size() - 1;
137 if (logical_column_count != matched_column_count) {
139 logical_column_count, matched_column_count, file_path);
141 CHECK_GT(match.size(),
static_cast<size_t>(1));
142 for (
size_t i = 1; i < match.size(); i++) {
143 parsed_columns_str.emplace_back(match[i].str());
144 parsed_columns_sv.emplace_back(parsed_columns_str.back());
148 std::vector<std::string_view>(logical_column_count, std::string_view{});
149 set_all_nulls =
true;
151 return set_all_nulls;
155 const std::string& option_name) {
156 if (
auto it = foreign_table->
options.find(option_name);
157 it != foreign_table->
options.end()) {
158 if (boost::iequals(it->second,
"TRUE")) {
160 }
else if (boost::iequals(it->second,
"FALSE")) {
163 throw std::runtime_error{
"Invalid boolean value specified for \"" + option_name +
164 "\" foreign table option. "
165 "Value must be either 'true' or 'false'."};
182 bool convert_data_blocks,
183 bool columns_are_pre_filtered)
const {
186 const char* buffer_end = request.
buffer.get() + request.
end_pos;
188 std::vector<size_t> row_offsets;
191 size_t current_row_id = 0;
192 size_t row_count = 0;
194 std::vector<std::string> parsed_columns_str;
195 parsed_columns_str.reserve(logical_column_count);
196 std::vector<std::string_view> parsed_columns_sv;
197 parsed_columns_sv.reserve(logical_column_count);
203 auto curr = buffer_start;
204 while (curr < buffer_end && remaining_row_count > 0) {
208 curr += row_str.length() + 1;
209 current_row_id = row_count++;
210 remaining_row_count--;
212 bool skip_all_columns =
215 [](
const auto& import_buffer) {
return !import_buffer; });
216 if (!skip_all_columns) {
219 bool set_all_nulls =
false;
223 logical_column_count,
229 result.rejected_rows.insert(current_row_id);
230 auto cd_it = columns.begin();
238 size_t parsed_column_index = 0;
239 size_t import_buffer_index = 0;
241 for (
auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
243 const auto& column_type = cd->columnType;
248 (set_all_nulls ||
isNullDatum(parsed_columns_sv[parsed_column_index],
251 }
catch (
const std::exception& e) {
253 result.rejected_rows.insert(current_row_id);
255 columns, cd_it, import_buffer_index, request);
261 if (column_type.is_geometry()) {
262 auto starting_import_buffer_index = import_buffer_index;
275 }
catch (
const std::exception& e) {
277 result.rejected_rows.insert(current_row_id);
279 columns, cd_it, starting_import_buffer_index, request);
286 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
293 parsed_columns_sv[parsed_column_index],
296 }
catch (
const std::exception& e) {
298 result.rejected_rows.insert(current_row_id);
300 columns, cd_it, import_buffer_index, request);
306 parsed_column_index++;
307 import_buffer_index++;
311 for (
int i = 0; i < column_type.get_physical_cols(); i++) {
312 import_buffer_index++;
315 parsed_column_index++;
316 import_buffer_index++;
322 }
catch (
const std::exception& e) {
324 "\" in row \"" + row_str +
"\" in file \"" +
330 result.row_offsets = row_offsets;
331 result.row_count = row_count;
332 if (convert_data_blocks) {
333 result.column_id_to_data_blocks_map =
344 if (has_header.has_value()) {
345 if (has_header.value()) {
352 it != foreign_table->
options.end()) {
353 copy_params.buffer_size = std::stoi(it->second);
356 it != foreign_table->
options.end()) {
357 copy_params.threads = std::stoi(it->second);
364 std::unique_ptr<
char[]>& buffer,
367 const size_t buffer_first_row_index,
368 unsigned int& num_rows_in_buffer,
370 CHECK_GT(buffer_size, static_cast<size_t>(0));
372 size_t end_pos = buffer_size - 1;
373 bool found_end_pos{
false};
374 while (!found_end_pos) {
377 buffer.get(), buffer_size, start_pos, end_pos, copy_params.
line_delim);
380 found_end_pos =
true;
387 CHECK_GT(end_pos, static_cast<size_t>(0));
388 auto old_end_pos = end_pos;
396 old_end_pos = end_pos;
403 found_end_pos =
true;
405 found_end_pos =
true;
407 }
catch (InsufficientBufferSizeException& e) {
412 start_pos = buffer_size;
415 end_pos = buffer_size - 1;
418 CHECK(found_end_pos);
430 for (
const auto& [file_path,
line] : first_line_by_file_path) {
434 CHECK(line_start_regex.has_value());
436 "\" does not match line start regex \"" +
437 line_start_regex.value() +
"\""};
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string BUFFER_SIZE_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static const std::string LINE_REGEX_KEY
static const std::string HEADER_KEY
RegexFileBufferParser(const ForeignTable *foreign_table)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
const bool track_rejected_rows
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
static size_t max_buffer_resize_
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
bool regex_match_columns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path)
virtual bool isScanFinished()=0
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
std::optional< boost::regex > line_start_regex_
static const std::string THREADS_KEY
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
static size_t getMaxBufferResize()
static const std::string LINE_START_REGEX_KEY
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex)
static size_t max_buffer_resize
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
std::string getFilePath() const
std::unique_ptr< char[]> buffer
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override