22 namespace foreign_storage {
23 namespace csv_file_buffer_parser {
26 std::unique_ptr<
bool[]>& array_flags,
29 const std::list<const ColumnDescriptor*>& columns) {
30 array_flags = std::unique_ptr<bool[]>(
new bool[columns.size()]);
32 for (
const auto cd : columns) {
33 const auto& col_ti = cd->columnType;
34 phys_cols += col_ti.get_physical_cols();
35 if (cd->columnType.get_type() ==
kPOINT) {
39 if (cd->columnType.get_type() ==
kARRAY) {
40 array_flags.get()[
i] =
true;
42 array_flags.get()[
i] =
false;
51 const std::string& file_name) {
53 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
60 return datum.size() > 0 && (datum[0] ==
'.' || isdigit(datum[0]) || datum[0] ==
'-') &&
61 datum.find_first_of(
"ABCDEFabcdef") == std::string_view::npos;
65 const std::string_view lat_str,
66 std::vector<double>& coords,
67 const bool is_lon_lat_order) {
68 double lon = std::atof(std::string(lon_str).c_str());
72 lat = std::atof(std::string(lat_str).c_str());
76 if (!is_lon_lat_order) {
86 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
89 coords.push_back(lon);
90 coords.push_back(lat);
96 const std::string& null_indicator) {
97 bool is_null = (datum == null_indicator);
105 throw std::runtime_error(
"NULL value provided for column (" + column->
columnName +
106 ") with NOT NULL constraint.");
112 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
115 std::list<const ColumnDescriptor*>::iterator& cd_it,
116 std::vector<std::string_view>& row,
119 size_t first_row_index,
120 size_t row_index_plus_one,
121 std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
123 auto col_ti = cd->columnType;
124 SQLTypes col_type = col_ti.get_type();
128 import_buffers[col_idx]->add_value(cd, copy_params.
null_str,
true, copy_params);
130 auto const& geo_string = row[import_idx];
134 std::vector<double> coords;
135 std::vector<double> bounds;
136 std::vector<int> ring_sizes;
137 std::vector<int> poly_rings;
138 int render_group = 0;
142 geo_string, row[import_idx], coords, copy_params.
lonlat)) {
143 throw std::runtime_error(
"Cannot read lon/lat to insert into POINT column " +
165 std::string msg =
"Failed to extract valid geometry from row " +
167 " for column " + cd->columnName;
168 throw std::runtime_error(msg);
172 if (col_type != import_ti.get_type()) {
176 throw std::runtime_error(
"Imported geometry doesn't match the type of column " +
196 const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
198 std::map<int, DataBlockPtr>
result;
199 std::vector<std::pair<const size_t, std::future<int8_t*>>>
200 encoded_data_block_ptrs_futures;
202 for (
const auto& import_buffer : import_buffers) {
203 if (import_buffer ==
nullptr)
206 if (import_buffer->getTypeInfo().is_number() ||
207 import_buffer->getTypeInfo().is_time() ||
208 import_buffer->getTypeInfo().get_type() ==
kBOOLEAN) {
210 }
else if (import_buffer->getTypeInfo().is_string()) {
211 auto string_payload_ptr = import_buffer->getStringBuffer();
212 if (import_buffer->getTypeInfo().get_compression() ==
kENCODING_NONE) {
218 auto column_id = import_buffer->getColumnDesc()->columnId;
219 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
221 std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
222 import_buffer->addDictEncodedString(*string_payload_ptr);
223 return import_buffer->getStringDictBuffer();
226 }
else if (import_buffer->getTypeInfo().is_geometry()) {
227 auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
230 CHECK(import_buffer->getTypeInfo().get_type() ==
kARRAY);
231 if (
IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
233 import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
234 p.
arraysPtr = import_buffer->getStringArrayDictBuffer();
236 p.
arraysPtr = import_buffer->getArrayBuffer();
239 result[import_buffer->getColumnDesc()->columnId] = p;
243 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
244 result[encoded_ptr_future.first].
numbersPtr = encoded_ptr_future.second.get();
253 std::set<int> column_filter_set,
254 const std::string& full_path)
255 : buffer_size(buffer_size)
256 , buffer_alloc_size(buffer_size)
257 , copy_params(copy_params)
260 , full_path(full_path) {
261 if (buffer_size > 0) {
266 if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
270 if (column->columnType.is_dict_encoded_string() ||
271 (column->columnType.is_array() &&
IS_STRING(column->columnType.get_subtype()) &&
273 auto dict_descriptor =
getCatalog()->getMetadataForDictUnlocked(
274 column->columnType.get_comp_param(),
true);
275 string_dictionary = dict_descriptor->stringDict.get();
278 std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
288 bool convert_data_blocks,
289 bool columns_are_pre_filtered) {
293 const char* thread_buf = request.
buffer.get() + request.
begin_pos + begin;
294 const char* thread_buf_end = request.
buffer.get() + request.
end_pos;
297 std::vector<std::string_view> row;
298 size_t row_index_plus_one = 0;
299 const char* p = thread_buf;
300 bool try_single_thread =
false;
303 std::unique_ptr<bool[]> array_flags;
306 array_flags, phys_cols, point_cols, request.
getColumns());
307 auto num_cols = request.
getColumns().size() - phys_cols;
308 if (columns_are_pre_filtered) {
309 for (
size_t col_idx = 0; col_idx < request.
getColumns().size(); ++col_idx) {
316 size_t row_count = 0;
318 std::vector<size_t> row_offsets{};
322 for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
325 std::vector<std::unique_ptr<char[]>>
327 const char* line_start = p;
336 !columns_are_pre_filtered);
338 row_index_plus_one++;
341 size_t import_idx = 0;
345 for (
auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
347 const auto& col_ti = cd->columnType;
348 bool column_is_present =
350 CHECK(row.size() > import_idx || !column_is_present);
356 if (col_ti.is_geometry()) {
370 if (!is_null && cd->columnType ==
kPOINT &&
372 if (!columns_are_pre_filtered)
375 if (!columns_are_pre_filtered)
378 col_idx += col_ti.get_physical_cols();
381 for (
int i = 0;
i < cd->columnType.get_physical_cols(); ++
i) {
387 cd, row[import_idx], is_null, request.
copy_params);
389 if (column_is_present) {
395 }
catch (
const std::exception& e) {
397 "\" in row \"" + std::string(line_start, p) +
398 "\" in file \"" + file_path +
"\"");
404 result.row_offsets = row_offsets;
405 result.row_count = row_count;
406 if (convert_data_blocks) {
407 result.column_id_to_data_blocks_map =
417 const std::string& row,
421 const std::string& file_name) {
422 bool is_array =
false;
423 bool try_single_thread =
false;
424 std::vector<std::unique_ptr<char[]>> tmp_buffers;
425 std::vector<std::string_view> fields;
428 row.c_str() + row.size(),
429 row.c_str() + row.size(),
const import_export::CopyParams copy_params
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
std::vector< std::string > * stringsPtr
bool is_coordinate_scalar(const std::string_view datum)
std::vector< ArrayDatum > * arraysPtr
std::list< const ColumnDescriptor * > getColumns() const
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
CONSTEXPR DEVICE bool is_null(const T &value)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
void parse_and_validate_expected_column_count(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts its fields into the given vector.
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered)
Specifies the in-memory content of a row in the column metadata table.
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
ParseBufferRequest(const ParseBufferRequest &request)=delete
std::unique_ptr< char[]> buffer
HOST DEVICE bool get_notnull() const
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
std::string getFilePath() const
DEVICE void swap(ARGS &&...args)