OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvFileBufferParser.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include "Geospatial/Types.h"
5 #include "Shared/misc.h"
6 
7 namespace foreign_storage {
8 namespace csv_file_buffer_parser {
// Whether POLYGON geometries may be promoted to MULTIPOLYGON when loading into
// MULTIPOLYGON columns (presumably forwarded as the `promote_poly_to_mpoly` argument
// of the Geospatial helpers — verify against the call sites).
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON{true};
10 
12  std::unique_ptr<bool[]>& array_flags,
13  int& phys_cols,
14  int& point_cols,
15  const std::list<const ColumnDescriptor*>& columns) {
16  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
17  size_t i = 0;
18  for (const auto cd : columns) {
19  const auto& col_ti = cd->columnType;
20  phys_cols += col_ti.get_physical_cols();
21  if (cd->columnType.get_type() == kPOINT) {
22  point_cols++;
23  }
24 
25  if (cd->columnType.get_type() == kARRAY) {
26  array_flags.get()[i] = true;
27  } else {
28  array_flags.get()[i] = false;
29  }
30  i++;
31  }
32 }
33 
34 void validate_expected_column_count(std::vector<std::string_view>& row,
35  size_t num_cols,
36  int point_cols) {
37  // Each POINT could consume two separate coords instead of a single WKT
38  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
39  std::stringstream string_stream;
40  string_stream << "Mismatched number of logical columns: (expected " << num_cols
41  << " columns, has " << row.size()
42  << "): " << shared::printContainer(row);
43  LOG(ERROR) << string_stream.str();
44  throw std::runtime_error{string_stream.str()};
45  }
46 }
47 
/**
 * Returns true if `datum` looks like a plain scalar numeric value. Such a value is a
 * candidate lon/lat coordinate rather than WKT text or a hex-encoded blob.
 *
 * The field must be non-empty, must start with '.', '-' or a decimal digit, and must
 * contain no hex letters (a-f, A-F). As a consequence, exponent notation such as
 * "1e5" is also rejected.
 */
inline bool is_coordinate_scalar(const std::string_view datum) {
  if (datum.empty()) {
    return false;
  }
  // Cast to unsigned char: passing a negative char (e.g. a UTF-8 lead byte) to
  // isdigit() is undefined behavior.
  const auto first = static_cast<unsigned char>(datum.front());
  const bool numeric_start = first == '.' || first == '-' || isdigit(first);
  return numeric_start && datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
}
53 
54 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
55  const std::string_view lat_str,
56  std::vector<double>& coords,
57  const bool is_lon_lat_order) {
58  double lon = std::atof(std::string(lon_str).c_str());
59  double lat = NAN;
60 
61  if (is_coordinate_scalar(lat_str)) {
62  lat = std::atof(std::string(lat_str).c_str());
63  }
64 
65  // Swap coordinates if this table uses a reverse order: lat/lon
66  if (!is_lon_lat_order) {
67  std::swap(lat, lon);
68  }
69 
70  // TODO: should check if POINT column should have been declared with
71  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
72  // throw std::runtime_error("POINT column " + cd->columnName + " is
73  // not WGS84, cannot insert lon/lat");
74  // }
75 
76  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
77  return false;
78  }
79  coords.push_back(lon);
80  coords.push_back(lat);
81  return true;
82 }
83 
84 bool is_null_datum(const std::string_view datum,
85  const ColumnDescriptor* column,
86  const std::string& null_indicator) {
87  bool is_null = (datum == null_indicator);
88 
89  // Treating empty as NULL
90  if (!column->columnType.is_string() && datum.empty()) {
91  is_null = true;
92  }
93 
94  if (is_null && column->columnType.get_notnull()) {
95  throw std::runtime_error("NULL value provided for column (" + column->columnName +
96  ") with NOT NULL constraint.");
97  }
98  return is_null;
99 }
100 
102  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
103  size_t& col_idx,
104  const import_export::CopyParams& copy_params,
105  std::list<const ColumnDescriptor*>::iterator& cd_it,
106  std::vector<std::string_view>& row,
107  size_t& import_idx,
108  bool is_null,
109  size_t first_row_index,
110  size_t row_index_plus_one,
111  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
112  auto cd = *cd_it;
113  auto col_ti = cd->columnType;
114  SQLTypes col_type = col_ti.get_type();
115  CHECK(IS_GEO(col_type));
116 
117  // store null string in the base column
118  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
119 
120  auto const& geo_string = row[import_idx];
121  ++import_idx;
122  ++col_idx;
123 
124  std::vector<double> coords;
125  std::vector<double> bounds;
126  std::vector<int> ring_sizes;
127  std::vector<int> poly_rings;
128  int render_group = 0;
129 
130  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
132  geo_string, row[import_idx], coords, copy_params.lonlat)) {
133  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
134  cd->columnName);
135  }
136  ++import_idx;
137  } else {
138  SQLTypeInfo import_ti{col_ti};
139  if (is_null) {
141  coords,
142  bounds,
143  ring_sizes,
144  poly_rings,
146  } else {
147  // extract geometry directly from WKT
148  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
149  import_ti,
150  coords,
151  bounds,
152  ring_sizes,
153  poly_rings,
155  std::string msg = "Failed to extract valid geometry from row " +
156  std::to_string(first_row_index + row_index_plus_one) +
157  " for column " + cd->columnName;
158  throw std::runtime_error(msg);
159  }
160 
161  // validate types
162  if (col_type != import_ti.get_type()) {
164  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
165  col_type == SQLTypes::kMULTIPOLYGON)) {
166  throw std::runtime_error("Imported geometry doesn't match the type of column " +
167  cd->columnName);
168  }
169  }
170  }
171  }
172 
173  // import extracted geo
175  cd,
176  import_buffers,
177  col_idx,
178  coords,
179  bounds,
180  ring_sizes,
181  poly_rings,
182  render_group);
183 }
184 
185 std::map<int, DataBlockPtr> convert_import_buffers_to_data_blocks(
186  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
187  import_buffers) {
188  std::map<int, DataBlockPtr> result;
189  std::vector<std::pair<const size_t, std::future<int8_t*>>>
190  encoded_data_block_ptrs_futures;
191  // make all async calls to string dictionary here and then continue execution
192  for (const auto& import_buffer : import_buffers) {
193  if (import_buffer == nullptr)
194  continue;
195  DataBlockPtr p;
196  if (import_buffer->getTypeInfo().is_number() ||
197  import_buffer->getTypeInfo().is_time() ||
198  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
199  p.numbersPtr = import_buffer->getAsBytes();
200  } else if (import_buffer->getTypeInfo().is_string()) {
201  auto string_payload_ptr = import_buffer->getStringBuffer();
202  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
203  p.stringsPtr = string_payload_ptr;
204  } else {
205  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
206  p.numbersPtr = nullptr;
207 
208  auto column_id = import_buffer->getColumnDesc()->columnId;
209  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
210  column_id,
211  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
212  import_buffer->addDictEncodedString(*string_payload_ptr);
213  return import_buffer->getStringDictBuffer();
214  })));
215  }
216  } else if (import_buffer->getTypeInfo().is_geometry()) {
217  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
218  p.stringsPtr = geo_payload_ptr;
219  } else {
220  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
221  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
222  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
223  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
224  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
225  } else {
226  p.arraysPtr = import_buffer->getArrayBuffer();
227  }
228  }
229  result[import_buffer->getColumnDesc()->columnId] = p;
230  }
231 
232  // wait for the async requests we made for string dictionary
233  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
234  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
235  }
236  return result;
237 }
238 
242  ParseBufferRequest(ParseBufferRequest&& request) = default;
243 
244  std::unique_ptr<char[]> buffer;
245  size_t buffer_size;
248  size_t begin_pos;
249  size_t end_pos;
251  size_t file_offset;
253  std::list<const ColumnDescriptor*> columns;
254  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
255  std::shared_ptr<Catalog_Namespace::Catalog> catalog;
256  int db_id;
257  int32_t table_id;
260 };
261 
263  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
264  size_t row_count;
265  std::vector<size_t> row_offsets;
266 };
267 
268 bool skip_column_import(ParseBufferRequest& request, int column_idx) {
269  return request.import_buffers[column_idx] == nullptr;
270 }
271 
277  CHECK(request.buffer);
279  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
280  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
281  const char* thread_buf_end = request.buffer.get() + request.end_pos;
282  const char* buf_end = request.buffer.get() + request.buffer_size;
283 
284  std::vector<std::string_view> row;
285  size_t row_index_plus_one = 0;
286  const char* p = thread_buf;
287  bool try_single_thread = false;
288  int phys_cols = 0;
289  int point_cols = 0;
290  std::unique_ptr<bool[]> array_flags;
291 
293  array_flags, phys_cols, point_cols, request.columns);
294  auto num_cols = request.columns.size() - phys_cols;
295 
296  size_t row_count = 0;
297  size_t remaining_row_count = request.process_row_count;
298  std::vector<size_t> row_offsets{};
299  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
300 
301  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
302  row.clear();
303  row_count++;
304  std::vector<std::unique_ptr<char[]>>
305  tmp_buffers; // holds string w/ removed escape chars, etc
306 
308  thread_buf_end,
309  buf_end,
310  request.copy_params,
311  array_flags.get(),
312  row,
313  tmp_buffers,
314  try_single_thread);
315 
316  row_index_plus_one++;
317  validate_expected_column_count(row, num_cols, point_cols);
318 
319  size_t import_idx = 0;
320  size_t col_idx = 0;
321  try {
322  for (auto cd_it = request.columns.begin(); cd_it != request.columns.end();
323  cd_it++) {
324  auto cd = *cd_it;
325  const auto& col_ti = cd->columnType;
326  bool is_null = is_null_datum(row[import_idx], cd, request.copy_params.null_str);
327 
328  if (col_ti.is_geometry()) {
329  if (!skip_column_import(request, col_idx)) {
331  col_idx,
332  request.copy_params,
333  cd_it,
334  row,
335  import_idx,
336  is_null,
337  request.first_row_index,
338  row_index_plus_one,
339  request.catalog);
340  } else {
341  // update import/col idx according to types
342  if (!is_null && cd->columnType == kPOINT &&
343  is_coordinate_scalar(row[import_idx])) {
344  ++import_idx;
345  }
346  ++import_idx;
347  ++col_idx;
348  col_idx += col_ti.get_physical_cols();
349  }
350  // skip remaining physical columns
351  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
352  ++cd_it;
353  }
354  } else {
355  if (!skip_column_import(request, col_idx)) {
356  request.import_buffers[col_idx]->add_value(
357  cd, row[import_idx], is_null, request.copy_params);
358  }
359  ++import_idx;
360  ++col_idx;
361  }
362  }
363  } catch (const std::exception& e) {
364  // TODO: Appropriate error handling for FSI
365  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
366  request.import_buffers[col_idx_to_pop]->pop_value();
367  }
368  LOG(ERROR) << "Input exception thrown: " << e.what()
369  << ". Row discarded. Data: " << shared::printContainer(row);
370  }
371  }
372  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
373 
375  result.row_offsets = row_offsets;
376  result.row_count = row_count;
377  result.column_id_to_data_blocks_map =
379  return result;
380 }
381 } // namespace csv_file_buffer_parser
382 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
SQLTypes
Definition: sqltypes.h:40
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
bool is_coordinate_scalar(const std::string_view datum)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
#define LOG(tag)
Definition: Logger.h:188
ParseBufferResult parse_buffer(ParseBufferRequest &request)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
#define UNREACHABLE()
Definition: Logger.h:241
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1071
std::shared_ptr< Catalog_Namespace::Catalog > catalog
std::string to_string(char const *&&v)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1421
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
Specifies the in-memory content of a row in the column metadata table.
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:864
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
bool is_null(const T &v, const SQLTypeInfo &t)
#define IS_STRING(T)
Definition: sqltypes.h:173
#define CHECK(condition)
Definition: Logger.h:197
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols)
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
bool is_string() const
Definition: sqltypes.h:417
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:266
int8_t * numbersPtr
Definition: sqltypes.h:149
std::string columnName
#define IS_GEO(T)
Definition: sqltypes.h:174
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts its fields into the given vector.