21 namespace foreign_storage {
28 std::set<int> column_filter_set,
29 const std::string& full_path,
31 const bool track_rejected_rows)
32 : buffer_size(buffer_size)
33 , buffer_alloc_size(buffer_size)
34 , copy_params(copy_params)
37 , render_group_analyzer_map(render_group_analyzer_map)
38 , full_path(full_path)
39 , track_rejected_rows(track_rejected_rows) {
40 if (buffer_size > 0) {
45 if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
49 if (column->columnType.is_dict_encoded_string() ||
50 (column->columnType.is_array() &&
IS_STRING(column->columnType.get_subtype()) &&
52 auto dict_descriptor =
53 getCatalog()->getMetadataForDict(column->columnType.get_comp_param(),
true);
54 string_dictionary = dict_descriptor->stringDict.get();
57 std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
63 const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
64 const bool skip_dict_encoding) {
65 std::map<int, DataBlockPtr>
result;
66 std::vector<std::pair<const size_t, std::future<int8_t*>>>
67 encoded_data_block_ptrs_futures;
69 for (
const auto& import_buffer : import_buffers) {
70 if (import_buffer ==
nullptr) {
74 if (import_buffer->getTypeInfo().is_number() ||
75 import_buffer->getTypeInfo().is_time() ||
76 import_buffer->getTypeInfo().get_type() ==
kBOOLEAN) {
78 }
else if (import_buffer->getTypeInfo().is_string()) {
79 auto string_payload_ptr = import_buffer->getStringBuffer();
80 if (import_buffer->getTypeInfo().get_compression() ==
kENCODING_NONE) {
86 if (!skip_dict_encoding) {
87 auto column_id = import_buffer->getColumnDesc()->columnId;
88 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
91 import_buffer->addDictEncodedString(*string_payload_ptr);
92 return import_buffer->getStringDictBuffer();
96 }
else if (import_buffer->getTypeInfo().is_geometry()) {
97 auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
100 CHECK(import_buffer->getTypeInfo().get_type() ==
kARRAY);
101 if (
IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
103 import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
104 p.
arraysPtr = import_buffer->getStringArrayDictBuffer();
106 p.
arraysPtr = import_buffer->getArrayBuffer();
109 result[import_buffer->getColumnDesc()->columnId] = p;
112 if (!skip_dict_encoding) {
114 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
115 encoded_ptr_future.second.wait();
117 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
118 result[encoded_ptr_future.first].
numbersPtr = encoded_ptr_future.second.get();
126 return datum.size() > 0 && (datum[0] ==
'.' || isdigit(datum[0]) || datum[0] ==
'-') &&
127 datum.find_first_of(
"ABCDEFabcdef") == std::string_view::npos;
134 const std::string_view lat_str,
136 std::vector<double>& coords,
137 const bool is_lon_lat_order) {
138 double lon = std::atof(std::string(lon_str).c_str());
142 lat = std::atof(std::string(lat_str).c_str());
146 if (!is_lon_lat_order) {
156 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
162 if (!pt.transform(ti)) {
169 coords.push_back(lon);
170 coords.push_back(lat);
176 const std::list<const ColumnDescriptor*>& columns,
177 std::list<const ColumnDescriptor*>::iterator& cd_it,
178 const size_t starting_col_idx,
180 size_t col_idx = starting_col_idx;
182 for (; cd_it != columns.end(); cd_it++) {
184 const auto& col_ti = cd->columnType;
185 if (col_ti.is_geometry()) {
194 col_idx += col_ti.get_physical_cols();
197 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
214 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
218 std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
224 import_buffers[col_idx]->add_value(cd, copy_params.
null_str,
true, copy_params);
227 std::vector<double> coords;
228 std::vector<double> bounds;
229 std::vector<int> ring_sizes;
230 std::vector<int> poly_rings;
231 int render_group = 0;
251 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
254 std::list<const ColumnDescriptor*>::iterator& cd_it,
255 std::vector<std::string_view>& row,
258 size_t first_row_index,
259 size_t row_index_plus_one,
260 std::shared_ptr<Catalog_Namespace::Catalog> catalog,
263 auto col_ti = cd->columnType;
264 SQLTypes col_type = col_ti.get_type();
267 auto starting_col_idx = col_idx;
269 auto const& geo_string = row[import_idx];
273 std::vector<double> coords;
274 std::vector<double> bounds;
275 std::vector<int> ring_sizes;
276 std::vector<int> poly_rings;
277 int render_group = 0;
281 if (import_ti.get_output_srid() == 4326) {
285 import_ti.set_input_srid(srid0);
291 geo_string, row[import_idx], import_ti, coords, copy_params.
lonlat)) {
292 throw std::runtime_error(
"Cannot read lon/lat to insert into POINT column " +
297 if (is_null || geo_string.empty() || geo_string ==
"NULL") {
314 std::string msg =
"Failed to extract valid geometry from row " +
316 " for column " + cd->columnName;
317 throw std::runtime_error(msg);
321 if (col_type != import_ti.get_type()) {
325 throw std::runtime_error(
"Imported geometry doesn't match the type of column " +
331 if (
IS_GEO_POLY(col_type) && render_group_analyzer_map &&
332 render_group_analyzer_map->size()) {
333 auto const itr = render_group_analyzer_map->find(cd->columnId);
334 if (itr != render_group_analyzer_map->end()) {
335 auto& render_group_analyzer = *itr->second;
336 render_group = render_group_analyzer.insertBoundsAndReturnRenderGroup(bounds);
343 if (is_null && col_ti.get_notnull()) {
344 throw std::runtime_error(
"NULL value provided for column (" + cd->columnName +
345 ") with NOT NULL constraint.");
360 import_buffers[starting_col_idx]->add_value(
361 cd, copy_params.
null_str,
true, copy_params);
366 const std::string& null_indicator) {
375 throw std::runtime_error(
"NULL value provided for column (" + column->
columnName +
376 ") with NOT NULL constraint.");
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
void getColumns(std::vector< double > &coords) const
constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
ParseBufferRequest(const ParseBufferRequest &request)=delete
std::vector< std::string > * stringsPtr
std::vector< ArrayDatum > * arraysPtr
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
HOST DEVICE SQLTypes get_type() const
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
Specifies the in-memory content of a row in the column metadata table.
std::list< const ColumnDescriptor * > getColumns() const
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
static bool isCoordinateScalar(const std::string_view datum)
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
HOST DEVICE bool get_notnull() const
std::unique_ptr< char[]> buffer
DEVICE void swap(ARGS &&...args)