OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TextFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Geospatial/Types.h"
19 #include "Shared/import_helpers.h"
20 
21 namespace foreign_storage {
22 
24  const import_export::CopyParams& copy_params,
25  int db_id,
26  const ForeignTable* foreign_table,
27  std::set<int> column_filter_set,
28  const std::string& full_path,
29  const bool track_rejected_rows)
30  : buffer_size(buffer_size)
31  , buffer_alloc_size(buffer_size)
32  , copy_params(copy_params)
33  , db_id(db_id)
34  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
35  , full_path(full_path)
36  , track_rejected_rows(track_rejected_rows) {
37  if (buffer_size > 0) {
38  buffer = std::make_unique<char[]>(buffer_size);
39  }
40  // initialize import buffers from columns.
41  for (const auto column : getColumns()) {
42  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
43  import_buffers.emplace_back(nullptr);
44  } else {
45  StringDictionary* string_dictionary = nullptr;
46  if (column->columnType.is_dict_encoded_string() ||
47  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
48  column->columnType.get_compression() == kENCODING_DICT)) {
49  auto dict_descriptor =
50  getCatalog()->getMetadataForDict(column->columnType.get_comp_param(), true);
51  string_dictionary = dict_descriptor->stringDict.get();
52  }
53  import_buffers.emplace_back(
54  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
55  }
56  }
57 }
58 
60  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
61  const bool skip_dict_encoding) {
62  std::map<int, DataBlockPtr> result;
63  std::vector<std::pair<const size_t, std::future<int8_t*>>>
64  encoded_data_block_ptrs_futures;
65  // make all async calls to string dictionary here and then continue execution
66  for (const auto& import_buffer : import_buffers) {
67  if (import_buffer == nullptr) {
68  continue;
69  }
70  DataBlockPtr p;
71  if (import_buffer->getTypeInfo().is_number() ||
72  import_buffer->getTypeInfo().is_time() ||
73  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
74  p.numbersPtr = import_buffer->getAsBytes();
75  } else if (import_buffer->getTypeInfo().is_string()) {
76  auto string_payload_ptr = import_buffer->getStringBuffer();
77  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
78  p.stringsPtr = string_payload_ptr;
79  } else {
80  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
81  p.numbersPtr = nullptr;
82 
83  if (!skip_dict_encoding) {
84  auto column_id = import_buffer->getColumnDesc()->columnId;
85  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
86  column_id,
87  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
88  import_buffer->addDictEncodedString(*string_payload_ptr);
89  return import_buffer->getStringDictBuffer();
90  })));
91  }
92  }
93  } else if (import_buffer->getTypeInfo().is_geometry()) {
94  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
95  p.stringsPtr = geo_payload_ptr;
96  } else {
97  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
98  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
99  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
100  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
102  } else {
103  p.arraysPtr = import_buffer->getArrayBuffer();
104  }
105  }
106  result[import_buffer->getColumnDesc()->columnId] = p;
107  }
108 
109  if (!skip_dict_encoding) {
110  // wait for the async requests we made for string dictionary
111  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
112  encoded_ptr_future.second.wait();
113  }
114  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
115  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
116  }
117  }
118  return result;
119 }
120 
121 bool TextFileBufferParser::isCoordinateScalar(const std::string_view datum) {
122  // field looks like a scalar numeric value (and not a hex blob)
123  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
124  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
125 }
126 
127 namespace {
128 
129 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
130  const std::string_view lat_str,
131  SQLTypeInfo& ti,
132  std::vector<double>& coords,
133  const bool is_lon_lat_order) {
134  double lon = std::atof(std::string(lon_str).c_str());
135  double lat = NAN;
136 
138  lat = std::atof(std::string(lat_str).c_str());
139  }
140 
141  // Swap coordinates if this table uses a reverse order: lat/lon
142  if (!is_lon_lat_order) {
143  std::swap(lat, lon);
144  }
145 
146  // TODO: should check if POINT column should have been declared with
147  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
148  // throw std::runtime_error("POINT column " + cd->columnName + " is
149  // not WGS84, cannot insert lon/lat");
150  // }
151 
152  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
153  return false;
154  }
155 
156  if (ti.transforms()) {
157  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
158  if (!pt.transform(ti)) {
159  return false;
160  }
161  pt.getColumns(coords);
162  return true;
163  }
164 
165  coords.push_back(lon);
166  coords.push_back(lat);
167  return true;
168 }
169 } // namespace
170 
172  const std::list<const ColumnDescriptor*>& columns,
173  std::list<const ColumnDescriptor*>::iterator& cd_it,
174  const size_t starting_col_idx,
175  ParseBufferRequest& request) {
176  size_t col_idx = starting_col_idx;
177 
178  for (; cd_it != columns.end(); cd_it++) {
179  auto cd = *cd_it;
180  const auto& col_ti = cd->columnType;
181  if (col_ti.is_geometry()) {
182  if (request.import_buffers[col_idx] != nullptr) {
184  col_idx,
185  request.copy_params,
186  cd,
187  request.getCatalog());
188  } else {
189  ++col_idx;
190  col_idx += col_ti.get_physical_cols();
191  }
192  // skip remaining physical columns
193  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
194  ++cd_it;
195  }
196  } else {
197  if (request.import_buffers[col_idx] != nullptr) {
198  request.import_buffers[col_idx]->add_value(cd,
199  {},
200  true,
201  request.copy_params,
202  /*check_not_null=*/false);
203  }
204  ++col_idx;
205  }
206  }
207 }
208 
210  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
211  size_t& col_idx,
212  const import_export::CopyParams& copy_params,
213  const ColumnDescriptor* cd,
214  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
215  auto col_ti = cd->columnType;
216  SQLTypes col_type = col_ti.get_type();
217  CHECK(IS_GEO(col_type));
218 
219  // store null string in the base column
220  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
221  ++col_idx;
222 
223  std::vector<double> coords;
224  std::vector<double> bounds;
225  std::vector<int> ring_sizes;
226  std::vector<int> poly_rings;
227 
228  SQLTypeInfo import_ti{col_ti};
230  import_ti, coords, bounds, ring_sizes, poly_rings);
231 
232  // import extracted geo
234  cd,
235  import_buffers,
236  col_idx,
237  coords,
238  bounds,
239  ring_sizes,
240  poly_rings,
241  /*force_null=*/true);
242 }
243 
245  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
246  size_t& col_idx,
247  const import_export::CopyParams& copy_params,
248  std::list<const ColumnDescriptor*>::iterator& cd_it,
249  std::vector<std::string_view>& row,
250  size_t& import_idx,
251  bool is_null,
252  size_t first_row_index,
253  size_t row_index_plus_one,
254  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
255  auto cd = *cd_it;
256  auto col_ti = cd->columnType;
257  SQLTypes col_type = col_ti.get_type();
258  CHECK(IS_GEO(col_type));
259 
260  auto starting_col_idx = col_idx;
261 
262  auto const& geo_string = row[import_idx];
263  ++import_idx;
264  ++col_idx;
265 
266  std::vector<double> coords;
267  std::vector<double> bounds;
268  std::vector<int> ring_sizes;
269  std::vector<int> poly_rings;
270 
271  // prepare to transform from another SRID
272  SQLTypeInfo import_ti{col_ti};
273  if (import_ti.get_output_srid() == 4326) {
274  auto srid0 = copy_params.source_srid;
275  if (srid0 > 0) {
276  // srid0 -> 4326 transform is requested on import
277  import_ti.set_input_srid(srid0);
278  }
279  }
280 
281  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
283  geo_string, row[import_idx], import_ti, coords, copy_params.lonlat)) {
284  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
285  cd->columnName);
286  }
287  ++import_idx;
288  } else {
289  if (is_null || geo_string.empty() || geo_string == "NULL") {
291  import_ti, coords, bounds, ring_sizes, poly_rings);
292  is_null = true;
293  } else {
294  // extract geometry directly from WKT
296  std::string(geo_string),
297  import_ti,
298  coords,
299  bounds,
300  ring_sizes,
301  poly_rings,
302  copy_params.geo_validate_geometry)) {
303  std::string msg = "Failed to extract valid geometry from row " +
304  std::to_string(first_row_index + row_index_plus_one) +
305  " for column " + cd->columnName;
306  throw std::runtime_error(msg);
307  }
308 
309  // validate types
310  if (!geo_promoted_type_match(import_ti.get_type(), col_type)) {
311  throw std::runtime_error("Imported geometry doesn't match the type of column " +
312  cd->columnName);
313  }
314  }
315  }
316 
317  // allowed to be null?
318  if (is_null && col_ti.get_notnull()) {
319  throw std::runtime_error("NULL value provided for column (" + cd->columnName +
320  ") with NOT NULL constraint.");
321  }
322 
323  // import extracted geo
325  *catalog, cd, import_buffers, col_idx, coords, bounds, ring_sizes, poly_rings);
326 
327  // store null string in the base column
328  import_buffers[starting_col_idx]->add_value(
329  cd, copy_params.null_str, true, copy_params);
330 }
331 
332 bool TextFileBufferParser::isNullDatum(const std::string_view datum,
333  const ColumnDescriptor* column,
334  const std::string& null_indicator) {
335  bool is_null = ImportHelpers::is_null_datum(datum, null_indicator);
336 
337  // Treating empty as NULL
338  if (!column->columnType.is_string() && datum.empty()) {
339  is_null = true;
340  }
341 
342  if (is_null && column->columnType.get_notnull()) {
343  throw std::runtime_error("NULL value provided for column (" + column->columnName +
344  ") with NOT NULL constraint.");
345  }
346  return is_null;
347 }
348 } // namespace foreign_storage
bool geo_promoted_type_match(const SQLTypes a, const SQLTypes b)
Definition: sqltypes.h:2029
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
ParseBufferRequest(const ParseBufferRequest &request)=delete
SQLTypes
Definition: sqltypes.h:65
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings)
Definition: Types.cpp:1342
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void getColumns(std::vector< double > &coords) const
Definition: Types.cpp:568
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
Definition: Importer.cpp:1636
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool validate_with_geos_if_available)
Definition: Types.cpp:1121
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
#define IS_STRING(T)
Definition: sqltypes.h:309
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
#define CHECK(condition)
Definition: Logger.h:291
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
SQLTypeInfo columnType
static bool isCoordinateScalar(const std::string_view datum)
bool is_string() const
Definition: sqltypes.h:559
bool transforms() const
Definition: sqltypes.h:624
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
int8_t * numbersPtr
Definition: sqltypes.h:233
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:310