24 namespace foreign_storage {
32 std::unique_ptr<
bool[]>& array_flags,
35 const std::list<const ColumnDescriptor*>& columns) {
36 array_flags = std::unique_ptr<bool[]>(
new bool[columns.size()]);
38 for (
const auto cd : columns) {
39 const auto& col_ti = cd->columnType;
40 phys_cols += col_ti.get_physical_cols();
41 if (cd->columnType.get_type() ==
kPOINT) {
45 if (cd->columnType.get_type() ==
kARRAY) {
46 array_flags.get()[i] =
true;
48 array_flags.get()[i] =
false;
57 const std::string& file_name) {
59 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
65 const std::string& option_name) {
66 if (
auto it = foreign_table->
options.find(option_name);
67 it != foreign_table->
options.end()) {
68 if (it->second.length() == 1) {
71 if (it->second == std::string(
"\\n")) {
73 }
else if (it->second == std::string(
"\\t")) {
76 throw std::runtime_error{
"Invalid value specified for option \"" + option_name +
77 "\". Expected a single character, \"\\n\" or \"\\t\"."};
85 const std::string& option_name,
86 const size_t expected_num_chars) {
87 if (
auto it = foreign_table->
options.find(option_name);
88 it != foreign_table->
options.end()) {
89 if (it->second.length() != expected_num_chars) {
90 throw std::runtime_error{
"Value of \"" + option_name +
91 "\" foreign table option has the wrong number of "
92 "characters. Expected " +
101 const std::string& option_name) {
102 if (
auto it = foreign_table->
options.find(option_name);
103 it != foreign_table->
options.end()) {
104 if (boost::iequals(it->second,
"TRUE")) {
106 }
else if (boost::iequals(it->second,
"FALSE")) {
109 throw std::runtime_error{
"Invalid boolean value specified for \"" + option_name +
110 "\" foreign table option. "
111 "Value must be either 'true' or 'false'."};
123 bool convert_data_blocks,
124 bool columns_are_pre_filtered,
125 bool skip_dict_encoding)
const {
129 const char* thread_buf = request.
buffer.get() + request.
begin_pos + begin;
130 const char* thread_buf_end = request.
buffer.get() + request.
end_pos;
133 std::vector<std::string_view> row;
134 size_t row_index_plus_one = 0;
135 const char* p = thread_buf;
136 bool try_single_thread =
false;
139 std::unique_ptr<bool[]> array_flags;
146 auto num_cols = request.
getColumns().size() - phys_cols;
147 if (columns_are_pre_filtered) {
148 for (
size_t col_idx = 0; col_idx < request.
getColumns().size(); ++col_idx) {
155 size_t current_row_id = 0;
156 size_t row_count = 0;
158 std::vector<size_t> row_offsets{};
164 for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
166 current_row_id = row_count;
168 std::vector<std::unique_ptr<char[]>>
170 const char* line_start = p;
179 !columns_are_pre_filtered);
181 row_index_plus_one++;
183 bool incorrect_column_count =
false;
188 result.rejected_rows.insert(current_row_id);
189 incorrect_column_count =
true;
195 size_t import_idx = 0;
200 if (incorrect_column_count) {
201 auto cd_it = columns.begin();
205 for (
auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
207 const auto& col_ti = cd->columnType;
208 bool column_is_present =
210 CHECK(row.size() > import_idx || !column_is_present);
213 is_null = column_is_present
216 }
catch (
const std::exception& e) {
218 result.rejected_rows.insert(current_row_id);
227 row[import_idx] =
sv_strip(row[import_idx]);
229 if (col_ti.is_geometry()) {
231 auto starting_col_idx = col_idx;
244 }
catch (
const std::exception& e) {
246 result.rejected_rows.insert(current_row_id);
255 if (!is_null && cd->columnType ==
kPOINT &&
257 if (!columns_are_pre_filtered) {
261 if (!columns_are_pre_filtered) {
265 col_idx += col_ti.get_physical_cols();
268 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
275 cd, row[import_idx], is_null, request.
copy_params);
276 }
catch (
const std::exception& e) {
278 result.rejected_rows.insert(current_row_id);
286 if (column_is_present) {
292 }
catch (
const std::exception& e) {
294 "\" in row \"" + std::string(line_start, p) +
295 "\" in file \"" + file_path +
"\"");
300 result.row_offsets = row_offsets;
301 result.row_count = row_count;
302 if (convert_data_blocks) {
303 result.column_id_to_data_blocks_map =
313 const std::string& row,
317 const std::string& file_name)
const {
318 bool is_array =
false;
319 bool try_single_thread =
false;
320 std::vector<std::unique_ptr<char[]>> tmp_buffers;
321 std::vector<std::string_view> fields;
324 row.c_str() + row.size(),
325 row.c_str() + row.size(),
343 copy_params.delimiter = value[0];
346 it != foreign_table->
options.end()) {
347 copy_params.null_str = it->second;
350 if (has_header.has_value()) {
351 if (has_header.value()) {
359 if (
const auto& value =
362 copy_params.quote = value[0];
364 if (
const auto& value =
367 copy_params.escape = value[0];
371 copy_params.line_delim = value[0];
373 if (
const auto& value =
376 copy_params.array_delim = value[0];
378 if (
const auto& value =
381 copy_params.array_begin = value[0];
382 copy_params.array_end = value[1];
386 copy_params.geo_assign_render_groups =
388 .value_or(copy_params.geo_assign_render_groups);
389 copy_params.geo_explode_collections =
391 .value_or(copy_params.geo_explode_collections);
393 it != foreign_table->
options.end()) {
394 copy_params.source_srid = std::stoi(it->second);
398 it != foreign_table->
options.end()) {
399 copy_params.buffer_size = std::stoi(it->second);
402 it != foreign_table->
options.end()) {
403 copy_params.threads = std::stoi(it->second);
406 .value_or(copy_params.trim_spaces);
413 std::unique_ptr<
char[]>& buffer,
416 const size_t buffer_first_row_index,
417 unsigned int& num_rows_in_buffer,
423 buffer_first_row_index,
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string BUFFER_SIZE_KEY
static const std::string HEADER_KEY
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static const std::string NULLS_KEY
const import_export::CopyParams copy_params
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
const bool track_rejected_rows
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
static const std::string LONLAT_KEY
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
static const std::string THREADS_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
static const std::string QUOTED_KEY
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
std::string getFilePath() const
static const std::string QUOTE_KEY
std::unique_ptr< char[]> buffer
static const std::string ESCAPE_KEY
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override