OmniSciDB  eb3a3d0a03
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
19 #include "Geospatial/Types.h"
21 #include "ImportExport/Importer.h"
22 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 
26 namespace {
// Passed as the "promote polygon to multipolygon" argument of the geo column extraction
// calls; when set, a POLYGON value is also accepted for a MULTIPOLYGON column.
constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON{true};
28 
29 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
30  return request.import_buffers[column_idx] == nullptr;
31 }
32 
34  std::unique_ptr<bool[]>& array_flags,
35  int& phys_cols,
36  int& point_cols,
37  const std::list<const ColumnDescriptor*>& columns) {
38  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
39  size_t i = 0;
40  for (const auto cd : columns) {
41  const auto& col_ti = cd->columnType;
42  phys_cols += col_ti.get_physical_cols();
43  if (cd->columnType.get_type() == kPOINT) {
44  point_cols++;
45  }
46 
47  if (cd->columnType.get_type() == kARRAY) {
48  array_flags.get()[i] = true;
49  } else {
50  array_flags.get()[i] = false;
51  }
52  i++;
53  }
54 }
55 
56 void validate_expected_column_count(std::vector<std::string_view>& row,
57  size_t num_cols,
58  int point_cols,
59  const std::string& file_name) {
60  // Each POINT could consume two separate coords instead of a single WKT
61  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
62  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
63  }
64 }
65 
/**
 * Heuristic check that a field looks like a scalar numeric value (and not WKT text or
 * a hex blob).
 *
 * The field must be non-empty, begin with a digit, '.' or '-', and must not contain
 * any of the letters A-F / a-f. Note that this also rejects exponent notation such as
 * "1e5".
 */
bool is_coordinate_scalar(const std::string_view datum) {
  if (datum.empty()) {
    return false;
  }
  // The <cctype> classifiers require a value representable as unsigned char; handing
  // them a negative plain char (e.g. a UTF-8 lead byte) is undefined behavior.
  const auto first = static_cast<unsigned char>(datum[0]);
  const bool numeric_start = first == '.' || first == '-' || isdigit(first) != 0;
  return numeric_start && datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
}
71 
72 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
73  const std::string_view lat_str,
74  std::vector<double>& coords,
75  const bool is_lon_lat_order) {
76  double lon = std::atof(std::string(lon_str).c_str());
77  double lat = NAN;
78 
79  if (is_coordinate_scalar(lat_str)) {
80  lat = std::atof(std::string(lat_str).c_str());
81  }
82 
83  // Swap coordinates if this table uses a reverse order: lat/lon
84  if (!is_lon_lat_order) {
85  std::swap(lat, lon);
86  }
87 
88  // TODO: should check if POINT column should have been declared with
89  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
90  // throw std::runtime_error("POINT column " + cd->columnName + " is
91  // not WGS84, cannot insert lon/lat");
92  // }
93 
94  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
95  return false;
96  }
97  coords.push_back(lon);
98  coords.push_back(lat);
99  return true;
100 }
101 
102 bool is_null_datum(const std::string_view datum,
103  const ColumnDescriptor* column,
104  const std::string& null_indicator) {
105  bool is_null = (datum == null_indicator);
106 
107  // Treating empty as NULL
108  if (!column->columnType.is_string() && datum.empty()) {
109  is_null = true;
110  }
111 
112  if (is_null && column->columnType.get_notnull()) {
113  throw std::runtime_error("NULL value provided for column (" + column->columnName +
114  ") with NOT NULL constraint.");
115  }
116  return is_null;
117 }
118 
120  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
121  size_t& col_idx,
122  const import_export::CopyParams& copy_params,
123  std::list<const ColumnDescriptor*>::iterator& cd_it,
124  std::vector<std::string_view>& row,
125  size_t& import_idx,
126  bool is_null,
127  size_t first_row_index,
128  size_t row_index_plus_one,
129  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
130  auto cd = *cd_it;
131  auto col_ti = cd->columnType;
132  SQLTypes col_type = col_ti.get_type();
133  CHECK(IS_GEO(col_type));
134 
135  // store null string in the base column
136  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
137 
138  auto const& geo_string = row[import_idx];
139  ++import_idx;
140  ++col_idx;
141 
142  std::vector<double> coords;
143  std::vector<double> bounds;
144  std::vector<int> ring_sizes;
145  std::vector<int> poly_rings;
146  int render_group = 0;
147 
148  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
150  geo_string, row[import_idx], coords, copy_params.lonlat)) {
151  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
152  cd->columnName);
153  }
154  ++import_idx;
155  } else {
156  SQLTypeInfo import_ti{col_ti};
157  if (is_null) {
159  coords,
160  bounds,
161  ring_sizes,
162  poly_rings,
163  PROMOTE_POLYGON_TO_MULTIPOLYGON);
164  } else {
165  // extract geometry directly from WKT
166  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
167  import_ti,
168  coords,
169  bounds,
170  ring_sizes,
171  poly_rings,
172  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
173  std::string msg = "Failed to extract valid geometry from row " +
174  std::to_string(first_row_index + row_index_plus_one) +
175  " for column " + cd->columnName;
176  throw std::runtime_error(msg);
177  }
178 
179  // validate types
180  if (col_type != import_ti.get_type()) {
181  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
182  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
183  col_type == SQLTypes::kMULTIPOLYGON)) {
184  throw std::runtime_error("Imported geometry doesn't match the type of column " +
185  cd->columnName);
186  }
187  }
188  }
189  }
190 
191  // import extracted geo
193  cd,
194  import_buffers,
195  col_idx,
196  coords,
197  bounds,
198  ring_sizes,
199  poly_rings,
200  render_group);
201 }
202 
203 std::map<int, DataBlockPtr> convert_import_buffers_to_data_blocks(
204  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
205  import_buffers) {
206  std::map<int, DataBlockPtr> result;
207  std::vector<std::pair<const size_t, std::future<int8_t*>>>
208  encoded_data_block_ptrs_futures;
209  // make all async calls to string dictionary here and then continue execution
210  for (const auto& import_buffer : import_buffers) {
211  if (import_buffer == nullptr)
212  continue;
213  DataBlockPtr p;
214  if (import_buffer->getTypeInfo().is_number() ||
215  import_buffer->getTypeInfo().is_time() ||
216  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
217  p.numbersPtr = import_buffer->getAsBytes();
218  } else if (import_buffer->getTypeInfo().is_string()) {
219  auto string_payload_ptr = import_buffer->getStringBuffer();
220  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
221  p.stringsPtr = string_payload_ptr;
222  } else {
223  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
224  p.numbersPtr = nullptr;
225 
226  auto column_id = import_buffer->getColumnDesc()->columnId;
227  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
228  column_id,
229  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
230  import_buffer->addDictEncodedString(*string_payload_ptr);
231  return import_buffer->getStringDictBuffer();
232  })));
233  }
234  } else if (import_buffer->getTypeInfo().is_geometry()) {
235  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
236  p.stringsPtr = geo_payload_ptr;
237  } else {
238  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
239  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
240  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
241  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
242  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
243  } else {
244  p.arraysPtr = import_buffer->getArrayBuffer();
245  }
246  }
247  result[import_buffer->getColumnDesc()->columnId] = p;
248  }
249 
250  // wait for the async requests we made for string dictionary
251  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
252  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
253  }
254  return result;
255 }
256 
257 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
258  const std::string& option_name) {
259  if (auto it = foreign_table->options.find(option_name);
260  it != foreign_table->options.end()) {
261  if (it->second.length() == 1) {
262  return it->second;
263  } else {
264  if (it->second == std::string("\\n")) {
265  return "\n";
266  } else if (it->second == std::string("\\t")) {
267  return "\t";
268  } else {
269  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
270  "\". Expected a single character, \"\\n\" or \"\\t\"."};
271  }
272  }
273  }
274  return "";
275 }
276 
277 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
278  const std::string& option_name,
279  const size_t expected_num_chars) {
280  if (auto it = foreign_table->options.find(option_name);
281  it != foreign_table->options.end()) {
282  if (it->second.length() != expected_num_chars) {
283  throw std::runtime_error{"Value of \"" + option_name +
284  "\" foreign table option has the wrong number of "
285  "characters. Expected " +
286  std::to_string(expected_num_chars) + " character(s)."};
287  }
288  return it->second;
289  }
290  return "";
291 }
292 
293 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
294  const std::string& option_name) {
295  if (auto it = foreign_table->options.find(option_name);
296  it != foreign_table->options.end()) {
297  if (boost::iequals(it->second, "TRUE")) {
298  return true;
299  } else if (boost::iequals(it->second, "FALSE")) {
300  return false;
301  } else {
302  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
303  "\" foreign table option. "
304  "Value must be either 'true' or 'false'."};
305  }
306  }
307  return std::nullopt;
308 }
309 } // namespace
310 
316  bool convert_data_blocks,
317  bool columns_are_pre_filtered) const {
318  CHECK(request.buffer);
320  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
321  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
322  const char* thread_buf_end = request.buffer.get() + request.end_pos;
323  const char* buf_end = request.buffer.get() + request.buffer_size;
324 
325  std::vector<std::string_view> row;
326  size_t row_index_plus_one = 0;
327  const char* p = thread_buf;
328  bool try_single_thread = false;
329  int phys_cols = 0;
330  int point_cols = 0;
331  std::unique_ptr<bool[]> array_flags;
332 
334  array_flags, phys_cols, point_cols, request.getColumns());
335  auto num_cols = request.getColumns().size() - phys_cols;
336  if (columns_are_pre_filtered) {
337  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
338  if (skip_column_import(request, col_idx)) {
339  --num_cols;
340  }
341  }
342  }
343 
344  size_t row_count = 0;
345  size_t remaining_row_count = request.process_row_count;
346  std::vector<size_t> row_offsets{};
347  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
348 
349  std::string file_path = request.getFilePath();
350  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
351  row.clear();
352  row_count++;
353  std::vector<std::unique_ptr<char[]>>
354  tmp_buffers; // holds string w/ removed escape chars, etc
355  const char* line_start = p;
357  thread_buf_end,
358  buf_end,
359  request.copy_params,
360  array_flags.get(),
361  row,
362  tmp_buffers,
363  try_single_thread,
364  !columns_are_pre_filtered);
365 
366  row_index_plus_one++;
367  validate_expected_column_count(row, num_cols, point_cols, file_path);
368 
369  size_t import_idx = 0;
370  size_t col_idx = 0;
371  try {
372  auto columns = request.getColumns();
373  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
374  auto cd = *cd_it;
375  const auto& col_ti = cd->columnType;
376  bool column_is_present =
377  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
378  CHECK(row.size() > import_idx || !column_is_present);
379  bool is_null =
380  column_is_present
381  ? is_null_datum(row[import_idx], cd, request.copy_params.null_str)
382  : true;
383 
384  if (col_ti.is_geometry()) {
385  if (!skip_column_import(request, col_idx)) {
387  col_idx,
388  request.copy_params,
389  cd_it,
390  row,
391  import_idx,
392  is_null,
393  request.first_row_index,
394  row_index_plus_one,
395  request.getCatalog());
396  } else {
397  // update import/col idx according to types
398  if (!is_null && cd->columnType == kPOINT &&
399  is_coordinate_scalar(row[import_idx])) {
400  if (!columns_are_pre_filtered)
401  ++import_idx;
402  }
403  if (!columns_are_pre_filtered)
404  ++import_idx;
405  ++col_idx;
406  col_idx += col_ti.get_physical_cols();
407  }
408  // skip remaining physical columns
409  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
410  ++cd_it;
411  }
412  } else {
413  if (!skip_column_import(request, col_idx)) {
414  request.import_buffers[col_idx]->add_value(
415  cd, row[import_idx], is_null, request.copy_params);
416  }
417  if (column_is_present) {
418  ++import_idx;
419  }
420  ++col_idx;
421  }
422  }
423  } catch (const std::exception& e) {
424  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
425  "\" in row \"" + std::string(line_start, p) +
426  "\" in file \"" + file_path + "\"");
427  }
428  }
429  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
430 
432  result.row_offsets = row_offsets;
433  result.row_count = row_count;
434  if (convert_data_blocks) {
435  result.column_id_to_data_blocks_map =
437  }
438  return result;
439 }
445  const std::string& row,
446  const import_export::CopyParams& copy_params,
447  size_t num_cols,
448  int point_cols,
449  const std::string& file_name) const {
450  bool is_array = false;
451  bool try_single_thread = false;
452  std::vector<std::unique_ptr<char[]>> tmp_buffers;
453  std::vector<std::string_view> fields;
454  // parse columns in row into fields (other return values are intentionally ignored)
456  row.c_str() + row.size(),
457  row.c_str() + row.size(),
458  copy_params,
459  &is_array,
460  fields,
461  tmp_buffers,
462  try_single_thread,
463  false // Don't filter empty lines
464  );
465  // Check we have right number of columns
466  validate_expected_column_count(fields, num_cols, point_cols, file_name);
467 }
468 
470  const ForeignTable* foreign_table) const {
471  import_export::CopyParams copy_params{};
472  copy_params.plain_text = true;
473  if (const auto& value =
474  validate_and_get_string_with_length(foreign_table, "ARRAY_DELIMITER", 1);
475  !value.empty()) {
476  copy_params.array_delim = value[0];
477  }
478  if (const auto& value =
479  validate_and_get_string_with_length(foreign_table, "ARRAY_MARKER", 2);
480  !value.empty()) {
481  copy_params.array_begin = value[0];
482  copy_params.array_end = value[1];
483  }
484  if (auto it = foreign_table->options.find("BUFFER_SIZE");
485  it != foreign_table->options.end()) {
486  copy_params.buffer_size = std::stoi(it->second);
487  }
488  if (const auto& value = validate_and_get_delimiter(foreign_table, "DELIMITER");
489  !value.empty()) {
490  copy_params.delimiter = value[0];
491  }
492  if (const auto& value = validate_and_get_string_with_length(foreign_table, "ESCAPE", 1);
493  !value.empty()) {
494  copy_params.escape = value[0];
495  }
496  auto has_header = validate_and_get_bool_value(foreign_table, "HEADER");
497  if (has_header.has_value()) {
498  if (has_header.value()) {
499  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
500  } else {
501  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
502  }
503  }
504  if (const auto& value = validate_and_get_delimiter(foreign_table, "LINE_DELIMITER");
505  !value.empty()) {
506  copy_params.line_delim = value[0];
507  }
508  copy_params.lonlat =
509  validate_and_get_bool_value(foreign_table, "LONLAT").value_or(copy_params.lonlat);
510 
511  if (auto it = foreign_table->options.find("NULLS");
512  it != foreign_table->options.end()) {
513  copy_params.null_str = it->second;
514  }
515  if (const auto& value = validate_and_get_string_with_length(foreign_table, "QUOTE", 1);
516  !value.empty()) {
517  copy_params.quote = value[0];
518  }
519  copy_params.quoted =
520  validate_and_get_bool_value(foreign_table, "QUOTED").value_or(copy_params.quoted);
521  return copy_params;
522 }
523 
525  size_t& alloc_size,
526  std::unique_ptr<char[]>& buffer,
527  size_t& buffer_size,
528  const import_export::CopyParams& copy_params,
529  const size_t buffer_first_row_index,
530  unsigned int& num_rows_in_buffer,
531  foreign_storage::FileReader* file_reader) const {
533  buffer,
534  buffer_size,
535  copy_params,
536  buffer_first_row_index,
537  num_rows_in_buffer,
538  nullptr,
539  file_reader);
540 }
541 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
SQLTypes
Definition: sqltypes.h:38
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1144
std::string to_string(char const *&&v)
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
future< Result > async(Fn &&fn, Args &&...args)
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
Definition: Importer.cpp:1639
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:937
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
#define IS_STRING(T)
Definition: sqltypes.h:250
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:209
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
SQLTypeInfo columnType
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
bool is_string() const
Definition: sqltypes.h:504
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int8_t * numbersPtr
Definition: sqltypes.h:226
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:251