OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TextFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Geospatial/Types.h"
20 
21 namespace foreign_storage {
// NOTE(review): source line 22, which should carry this definition's name and
// first parameter, is absent from this listing. The member-initializer list
// below shows this is a constructor; presumably ParseBufferRequest's - confirm
// against the original file.
//
// Stores the buffer size, copy parameters, database id and file path, builds a
// ForeignTableSchema from `db_id` and `foreign_table`, allocates the character
// buffer when `buffer_size` is greater than zero, and appends one entry to
// `import_buffers` for every column returned by getColumns().
23  const import_export::CopyParams& copy_params,
24  int db_id,
25  const ForeignTable* foreign_table,
26  std::set<int> column_filter_set,
27  const std::string& full_path)
28  : buffer_size(buffer_size)
29  , buffer_alloc_size(buffer_size)
30  , copy_params(copy_params)
31  , db_id(db_id)
32  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
33  , full_path(full_path) {
// A zero buffer_size leaves `buffer` without an allocation.
34  if (buffer_size > 0) {
35  buffer = std::make_unique<char[]>(buffer_size);
36  }
37  // initialize import buffers from columns.
38  for (const auto column : getColumns()) {
// Columns whose id is not in column_filter_set still get a (null) entry, so
// import_buffers ends up with exactly one element per column.
39  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
40  import_buffers.emplace_back(nullptr);
41  } else {
42  StringDictionary* string_dictionary = nullptr;
// Dictionary-encoded strings, and dictionary-encoded arrays of strings, are
// given the string dictionary looked up through the catalog.
43  if (column->columnType.is_dict_encoded_string() ||
44  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
45  column->columnType.get_compression() == kENCODING_DICT)) {
// NOTE(review): dict_descriptor is dereferenced without a null check - confirm
// getMetadataForDict cannot return null for these columns.
46  auto dict_descriptor =
47  getCatalog()->getMetadataForDict(column->columnType.get_comp_param(), true);
48  string_dictionary = dict_descriptor->stringDict.get();
49  }
50  import_buffers.emplace_back(
51  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
52  }
53  }
54 }
55 
// NOTE(review): source line 56, which should carry this function's return type
// and name, is absent from this listing; presumably this is
// TextFileBufferParser::convertImportBuffersToDataBlocks - confirm against the
// original file.
//
// Builds a map from column id to DataBlockPtr for every non-null entry of
// `import_buffers`. Dictionary encoding of string columns is started with
// std::async while the loop continues with the remaining columns; the encoded
// pointers are written into the map once every future has completed.
57  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
58  import_buffers) {
59  std::map<int, DataBlockPtr> result;
60  std::vector<std::pair<const size_t, std::future<int8_t*>>>
61  encoded_data_block_ptrs_futures;
62  // make all async calls to string dictionary here and then continue execution
63  for (const auto& import_buffer : import_buffers) {
// Null entries produce no map entry.
64  if (import_buffer == nullptr) {
65  continue;
66  }
67  DataBlockPtr p;
// Numbers, time types and booleans: hand over the buffer's bytes.
68  if (import_buffer->getTypeInfo().is_number() ||
69  import_buffer->getTypeInfo().is_time() ||
70  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
71  p.numbersPtr = import_buffer->getAsBytes();
72  } else if (import_buffer->getTypeInfo().is_string()) {
73  auto string_payload_ptr = import_buffer->getStringBuffer();
// Unencoded strings: expose the string buffer pointer as-is.
74  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
75  p.stringsPtr = string_payload_ptr;
76  } else {
77  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
// Placeholder; replaced by the future's result in the loop after this one.
78  p.numbersPtr = nullptr;
79 
80  auto column_id = import_buffer->getColumnDesc()->columnId;
// The task captures import_buffer by reference. Every future is waited on
// below before this function returns, so the task does not outlive the
// `import_buffers` argument.
81  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
82  column_id,
83  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
84  import_buffer->addDictEncodedString(*string_payload_ptr);
85  return import_buffer->getStringDictBuffer();
86  })));
87  }
88  } else if (import_buffer->getTypeInfo().is_geometry()) {
// Geometry columns are passed on through their geo string buffer.
89  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
90  p.stringsPtr = geo_payload_ptr;
91  } else {
// Anything left must be an array; string arrays must be dictionary encoded and
// are encoded synchronously here.
92  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
93  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
94  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
95  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
96  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
97  } else {
98  p.arraysPtr = import_buffer->getArrayBuffer();
99  }
100  }
101  result[import_buffer->getColumnDesc()->columnId] = p;
102  }
103 
104  // wait for the async requests we made for string dictionary
105  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
106  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
107  }
108  return result;
109 }
110 
111 bool TextFileBufferParser::isCoordinateScalar(const std::string_view datum) {
112  // field looks like a scalar numeric value (and not a hex blob)
113  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
114  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
115 }
116 
// File-local helpers.
117 namespace {
// NOTE(review): not referenced in any line visible in this listing; presumably
// used on one of the source lines the listing omits - confirm.
118 constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
119 
// Parses a longitude and a latitude from two separate text fields and appends
// them to `coords`.
//
// lon_str / lat_str:  text of the first and second coordinate field.
// coords:             receives two values on success; left untouched on failure.
// is_lon_lat_order:   when false the two parsed values are swapped before they
//                     are validated and stored.
// Returns false when either value is infinite or NaN, true otherwise.
120 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
121  const std::string_view lat_str,
122  std::vector<double>& coords,
123  const bool is_lon_lat_order) {
// NOTE(review): std::atof yields 0.0 for text it cannot convert, so an
// unparsable first field is not rejected by the finite/NaN check below.
124  double lon = std::atof(std::string(lon_str).c_str());
125  double lat = NAN;
126 
// NOTE(review): this listing jumps from source line 126 to 128; the line that
// opens the block closed on line 129 (the condition guarding the latitude
// parse) is not visible here. When that condition does not hold, lat stays NAN
// and the function returns false.
128  lat = std::atof(std::string(lat_str).c_str());
129  }
130 
131  // Swap coordinates if this table uses a reverse order: lat/lon
132  if (!is_lon_lat_order) {
133  std::swap(lat, lon);
134  }
135 
136  // TODO: should check if POINT column should have been declared with
137  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
138  // throw std::runtime_error("POINT column " + cd->columnName + " is
139  // not WGS84, cannot insert lon/lat");
140  // }
141 
142  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
143  return false;
144  }
// Stored as (lon, lat), after any swap above.
145  coords.push_back(lon);
146  coords.push_back(lat);
147  return true;
148 }
149 } // namespace
150 
// NOTE(review): source line 151, which should carry this function's return type
// and name, is absent from this listing, as are source lines 181, 190, 195,
// 204, 213 and 224 inside the body. Presumably this is
// TextFileBufferParser::processGeoColumn - confirm against the original file.
// The comments below describe only what the visible lines show.
//
// Handles one geo column of a parsed row: writes the null string into the
// column's own import buffer, derives coordinates, bounds, ring sizes and
// polygon rings from the row text (or null values when `is_null`), and passes
// them on together with `import_buffers` and `col_idx` (see the end of the
// body). Advances `col_idx` and `import_idx` past what it consumed.
// Throws std::runtime_error when the text cannot be turned into a geometry or
// the geometry's type does not match the column.
152  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
153  size_t& col_idx,
154  const import_export::CopyParams& copy_params,
155  std::list<const ColumnDescriptor*>::iterator& cd_it,
156  std::vector<std::string_view>& row,
157  size_t& import_idx,
158  bool is_null,
159  size_t first_row_index,
160  size_t row_index_plus_one,
161  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
162  auto cd = *cd_it;
163  auto col_ti = cd->columnType;
164  SQLTypes col_type = col_ti.get_type();
165  CHECK(IS_GEO(col_type));
166 
167  // store null string in the base column
168  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
169 
// geo_string refers to the element of `row` at the index import_idx had on
// entry; both indices then move past the base column.
170  auto const& geo_string = row[import_idx];
171  ++import_idx;
172  ++col_idx;
173 
174  std::vector<double> coords;
175  std::vector<double> bounds;
176  std::vector<int> ring_sizes;
177  std::vector<int> poly_rings;
178  int render_group = 0;
179 
// A POINT whose field looks like a bare number is read together with the next
// field of the row as a coordinate pair.
// NOTE(review): row[import_idx] is read here with no visible bounds check -
// confirm the caller guarantees a following field exists.
180  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
182  geo_string, row[import_idx], coords, copy_params.lonlat)) {
183  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
184  cd->columnName);
185  }
// The second coordinate field has been consumed as well.
186  ++import_idx;
187  } else {
188  SQLTypeInfo import_ti{col_ti};
189  if (is_null) {
// NOTE(review): the call that receives these arguments (source line 190) and
// its last argument (source line 195) are not visible in this listing.
191  coords,
192  bounds,
193  ring_sizes,
194  poly_rings,
196  } else {
197  // extract geometry directly from WKT
198  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
199  import_ti,
200  coords,
201  bounds,
202  ring_sizes,
203  poly_rings,
// The reported row number is first_row_index + row_index_plus_one.
205  std::string msg = "Failed to extract valid geometry from row " +
206  std::to_string(first_row_index + row_index_plus_one) +
207  " for column " + cd->columnName;
208  throw std::runtime_error(msg);
209  }
210 
211  // validate types
212  if (col_type != import_ti.get_type()) {
// NOTE(review): the first part of this condition (source line 213) is not
// visible; the visible part exempts a POLYGON imported into a MULTIPOLYGON
// column from the mismatch error.
214  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
215  col_type == SQLTypes::kMULTIPOLYGON)) {
216  throw std::runtime_error("Imported geometry doesn't match the type of column " +
217  cd->columnName);
218  }
219  }
220  }
221  }
222 
223  // import extracted geo
// NOTE(review): the callee name and first argument (source line 224) are not
// visible in this listing.
225  cd,
226  import_buffers,
227  col_idx,
228  coords,
229  bounds,
230  ring_sizes,
231  poly_rings,
232  render_group);
233 }
234 
235 bool TextFileBufferParser::isNullDatum(const std::string_view datum,
236  const ColumnDescriptor* column,
237  const std::string& null_indicator) {
238  bool is_null = (datum == null_indicator);
239 
240  // Treating empty as NULL
241  if (!column->columnType.is_string() && datum.empty()) {
242  is_null = true;
243  }
244 
245  if (is_null && column->columnType.get_notnull()) {
246  throw std::runtime_error("NULL value provided for column (" + column->columnName +
247  ") with NOT NULL constraint.");
248  }
249  return is_null;
250 }
251 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
ParseBufferRequest(const ParseBufferRequest &request)=delete
SQLTypes
Definition: sqltypes.h:38
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1144
std::string to_string(char const *&&v)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
Definition: Importer.cpp:1630
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:937
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
#define IS_STRING(T)
Definition: sqltypes.h:250
#define CHECK(condition)
Definition: Logger.h:209
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
SQLTypeInfo columnType
static bool isCoordinateScalar(const std::string_view datum)
bool is_string() const
Definition: sqltypes.h:509
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int8_t * numbersPtr
Definition: sqltypes.h:226
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:251