OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TextFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Geospatial/Types.h"
19 #include "Shared/import_helpers.h"
20 
21 namespace foreign_storage {
22 
24  size_t buffer_size,
25  const import_export::CopyParams& copy_params,
26  int db_id,
27  const ForeignTable* foreign_table,
28  std::set<int> column_filter_set,
29  const std::string& full_path,
30  const RenderGroupAnalyzerMap* render_group_analyzer_map,
31  const bool track_rejected_rows)
32  : buffer_size(buffer_size)
33  , buffer_alloc_size(buffer_size)
34  , copy_params(copy_params)
35  , db_id(db_id)
36  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
37  , render_group_analyzer_map(render_group_analyzer_map)
38  , full_path(full_path)
39  , track_rejected_rows(track_rejected_rows) {
40  if (buffer_size > 0) {
41  buffer = std::make_unique<char[]>(buffer_size);
42  }
43  // initialize import buffers from columns.
44  for (const auto column : getColumns()) {
45  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
46  import_buffers.emplace_back(nullptr);
47  } else {
48  StringDictionary* string_dictionary = nullptr;
49  if (column->columnType.is_dict_encoded_string() ||
50  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
51  column->columnType.get_compression() == kENCODING_DICT)) {
52  auto dict_descriptor =
53  getCatalog()->getMetadataForDict(column->columnType.get_comp_param(), true);
54  string_dictionary = dict_descriptor->stringDict.get();
55  }
56  import_buffers.emplace_back(
57  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
58  }
59  }
60 }
61 
63  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
64  const bool skip_dict_encoding) {
65  std::map<int, DataBlockPtr> result;
66  std::vector<std::pair<const size_t, std::future<int8_t*>>>
67  encoded_data_block_ptrs_futures;
68  // make all async calls to string dictionary here and then continue execution
69  for (const auto& import_buffer : import_buffers) {
70  if (import_buffer == nullptr) {
71  continue;
72  }
73  DataBlockPtr p;
74  if (import_buffer->getTypeInfo().is_number() ||
75  import_buffer->getTypeInfo().is_time() ||
76  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
77  p.numbersPtr = import_buffer->getAsBytes();
78  } else if (import_buffer->getTypeInfo().is_string()) {
79  auto string_payload_ptr = import_buffer->getStringBuffer();
80  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
81  p.stringsPtr = string_payload_ptr;
82  } else {
83  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
84  p.numbersPtr = nullptr;
85 
86  if (!skip_dict_encoding) {
87  auto column_id = import_buffer->getColumnDesc()->columnId;
88  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
89  column_id,
90  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
91  import_buffer->addDictEncodedString(*string_payload_ptr);
92  return import_buffer->getStringDictBuffer();
93  })));
94  }
95  }
96  } else if (import_buffer->getTypeInfo().is_geometry()) {
97  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
98  p.stringsPtr = geo_payload_ptr;
99  } else {
100  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
101  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
102  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
103  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
104  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
105  } else {
106  p.arraysPtr = import_buffer->getArrayBuffer();
107  }
108  }
109  result[import_buffer->getColumnDesc()->columnId] = p;
110  }
111 
112  if (!skip_dict_encoding) {
113  // wait for the async requests we made for string dictionary
114  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
115  encoded_ptr_future.second.wait();
116  }
117  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
118  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
119  }
120  }
121  return result;
122 }
123 
124 bool TextFileBufferParser::isCoordinateScalar(const std::string_view datum) {
125  // field looks like a scalar numeric value (and not a hex blob)
126  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
127  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
128 }
129 
130 namespace {
131 constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
132 
133 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
134  const std::string_view lat_str,
135  SQLTypeInfo& ti,
136  std::vector<double>& coords,
137  const bool is_lon_lat_order) {
138  double lon = std::atof(std::string(lon_str).c_str());
139  double lat = NAN;
140 
142  lat = std::atof(std::string(lat_str).c_str());
143  }
144 
145  // Swap coordinates if this table uses a reverse order: lat/lon
146  if (!is_lon_lat_order) {
147  std::swap(lat, lon);
148  }
149 
150  // TODO: should check if POINT column should have been declared with
151  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
152  // throw std::runtime_error("POINT column " + cd->columnName + " is
153  // not WGS84, cannot insert lon/lat");
154  // }
155 
156  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
157  return false;
158  }
159 
160  if (ti.transforms()) {
161  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
162  if (!pt.transform(ti)) {
163  return false;
164  }
165  pt.getColumns(coords);
166  return true;
167  }
168 
169  coords.push_back(lon);
170  coords.push_back(lat);
171  return true;
172 }
173 } // namespace
174 
176  const std::list<const ColumnDescriptor*>& columns,
177  std::list<const ColumnDescriptor*>::iterator& cd_it,
178  const size_t starting_col_idx,
179  ParseBufferRequest& request) {
180  size_t col_idx = starting_col_idx;
181 
182  for (; cd_it != columns.end(); cd_it++) {
183  auto cd = *cd_it;
184  const auto& col_ti = cd->columnType;
185  if (col_ti.is_geometry()) {
186  if (request.import_buffers[col_idx] != nullptr) {
188  col_idx,
189  request.copy_params,
190  cd,
191  request.getCatalog());
192  } else {
193  ++col_idx;
194  col_idx += col_ti.get_physical_cols();
195  }
196  // skip remaining physical columns
197  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
198  ++cd_it;
199  }
200  } else {
201  if (request.import_buffers[col_idx] != nullptr) {
202  request.import_buffers[col_idx]->add_value(cd,
203  {},
204  true,
205  request.copy_params,
206  /*check_not_null=*/false);
207  }
208  ++col_idx;
209  }
210  }
211 }
212 
214  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
215  size_t& col_idx,
216  const import_export::CopyParams& copy_params,
217  const ColumnDescriptor* cd,
218  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
219  auto col_ti = cd->columnType;
220  SQLTypes col_type = col_ti.get_type();
221  CHECK(IS_GEO(col_type));
222 
223  // store null string in the base column
224  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
225  ++col_idx;
226 
227  std::vector<double> coords;
228  std::vector<double> bounds;
229  std::vector<int> ring_sizes;
230  std::vector<int> poly_rings;
231  int render_group = 0;
232 
233  SQLTypeInfo import_ti{col_ti};
235  import_ti, coords, bounds, ring_sizes, poly_rings, PROMOTE_POLYGON_TO_MULTIPOLYGON);
236 
237  // import extracted geo
239  cd,
240  import_buffers,
241  col_idx,
242  coords,
243  bounds,
244  ring_sizes,
245  poly_rings,
246  render_group,
247  /*force_null=*/true);
248 }
249 
251  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
252  size_t& col_idx,
253  const import_export::CopyParams& copy_params,
254  std::list<const ColumnDescriptor*>::iterator& cd_it,
255  std::vector<std::string_view>& row,
256  size_t& import_idx,
257  bool is_null,
258  size_t first_row_index,
259  size_t row_index_plus_one,
260  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
261  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
262  auto cd = *cd_it;
263  auto col_ti = cd->columnType;
264  SQLTypes col_type = col_ti.get_type();
265  CHECK(IS_GEO(col_type));
266 
267  auto starting_col_idx = col_idx;
268 
269  auto const& geo_string = row[import_idx];
270  ++import_idx;
271  ++col_idx;
272 
273  std::vector<double> coords;
274  std::vector<double> bounds;
275  std::vector<int> ring_sizes;
276  std::vector<int> poly_rings;
277  int render_group = 0;
278 
279  // prepare to transform from another SRID
280  SQLTypeInfo import_ti{col_ti};
281  if (import_ti.get_output_srid() == 4326) {
282  auto srid0 = copy_params.source_srid;
283  if (srid0 > 0) {
284  // srid0 -> 4326 transform is requested on import
285  import_ti.set_input_srid(srid0);
286  }
287  }
288 
289  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
291  geo_string, row[import_idx], import_ti, coords, copy_params.lonlat)) {
292  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
293  cd->columnName);
294  }
295  ++import_idx;
296  } else {
297  if (is_null || geo_string.empty() || geo_string == "NULL") {
299  coords,
300  bounds,
301  ring_sizes,
302  poly_rings,
304  is_null = true;
305  } else {
306  // extract geometry directly from WKT
307  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
308  import_ti,
309  coords,
310  bounds,
311  ring_sizes,
312  poly_rings,
314  std::string msg = "Failed to extract valid geometry from row " +
315  std::to_string(first_row_index + row_index_plus_one) +
316  " for column " + cd->columnName;
317  throw std::runtime_error(msg);
318  }
319 
320  // validate types
321  if (col_type != import_ti.get_type()) {
323  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
324  col_type == SQLTypes::kMULTIPOLYGON)) {
325  throw std::runtime_error("Imported geometry doesn't match the type of column " +
326  cd->columnName);
327  }
328  }
329 
330  // get render group
331  if (IS_GEO_POLY(col_type) && render_group_analyzer_map &&
332  render_group_analyzer_map->size()) {
333  auto const itr = render_group_analyzer_map->find(cd->columnId);
334  if (itr != render_group_analyzer_map->end()) {
335  auto& render_group_analyzer = *itr->second;
336  render_group = render_group_analyzer.insertBoundsAndReturnRenderGroup(bounds);
337  }
338  }
339  }
340  }
341 
342  // allowed to be null?
343  if (is_null && col_ti.get_notnull()) {
344  throw std::runtime_error("NULL value provided for column (" + cd->columnName +
345  ") with NOT NULL constraint.");
346  }
347 
348  // import extracted geo
350  cd,
351  import_buffers,
352  col_idx,
353  coords,
354  bounds,
355  ring_sizes,
356  poly_rings,
357  render_group);
358 
359  // store null string in the base column
360  import_buffers[starting_col_idx]->add_value(
361  cd, copy_params.null_str, true, copy_params);
362 }
363 
364 bool TextFileBufferParser::isNullDatum(const std::string_view datum,
365  const ColumnDescriptor* column,
366  const std::string& null_indicator) {
367  bool is_null = ImportHelpers::is_null_datum(datum, null_indicator);
368 
369  // Treating empty as NULL
370  if (!column->columnType.is_string() && datum.empty()) {
371  is_null = true;
372  }
373 
374  if (is_null && column->columnType.get_notnull()) {
375  throw std::runtime_error("NULL value provided for column (" + column->columnName +
376  ") with NOT NULL constraint.");
377  }
378  return is_null;
379 }
380 } // namespace foreign_storage
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
void getColumns(std::vector< double > &coords) const
Definition: Types.cpp:567
ParseBufferRequest(const ParseBufferRequest &request)=delete
SQLTypes
Definition: sqltypes.h:55
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:224
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:225
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1309
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:381
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1079
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
#define IS_STRING(T)
Definition: sqltypes.h:299
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
#define CHECK(condition)
Definition: Logger.h:291
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
Definition: Importer.cpp:1627
SQLTypeInfo columnType
static bool isCoordinateScalar(const std::string_view datum)
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
bool is_string() const
Definition: sqltypes.h:580
bool transforms() const
Definition: sqltypes.h:615
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:388
int8_t * numbersPtr
Definition: sqltypes.h:223
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:300
#define IS_GEO_POLY(T)
Definition: sqltypes.h:305