21 namespace foreign_storage {
27 std::set<int> column_filter_set,
28 const std::string& full_path,
30 const bool track_rejected_rows)
31 : buffer_size(buffer_size)
32 , buffer_alloc_size(buffer_size)
33 , copy_params(copy_params)
36 , render_group_analyzer_map(render_group_analyzer_map)
37 , full_path(full_path)
38 , track_rejected_rows(track_rejected_rows) {
39 if (buffer_size > 0) {
44 if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
48 if (column->columnType.is_dict_encoded_string() ||
49 (column->columnType.is_array() &&
IS_STRING(column->columnType.get_subtype()) &&
51 auto dict_descriptor =
52 getCatalog()->getMetadataForDict(column->columnType.get_comp_param(),
true);
53 string_dictionary = dict_descriptor->stringDict.get();
56 std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
62 const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
64 std::map<int, DataBlockPtr>
result;
65 std::vector<std::pair<const size_t, std::future<int8_t*>>>
66 encoded_data_block_ptrs_futures;
68 for (
const auto& import_buffer : import_buffers) {
69 if (import_buffer ==
nullptr) {
73 if (import_buffer->getTypeInfo().is_number() ||
74 import_buffer->getTypeInfo().is_time() ||
75 import_buffer->getTypeInfo().get_type() ==
kBOOLEAN) {
77 }
else if (import_buffer->getTypeInfo().is_string()) {
78 auto string_payload_ptr = import_buffer->getStringBuffer();
79 if (import_buffer->getTypeInfo().get_compression() ==
kENCODING_NONE) {
85 auto column_id = import_buffer->getColumnDesc()->columnId;
86 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
89 import_buffer->addDictEncodedString(*string_payload_ptr);
90 return import_buffer->getStringDictBuffer();
93 }
else if (import_buffer->getTypeInfo().is_geometry()) {
94 auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
97 CHECK(import_buffer->getTypeInfo().get_type() ==
kARRAY);
98 if (
IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
100 import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101 p.
arraysPtr = import_buffer->getStringArrayDictBuffer();
103 p.
arraysPtr = import_buffer->getArrayBuffer();
106 result[import_buffer->getColumnDesc()->columnId] = p;
110 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
111 result[encoded_ptr_future.first].
numbersPtr = encoded_ptr_future.second.get();
118 return datum.size() > 0 && (datum[0] ==
'.' || isdigit(datum[0]) || datum[0] ==
'-') &&
119 datum.find_first_of(
"ABCDEFabcdef") == std::string_view::npos;
126 const std::string_view lat_str,
128 std::vector<double>& coords,
129 const bool is_lon_lat_order) {
130 double lon = std::atof(std::string(lon_str).c_str());
134 lat = std::atof(std::string(lat_str).c_str());
138 if (!is_lon_lat_order) {
148 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
154 if (!pt.transform(ti)) {
161 coords.push_back(lon);
162 coords.push_back(lat);
168 const std::list<const ColumnDescriptor*>& columns,
169 std::list<const ColumnDescriptor*>::iterator& cd_it,
170 const size_t starting_col_idx,
172 size_t col_idx = starting_col_idx;
174 for (; cd_it != columns.end(); cd_it++) {
176 const auto& col_ti = cd->columnType;
177 if (col_ti.is_geometry()) {
186 col_idx += col_ti.get_physical_cols();
189 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
206 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
210 std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
216 import_buffers[col_idx]->add_value(cd, copy_params.
null_str,
true, copy_params);
219 std::vector<double> coords;
220 std::vector<double> bounds;
221 std::vector<int> ring_sizes;
222 std::vector<int> poly_rings;
223 int render_group = 0;
243 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
246 std::list<const ColumnDescriptor*>::iterator& cd_it,
247 std::vector<std::string_view>& row,
250 size_t first_row_index,
251 size_t row_index_plus_one,
252 std::shared_ptr<Catalog_Namespace::Catalog> catalog,
255 auto col_ti = cd->columnType;
256 SQLTypes col_type = col_ti.get_type();
259 auto starting_col_idx = col_idx;
261 auto const& geo_string = row[import_idx];
265 std::vector<double> coords;
266 std::vector<double> bounds;
267 std::vector<int> ring_sizes;
268 std::vector<int> poly_rings;
269 int render_group = 0;
273 if (import_ti.get_output_srid() == 4326) {
277 import_ti.set_input_srid(srid0);
283 geo_string, row[import_idx], import_ti, coords, copy_params.
lonlat)) {
284 throw std::runtime_error(
"Cannot read lon/lat to insert into POINT column " +
289 if (is_null || geo_string.empty() || geo_string ==
"NULL") {
306 std::string msg =
"Failed to extract valid geometry from row " +
308 " for column " + cd->columnName;
309 throw std::runtime_error(msg);
313 if (col_type != import_ti.get_type()) {
317 throw std::runtime_error(
"Imported geometry doesn't match the type of column " +
323 if (
IS_GEO_POLY(col_type) && render_group_analyzer_map &&
324 render_group_analyzer_map->size()) {
325 auto const itr = render_group_analyzer_map->find(cd->columnId);
326 if (itr != render_group_analyzer_map->end()) {
327 auto& render_group_analyzer = *itr->second;
328 render_group = render_group_analyzer.insertBoundsAndReturnRenderGroup(bounds);
335 if (is_null && col_ti.get_notnull()) {
336 throw std::runtime_error(
"NULL value provided for column (" + cd->columnName +
337 ") with NOT NULL constraint.");
352 import_buffers[starting_col_idx]->add_value(
353 cd, copy_params.
null_str,
true, copy_params);
358 const std::string& null_indicator) {
359 bool is_null = (datum == null_indicator);
367 throw std::runtime_error(
"NULL value provided for column (" + column->
columnName +
368 ") with NOT NULL constraint.");
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
void getColumns(std::vector< double > &coords) const
constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
ParseBufferRequest(const ParseBufferRequest &request)=delete
std::vector< std::string > * stringsPtr
std::vector< ArrayDatum > * arraysPtr
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
HOST DEVICE SQLTypes get_type() const
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
static bool isCoordinateScalar(const std::string_view datum)
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
HOST DEVICE bool get_notnull() const
std::unique_ptr< char[]> buffer
DEVICE void swap(ARGS &&...args)