OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TextFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Geospatial/Types.h"
20 
21 namespace foreign_storage {
// NOTE(review): the line that opens this definition (source line 22) is missing from
// this extract. The member-initializer list below indicates a constructor, presumably
// ParseBufferRequest::ParseBufferRequest -- confirm against the original file.
//
// Stores the supplied parameters in the corresponding members, allocates a
// `buffer_size`-byte char buffer when `buffer_size` is non-zero, and appends exactly one
// entry to `import_buffers` for every column returned by getColumns():
//  - nullptr when the column id is not contained in `column_filter_set`;
//  - a TypedImportBuffer otherwise (with the column's string dictionary when the column
//    is a dictionary-encoded string or a dictionary-encoded array of strings).
23  size_t buffer_size,
24  const import_export::CopyParams& copy_params,
25  int db_id,
26  const ForeignTable* foreign_table,
27  std::set<int> column_filter_set,
28  const std::string& full_path,
29  const RenderGroupAnalyzerMap* render_group_analyzer_map,
30  const bool track_rejected_rows)
31  : buffer_size(buffer_size)
32  , buffer_alloc_size(buffer_size)
33  , copy_params(copy_params)
34  , db_id(db_id)
35  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
36  , render_group_analyzer_map(render_group_analyzer_map)
37  , full_path(full_path)
38  , track_rejected_rows(track_rejected_rows) {
// A zero-sized request leaves `buffer` unallocated.
39  if (buffer_size > 0) {
40  buffer = std::make_unique<char[]>(buffer_size);
41  }
42  // initialize import buffers from columns.
43  for (const auto column : getColumns()) {
// Filtered-out columns still get a (null) slot, so one entry is added per column.
44  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
45  import_buffers.emplace_back(nullptr);
46  } else {
47  StringDictionary* string_dictionary = nullptr;
// Only dictionary-encoded strings (scalar or array element) look up a dictionary;
// every other column type keeps string_dictionary == nullptr.
48  if (column->columnType.is_dict_encoded_string() ||
49  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
50  column->columnType.get_compression() == kENCODING_DICT)) {
// NOTE(review): dict_descriptor is dereferenced without a null check -- confirm that
// getMetadataForDict cannot return null for a column of this kind.
51  auto dict_descriptor =
52  getCatalog()->getMetadataForDict(column->columnType.get_comp_param(), true);
53  string_dictionary = dict_descriptor->stringDict.get();
54  }
55  import_buffers.emplace_back(
56  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
57  }
58  }
59 }
60 
// NOTE(review): the first line of this definition (source line 61: return type and
// function name) is missing from this extract. Judging by the parameter and the returned
// `result`, it is presumably
// `std::map<int, DataBlockPtr> TextFileBufferParser::convertImportBuffersToDataBlocks(`
// -- confirm against the original file.
//
// Builds a map from column id to DataBlockPtr for every non-null entry of
// `import_buffers`:
//  - number / time / boolean columns: numbersPtr = getAsBytes();
//  - strings with kENCODING_NONE: stringsPtr = getStringBuffer();
//  - dictionary-encoded strings: encoded on a std::async task; numbersPtr is filled in
//    from the task's result in the second loop below;
//  - geometry columns: stringsPtr = getGeoStringBuffer();
//  - anything else must be kARRAY (CHECK): string arrays are dictionary-encoded inline,
//    other arrays expose getArrayBuffer() through arraysPtr.
// Null entries in `import_buffers` are skipped and produce no map entry.
62  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
63  import_buffers) {
64  std::map<int, DataBlockPtr> result;
// (column id, pending encoded buffer) pairs for dictionary-encoded string columns.
65  std::vector<std::pair<const size_t, std::future<int8_t*>>>
66  encoded_data_block_ptrs_futures;
67  // make all async calls to string dictionary here and then continue execution
68  for (const auto& import_buffer : import_buffers) {
69  if (import_buffer == nullptr) {
70  continue;
71  }
72  DataBlockPtr p;
73  if (import_buffer->getTypeInfo().is_number() ||
74  import_buffer->getTypeInfo().is_time() ||
75  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
76  p.numbersPtr = import_buffer->getAsBytes();
77  } else if (import_buffer->getTypeInfo().is_string()) {
78  auto string_payload_ptr = import_buffer->getStringBuffer();
79  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
80  p.stringsPtr = string_payload_ptr;
81  } else {
82  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
// Placeholder; overwritten with the encoded buffer once the async task completes.
83  p.numbersPtr = nullptr;
84 
85  auto column_id = import_buffer->getColumnDesc()->columnId;
// The lambda captures `import_buffer` by reference; it refers to an element of the
// caller-owned `import_buffers`, and every future is get()-ed before this function
// returns normally.
86  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
87  column_id,
88  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
89  import_buffer->addDictEncodedString(*string_payload_ptr);
90  return import_buffer->getStringDictBuffer();
91  })));
92  }
93  } else if (import_buffer->getTypeInfo().is_geometry()) {
94  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
95  p.stringsPtr = geo_payload_ptr;
96  } else {
97  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
98  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
// String arrays are only supported with dictionary encoding; they are encoded
// synchronously here, unlike scalar strings above.
99  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
100  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
102  } else {
103  p.arraysPtr = import_buffer->getArrayBuffer();
104  }
105  }
106  result[import_buffer->getColumnDesc()->columnId] = p;
107  }
108 
109  // wait for the async requests we made for string dictionary
110  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
111  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
112  }
113  return result;
114 }
115 
116 bool TextFileBufferParser::isCoordinateScalar(const std::string_view datum) {
117  // field looks like a scalar numeric value (and not a hex blob)
118  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
119  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
120 }
121 
// File-local helpers for geo column processing.
122 namespace {
// Passed as the trailing argument of geo extraction helper calls later in this file.
// As the name suggests it presumably requests POLYGON -> MULTIPOLYGON promotion --
// confirm against the helper declarations in Geospatial/Types.h.
123 constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
124 
// Parses a POINT supplied as two separate text fields and appends its coordinates to
// `coords`.
//
// lon_str / lat_str  text of the two fields; they hold lat/lon instead when
//                    `is_lon_lat_order` is false (the parsed values are then swapped).
// ti                 type info of the target; when ti.transforms() is true the point is
//                    transformed via Geospatial::GeoPoint before being stored.
// coords             output; the resulting coordinates are appended.
//
// Returns false when either parsed value is NaN or infinite, or when the transform
// fails; returns true otherwise.
//
// NOTE(review): std::atof yields 0.0 for text that cannot be converted, so a malformed
// first field is not reported as a failure by this function.
125 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
126  const std::string_view lat_str,
127  SQLTypeInfo& ti,
128  std::vector<double>& coords,
129  const bool is_lon_lat_order) {
130  double lon = std::atof(std::string(lon_str).c_str());
// NaN is the "not parsed" sentinel; it is rejected by the isnan check further down.
131  double lat = NAN;
132 
// NOTE(review): source line 133 (the `if (...) {` header guarding this assignment) is
// missing from this extract; the closing brace on line 135 belongs to it.
134  lat = std::atof(std::string(lat_str).c_str());
135  }
136 
137  // Swap coordinates if this table uses a reverse order: lat/lon
138  if (!is_lon_lat_order) {
139  std::swap(lat, lon);
140  }
141 
142  // TODO: should check if POINT column should have been declared with
143  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
144  // throw std::runtime_error("POINT column " + cd->columnName + " is
145  // not WGS84, cannot insert lon/lat");
146  // }
147 
148  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
149  return false;
150  }
151 
// When a transform is required, build a point, transform it, and let it append its
// coordinates to `coords`.
152  if (ti.transforms()) {
153  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
154  if (!pt.transform(ti)) {
155  return false;
156  }
157  pt.getColumns(coords);
158  return true;
159  }
160 
// No transform: store the values as parsed, longitude first.
161  coords.push_back(lon);
162  coords.push_back(lat);
163  return true;
164 }
165 } // namespace
166 
// NOTE(review): the first line of this definition (source line 167: return type and
// function name) is missing from this extract; presumably
// `void TextFileBufferParser::fillRejectedRowWithInvalidData(` -- confirm.
//
// Walks the columns from `cd_it` to the end of `columns` and, for every column whose
// import buffer is present (non-null), adds a placeholder value so that the rejected row
// still occupies a slot in each buffer. `col_idx` tracks the position in
// `request.import_buffers`, starting at `starting_col_idx`.
//
// `cd_it` is taken by reference and is advanced by this function.
168  const std::list<const ColumnDescriptor*>& columns,
169  std::list<const ColumnDescriptor*>::iterator& cd_it,
170  const size_t starting_col_idx,
171  ParseBufferRequest& request) {
172  size_t col_idx = starting_col_idx;
173 
174  for (; cd_it != columns.end(); cd_it++) {
175  auto cd = *cd_it;
176  const auto& col_ti = cd->columnType;
177  if (col_ti.is_geometry()) {
178  if (request.import_buffers[col_idx] != nullptr) {
// NOTE(review): source line 179 (the start of the call that receives the arguments
// below) is missing from this extract. `col_idx` is passed to it, presumably by
// reference so the callee can advance it past the geo columns -- confirm.
180  col_idx,
181  request.copy_params,
182  cd,
183  request.getCatalog());
184  } else {
// Buffer not requested: step over the base geo column plus its physical columns.
185  ++col_idx;
186  col_idx += col_ti.get_physical_cols();
187  }
188  // skip remaining physical columns
189  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
190  ++cd_it;
191  }
192  } else {
193  if (request.import_buffers[col_idx] != nullptr) {
// Empty value; the third argument is presumably the is_null flag (TODO confirm
// against TypedImportBuffer::add_value). The NOT NULL check is explicitly disabled.
194  request.import_buffers[col_idx]->add_value(cd,
195  {},
196  true,
197  request.copy_params,
198  /*check_not_null=*/false);
199  }
200  ++col_idx;
201  }
202  }
203 }
204 
// NOTE(review): the first line of this definition (source line 205: return type and
// function name) is missing from this extract; presumably
// `void TextFileBufferParser::processInvalidGeoColumn(` -- confirm.
//
// Fills the import buffers of one geo column with null placeholder data: the base
// column receives `copy_params.null_str`, and the physical columns are populated by the
// final call with force_null set to true. `col_idx` is advanced past the base column
// here and is handed on to that final call.
206  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
207  size_t& col_idx,
208  const import_export::CopyParams& copy_params,
209  const ColumnDescriptor* cd,
210  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
211  auto col_ti = cd->columnType;
212  SQLTypes col_type = col_ti.get_type();
// Only geo columns may be passed in.
213  CHECK(IS_GEO(col_type));
214 
215  // store null string in the base column
216  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
217  ++col_idx;
218 
219  std::vector<double> coords;
220  std::vector<double> bounds;
221  std::vector<int> ring_sizes;
222  std::vector<int> poly_rings;
223  int render_group = 0;
224 
225  SQLTypeInfo import_ti{col_ti};
// NOTE(review): source line 226 (the start of the call that receives the arguments
// below) is missing from this extract; presumably it fills the vectors with null geo
// data -- confirm.
227  import_ti, coords, bounds, ring_sizes, poly_rings, PROMOTE_POLYGON_TO_MULTIPOLYGON);
228 
229  // import extracted geo
// NOTE(review): source line 230 (the start of the call that receives the arguments
// below) is missing from this extract.
231  cd,
232  import_buffers,
233  col_idx,
234  coords,
235  bounds,
236  ring_sizes,
237  poly_rings,
238  render_group,
239  /*force_null=*/true);
240 }
241 
// NOTE(review): the first line of this definition (source line 242: return type and
// function name) is missing from this extract; presumably
// `void TextFileBufferParser::processGeoColumn(` -- confirm.
//
// Parses one geo column of a row and stores it in the import buffers.
//
// The field at row[import_idx] is handled in one of three ways:
//  1. POINT column whose field looks like a scalar number: this field and the next one
//     are read as two separate coordinate fields;
//  2. `is_null`, an empty field, or the literal "NULL": treated as a NULL geometry;
//  3. otherwise: parsed via Geospatial::GeoTypesFactory::getGeoColumns, type-checked
//     against the column type, and (for polygon types) assigned a render group when an
//     analyzer is registered for the column.
//
// `import_idx` and `col_idx` are taken by reference and advanced as fields/columns are
// consumed.
//
// Throws std::runtime_error when the coordinates or geometry cannot be read, when the
// geometry type does not match the column, or when a NULL is supplied for a NOT NULL
// column.
243  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
244  size_t& col_idx,
245  const import_export::CopyParams& copy_params,
246  std::list<const ColumnDescriptor*>::iterator& cd_it,
247  std::vector<std::string_view>& row,
248  size_t& import_idx,
249  bool is_null,
250  size_t first_row_index,
251  size_t row_index_plus_one,
252  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
253  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
254  auto cd = *cd_it;
255  auto col_ti = cd->columnType;
256  SQLTypes col_type = col_ti.get_type();
257  CHECK(IS_GEO(col_type));
258 
// Remember the base column's slot; its value is written last, at the end of the function.
259  auto starting_col_idx = col_idx;
260 
261  auto const& geo_string = row[import_idx];
262  ++import_idx;
263  ++col_idx;
264 
265  std::vector<double> coords;
266  std::vector<double> bounds;
267  std::vector<int> ring_sizes;
268  std::vector<int> poly_rings;
269  int render_group = 0;
270 
271  // prepare to transform from another SRID
272  SQLTypeInfo import_ti{col_ti};
273  if (import_ti.get_output_srid() == 4326) {
274  auto srid0 = copy_params.source_srid;
275  if (srid0 > 0) {
276  // srid0 -> 4326 transform is requested on import
277  import_ti.set_input_srid(srid0);
278  }
279  }
280 
281  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
// NOTE(review): source line 282 (the start of the negated call whose arguments follow)
// is missing from this extract. Also note that row[import_idx] is read here without a
// visible bounds check -- confirm callers guarantee a following field exists.
283  geo_string, row[import_idx], import_ti, coords, copy_params.lonlat)) {
284  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
285  cd->columnName);
286  }
// The second coordinate field has been consumed as well.
287  ++import_idx;
288  } else {
289  if (is_null || geo_string.empty() || geo_string == "NULL") {
// NOTE(review): source lines 290 and 295 (start and end of the call that receives the
// arguments below) are missing from this extract.
291  coords,
292  bounds,
293  ring_sizes,
294  poly_rings,
296  is_null = true;
297  } else {
298  // extract geometry directly from WKT
299  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
300  import_ti,
301  coords,
302  bounds,
303  ring_sizes,
304  poly_rings,
// NOTE(review): source line 305 (the last argument and the end of this condition) is
// missing from this extract.
306  std::string msg = "Failed to extract valid geometry from row " +
307  std::to_string(first_row_index + row_index_plus_one) +
308  " for column " + cd->columnName;
309  throw std::runtime_error(msg);
310  }
311 
312  // validate types
313  if (col_type != import_ti.get_type()) {
// NOTE(review): source line 314 (the start of the inner condition) is missing from this
// extract. The visible part exempts a POLYGON imported into a MULTIPOLYGON column.
315  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
316  col_type == SQLTypes::kMULTIPOLYGON)) {
317  throw std::runtime_error("Imported geometry doesn't match the type of column " +
318  cd->columnName);
319  }
320  }
321 
322  // get render group
// render_group stays 0 unless an analyzer is registered for this column id.
323  if (IS_GEO_POLY(col_type) && render_group_analyzer_map &&
324  render_group_analyzer_map->size()) {
325  auto const itr = render_group_analyzer_map->find(cd->columnId);
326  if (itr != render_group_analyzer_map->end()) {
327  auto& render_group_analyzer = *itr->second;
328  render_group = render_group_analyzer.insertBoundsAndReturnRenderGroup(bounds);
329  }
330  }
331  }
332  }
333 
334  // allowed to be null?
335  if (is_null && col_ti.get_notnull()) {
336  throw std::runtime_error("NULL value provided for column (" + cd->columnName +
337  ") with NOT NULL constraint.");
338  }
339 
340  // import extracted geo
// NOTE(review): source line 341 (the start of the call that receives the arguments
// below) is missing from this extract.
342  cd,
343  import_buffers,
344  col_idx,
345  coords,
346  bounds,
347  ring_sizes,
348  poly_rings,
349  render_group);
350 
351  // store null string in the base column
352  import_buffers[starting_col_idx]->add_value(
353  cd, copy_params.null_str, true, copy_params);
354 }
355 
356 bool TextFileBufferParser::isNullDatum(const std::string_view datum,
357  const ColumnDescriptor* column,
358  const std::string& null_indicator) {
359  bool is_null = (datum == null_indicator);
360 
361  // Treating empty as NULL
362  if (!column->columnType.is_string() && datum.empty()) {
363  is_null = true;
364  }
365 
366  if (is_null && column->columnType.get_notnull()) {
367  throw std::runtime_error("NULL value provided for column (" + column->columnName +
368  ") with NOT NULL constraint.");
369  }
370  return is_null;
371 }
372 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
void getColumns(std::vector< double > &coords) const
Definition: Types.cpp:567
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
ParseBufferRequest(const ParseBufferRequest &request)=delete
SQLTypes
Definition: sqltypes.h:38
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1144
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:937
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
#define IS_STRING(T)
Definition: sqltypes.h:250
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
#define CHECK(condition)
Definition: Logger.h:223
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
Definition: Importer.cpp:1653
SQLTypeInfo columnType
static bool isCoordinateScalar(const std::string_view datum)
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
bool is_string() const
Definition: sqltypes.h:510
bool transforms() const
Definition: sqltypes.h:531
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int8_t * numbersPtr
Definition: sqltypes.h:226
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:251
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255