OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
19 #include "ImportExport/Importer.h"
20 #include "Shared/StringTransform.h"
21 
22 namespace foreign_storage {
23 namespace csv_file_buffer_parser {
24 
26  std::unique_ptr<bool[]>& array_flags,
27  int& phys_cols,
28  int& point_cols,
29  const std::list<const ColumnDescriptor*>& columns) {
30  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
31  size_t i = 0;
32  for (const auto cd : columns) {
33  const auto& col_ti = cd->columnType;
34  phys_cols += col_ti.get_physical_cols();
35  if (cd->columnType.get_type() == kPOINT) {
36  point_cols++;
37  }
38 
39  if (cd->columnType.get_type() == kARRAY) {
40  array_flags.get()[i] = true;
41  } else {
42  array_flags.get()[i] = false;
43  }
44  i++;
45  }
46 }
47 
48 void validate_expected_column_count(std::vector<std::string_view>& row,
49  size_t num_cols,
50  int point_cols,
51  const std::string& file_name) {
52  // Each POINT could consume two separate coords instead of a single WKT
53  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
54  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
55  }
56 }
57 
/**
 * Tells whether a field looks like a plain numeric scalar rather than a hex blob.
 *
 * The field qualifies when it is non-empty, starts with '.', '-' or a decimal digit,
 * and contains none of the letters A-F / a-f anywhere. Because 'e' and 'E' are in
 * that set, exponent notation such as "1e5" is rejected as well.
 *
 * The leading-digit test is an explicit range comparison: passing a plain `char`
 * with a negative value (e.g. a non-ASCII byte) to isdigit() is undefined behaviour.
 *
 * @param datum field text to classify
 * @return true if the field looks like a scalar coordinate value
 */
bool is_coordinate_scalar(const std::string_view datum) {
  if (datum.empty()) {
    return false;
  }
  const char first = datum.front();
  const bool starts_like_number =
      first == '.' || first == '-' || (first >= '0' && first <= '9');
  return starts_like_number &&
         datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
}
63 
64 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
65  const std::string_view lat_str,
66  std::vector<double>& coords,
67  const bool is_lon_lat_order) {
68  double lon = std::atof(std::string(lon_str).c_str());
69  double lat = NAN;
70 
71  if (is_coordinate_scalar(lat_str)) {
72  lat = std::atof(std::string(lat_str).c_str());
73  }
74 
75  // Swap coordinates if this table uses a reverse order: lat/lon
76  if (!is_lon_lat_order) {
77  std::swap(lat, lon);
78  }
79 
80  // TODO: should check if POINT column should have been declared with
81  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
82  // throw std::runtime_error("POINT column " + cd->columnName + " is
83  // not WGS84, cannot insert lon/lat");
84  // }
85 
86  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
87  return false;
88  }
89  coords.push_back(lon);
90  coords.push_back(lat);
91  return true;
92 }
93 
94 bool is_null_datum(const std::string_view datum,
95  const ColumnDescriptor* column,
96  const std::string& null_indicator) {
97  bool is_null = (datum == null_indicator);
98 
99  // Treating empty as NULL
100  if (!column->columnType.is_string() && datum.empty()) {
101  is_null = true;
102  }
103 
104  if (is_null && column->columnType.get_notnull()) {
105  throw std::runtime_error("NULL value provided for column (" + column->columnName +
106  ") with NOT NULL constraint.");
107  }
108  return is_null;
109 }
110 
// NOTE(review): the listing line carrying this function's return type and name
// (listing line 111) is missing from this page. The symbol index of this page shows
// the declaration as `void process_geo_column(...)` with exactly the parameters below.
//
// Imports one geo column of a parsed row:
//  - stores the null string in the base column buffer at `col_idx`,
//  - reads the geo field at `row[import_idx]`, then advances `import_idx` and `col_idx`,
//  - fills coords/bounds/ring_sizes/poly_rings either from two scalar fields (POINT
//    only), from the null-geo path, or from the geometry text itself,
//  - hands the extracted vectors to the final call at the end of the function.
// `col_idx` and `import_idx` are taken by reference and are modified for the caller.
// Throws std::runtime_error when the coordinates or geometry cannot be read, or when
// the imported geometry type does not match the column type.
112  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
113  size_t& col_idx,
114  const import_export::CopyParams& copy_params,
115  std::list<const ColumnDescriptor*>::iterator& cd_it,
116  std::vector<std::string_view>& row,
117  size_t& import_idx,
118  bool is_null,
119  size_t first_row_index,
120  size_t row_index_plus_one,
121  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
122  auto cd = *cd_it;
123  auto col_ti = cd->columnType;
124  SQLTypes col_type = col_ti.get_type();
125  CHECK(IS_GEO(col_type));
126 
127  // store null string in the base column
128  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
129 
// geo_string refers to the element inside `row`; both indices now point past the
// base column / first geo field.
130  auto const& geo_string = row[import_idx];
131  ++import_idx;
132  ++col_idx;
133 
134  std::vector<double> coords;
135  std::vector<double> bounds;
136  std::vector<int> ring_sizes;
137  std::vector<int> poly_rings;
// render_group is never changed below; it is passed on as 0.
138  int render_group = 0;
139 
// POINT given as two scalar fields: the first is geo_string, the second is the next
// field of the row, which is consumed by the ++import_idx below.
// NOTE(review): listing line 141 (the `if (!<callee>(` that opens this call) is
// missing; presumably set_coordinates_from_separate_lon_lat_columns - confirm.
// NOTE(review): row[import_idx] is read here without a visible bounds check - confirm
// that callers guarantee a following field exists.
140  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
142  geo_string, row[import_idx], coords, copy_params.lonlat)) {
143  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
144  cd->columnName);
145  }
146  ++import_idx;
147  } else {
// import_ti starts as a copy of the column type and is passed to the geometry
// extraction call below; it is compared with col_type afterwards.
148  SQLTypeInfo import_ti{col_ti};
149  if (is_null) {
// NOTE(review): listing lines 150 and 155 (callee name with first argument, and the
// last argument with the closing parenthesis) are missing; the symbol index lists
// getNullGeoColumns(ti, coords, bounds, ring_sizes, poly_rings, promote) - confirm.
151  coords,
152  bounds,
153  ring_sizes,
154  poly_rings,
156  } else {
157  // extract geometry directly from WKT
// NOTE(review): listing line 164 (last argument and the closing `)) {`) is missing.
158  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
159  import_ti,
160  coords,
161  bounds,
162  ring_sizes,
163  poly_rings,
// The reported row number is first_row_index + row_index_plus_one.
165  std::string msg = "Failed to extract valid geometry from row " +
166  std::to_string(first_row_index + row_index_plus_one) +
167  " for column " + cd->columnName;
168  throw std::runtime_error(msg);
169  }
170 
171  // validate types
// A POLYGON imported into a MULTIPOLYGON column is the one tolerated mismatch.
// NOTE(review): listing line 173 (start of the inner condition) is missing, so the
// full condition cannot be read from this page.
172  if (col_type != import_ti.get_type()) {
174  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
175  col_type == SQLTypes::kMULTIPOLYGON)) {
176  throw std::runtime_error("Imported geometry doesn't match the type of column " +
177  cd->columnName);
178  }
179  }
180  }
181  }
182 
183  // import extracted geo
// NOTE(review): listing line 184 (callee name and first argument) is missing; the
// symbol index lists set_geo_physical_import_buffer(catalog, cd, import_buffers,
// col_idx, coords, bounds, ring_sizes, poly_rings, render_group) - confirm.
185  cd,
186  import_buffers,
187  col_idx,
188  coords,
189  bounds,
190  ring_sizes,
191  poly_rings,
192  render_group);
193 }
194 
195 std::map<int, DataBlockPtr> convert_import_buffers_to_data_blocks(
196  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
197  import_buffers) {
198  std::map<int, DataBlockPtr> result;
199  std::vector<std::pair<const size_t, std::future<int8_t*>>>
200  encoded_data_block_ptrs_futures;
201  // make all async calls to string dictionary here and then continue execution
202  for (const auto& import_buffer : import_buffers) {
203  if (import_buffer == nullptr)
204  continue;
205  DataBlockPtr p;
206  if (import_buffer->getTypeInfo().is_number() ||
207  import_buffer->getTypeInfo().is_time() ||
208  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
209  p.numbersPtr = import_buffer->getAsBytes();
210  } else if (import_buffer->getTypeInfo().is_string()) {
211  auto string_payload_ptr = import_buffer->getStringBuffer();
212  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
213  p.stringsPtr = string_payload_ptr;
214  } else {
215  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
216  p.numbersPtr = nullptr;
217 
218  auto column_id = import_buffer->getColumnDesc()->columnId;
219  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
220  column_id,
221  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
222  import_buffer->addDictEncodedString(*string_payload_ptr);
223  return import_buffer->getStringDictBuffer();
224  })));
225  }
226  } else if (import_buffer->getTypeInfo().is_geometry()) {
227  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
228  p.stringsPtr = geo_payload_ptr;
229  } else {
230  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
231  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
232  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
233  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
234  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
235  } else {
236  p.arraysPtr = import_buffer->getArrayBuffer();
237  }
238  }
239  result[import_buffer->getColumnDesc()->columnId] = p;
240  }
241 
242  // wait for the async requests we made for string dictionary
243  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
244  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
245  }
246  return result;
247 }
248 
// NOTE(review): the listing line that opens this constructor (listing line 249, with
// its name and first parameter) is missing from this page. The member initialisers
// below use a `buffer_size` parameter, and the symbol index mentions ParseBufferRequest,
// so this is presumably ParseBufferRequest::ParseBufferRequest(size_t buffer_size, ...)
// - confirm against the header.
//
// Initialises the request members, allocates the character buffer when buffer_size is
// positive, and creates one import buffer slot per column returned by getColumns():
// columns whose id is not in `column_filter_set` get a null slot.
250  const import_export::CopyParams& copy_params,
251  int db_id,
252  const ForeignTable* foreign_table,
253  std::set<int> column_filter_set,
254  const std::string& full_path)
255  : buffer_size(buffer_size)
256  , buffer_alloc_size(buffer_size)
257  , copy_params(copy_params)
258  , db_id(db_id)
259  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
260  , full_path(full_path) {
// `buffer` is left unallocated when buffer_size is 0.
261  if (buffer_size > 0) {
262  buffer = std::make_unique<char[]>(buffer_size);
263  }
264  // initialize import buffers from columns.
265  for (const auto column : getColumns()) {
// A null entry keeps import_buffers positionally aligned with the column list.
266  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
267  import_buffers.emplace_back(nullptr);
268  } else {
// A string dictionary is looked up only for dict-encoded string columns and for
// dict-encoded string arrays; every other column passes nullptr.
269  StringDictionary* string_dictionary = nullptr;
270  if (column->columnType.is_dict_encoded_string() ||
271  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
272  column->columnType.get_compression() == kENCODING_DICT)) {
// NOTE(review): dict_descriptor is dereferenced without a null check - confirm that
// getMetadataForDictUnlocked cannot return null here.
273  auto dict_descriptor = getCatalog()->getMetadataForDictUnlocked(
274  column->columnType.get_comp_param(), true);
275  string_dictionary = dict_descriptor->stringDict.get();
276  }
277  import_buffers.emplace_back(
278  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
279  }
280  }
281 }
282 
// NOTE(review): listing lines 283-287 (the doc comment and the first line of the
// signature) are missing from this page. The symbol index shows the declaration as
// `ParseBufferResult parse_buffer(ParseBufferRequest &request,
//  bool convert_data_blocks, bool columns_are_pre_filtered)`.
//
// Parses rows from request.buffer between begin_pos and end_pos, adds each field to
// the matching import buffer, and records the file offset of every row boundary.
// Stops at the end of the range or after request.process_row_count rows.
// Any std::exception raised while importing a row's columns is rethrown as a
// ForeignStorageException that carries the row text and the file path.
288  bool convert_data_blocks,
289  bool columns_are_pre_filtered) {
290  CHECK(request.buffer);
// NOTE(review): listing line 291 is missing; it must define `begin`, which is used
// on the next line. The symbol index lists find_beginning(buffer, begin, end,
// copy_params), which matches these arguments - confirm.
292  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
293  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
294  const char* thread_buf_end = request.buffer.get() + request.end_pos;
295  const char* buf_end = request.buffer.get() + request.buffer_size;
296 
297  std::vector<std::string_view> row;
298  size_t row_index_plus_one = 0;
299  const char* p = thread_buf;
300  bool try_single_thread = false;
301  int phys_cols = 0;
302  int point_cols = 0;
303  std::unique_ptr<bool[]> array_flags;
304 
// NOTE(review): listing line 305 (the callee name) is missing; the arguments match
// set_array_flags_and_geo_columns_count - confirm.
306  array_flags, phys_cols, point_cols, request.getColumns());
// num_cols is the column count without the physical (geo helper) columns.
307  auto num_cols = request.getColumns().size() - phys_cols;
// With pre-filtered input, columns that skip_column_import() reports as skipped are
// not expected in the row either.
// NOTE(review): the symbol index shows getColumns() returning the list by value, so
// each call in this loop condition would copy the list - confirm.
308  if (columns_are_pre_filtered) {
309  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
310  if (skip_column_import(request, col_idx)) {
311  --num_cols;
312  }
313  }
314  }
315 
316  size_t row_count = 0;
317  size_t remaining_row_count = request.process_row_count;
318  std::vector<size_t> row_offsets{};
// Offset of the first row, expressed relative to the file.
319  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
320 
321  std::string file_path = request.getFilePath();
// One iteration per row; `p` is additionally incremented by one after every row.
322  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
323  row.clear();
324  row_count++;
325  std::vector<std::unique_ptr<char[]>>
326  tmp_buffers; // holds string w/ removed escape chars, etc
327  const char* line_start = p;
// NOTE(review): listing line 328 (callee name and first argument) is missing; the
// symbol index lists get_row(buf, buf_end, entire_buf_end, copy_params, is_array,
// row, tmp_buffers, try_single_thread, filter_empty_lines). Presumably its result is
// assigned to `p`, since [line_start, p) is used as the row text below - confirm.
329  thread_buf_end,
330  buf_end,
331  request.copy_params,
332  array_flags.get(),
333  row,
334  tmp_buffers,
335  try_single_thread,
336  !columns_are_pre_filtered);
337 
338  row_index_plus_one++;
// This check runs outside the try block below, so whatever it throws is not wrapped
// by the catch handler.
339  validate_expected_column_count(row, num_cols, point_cols, file_path);
340 
// import_idx indexes fields of `row`; col_idx indexes request.import_buffers.
341  size_t import_idx = 0;
342  size_t col_idx = 0;
343  try {
// NOTE(review): `columns` is a per-row copy if getColumns() returns by value, as the
// symbol index indicates.
344  auto columns = request.getColumns();
345  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
346  auto cd = *cd_it;
347  const auto& col_ti = cd->columnType;
// A column is absent from the row only when the input is pre-filtered and the
// column is skipped.
348  bool column_is_present =
349  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
350  CHECK(row.size() > import_idx || !column_is_present);
// Absent columns are treated as null; the row is read only for present columns.
351  bool is_null =
352  column_is_present
353  ? is_null_datum(row[import_idx], cd, request.copy_params.null_str)
354  : true;
355 
356  if (col_ti.is_geometry()) {
357  if (!skip_column_import(request, col_idx)) {
// NOTE(review): listing line 358 (callee name and first argument) is missing; the
// remaining arguments match process_geo_column, which advances col_idx and
// import_idx through its reference parameters - confirm.
359  col_idx,
360  request.copy_params,
361  cd_it,
362  row,
363  import_idx,
364  is_null,
365  request.first_row_index,
366  row_index_plus_one,
367  request.getCatalog());
368  } else {
369  // update import/col idx according to types
// A POINT given as two scalar fields occupies one extra field. Fields are stepped
// over only when the row still contains skipped columns (input not pre-filtered).
370  if (!is_null && cd->columnType == kPOINT &&
371  is_coordinate_scalar(row[import_idx])) {
372  if (!columns_are_pre_filtered)
373  ++import_idx;
374  }
375  if (!columns_are_pre_filtered)
376  ++import_idx;
// Step over the base column buffer and its physical column buffers.
377  ++col_idx;
378  col_idx += col_ti.get_physical_cols();
379  }
380  // skip remaining physical columns
381  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
382  ++cd_it;
383  }
384  } else {
385  if (!skip_column_import(request, col_idx)) {
386  request.import_buffers[col_idx]->add_value(
387  cd, row[import_idx], is_null, request.copy_params);
388  }
389  if (column_is_present) {
390  ++import_idx;
391  }
392  ++col_idx;
393  }
394  }
395  } catch (const std::exception& e) {
// The row text reported is the byte range [line_start, p).
396  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
397  "\" in row \"" + std::string(line_start, p) +
398  "\" in file \"" + file_path + "\"");
399  }
400  }
// Closing offset: the position reached after the last processed row.
401  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
402 
// NOTE(review): listing line 403 (the declaration of `result`) is missing; the
// return type in the symbol index suggests a ParseBufferResult - confirm.
404  result.row_offsets = row_offsets;
405  result.row_count = row_count;
406  if (convert_data_blocks) {
// NOTE(review): listing line 408 (the right-hand side of this assignment) is
// missing; presumably convert_import_buffers_to_data_blocks(request.import_buffers)
// - confirm.
407  result.column_id_to_data_blocks_map =
409  }
410  return result;
411 }
// NOTE(review): listing lines 412-416 (doc comment and the line with the return type
// and function name) are missing from this page. The symbol index shows the
// declaration as `void parse_and_validate_expected_column_count(const std::string &row,
//  const import_export::CopyParams &copy_params, size_t num_cols, int point_cols,
//  const std::string &file_name)`.
//
// Splits a single row string into fields and checks the field count with
// validate_expected_column_count(); a mismatch is reported by that function.
417  const std::string& row,
418  const import_export::CopyParams& copy_params,
419  size_t num_cols,
420  int point_cols,
421  const std::string& file_name) {
// A single false flag is passed where the row parser takes its is_array pointer.
422  bool is_array = false;
423  bool try_single_thread = false;
424  std::vector<std::unique_ptr<char[]>> tmp_buffers;
425  std::vector<std::string_view> fields;
426  // parse columns in row into fields (other return values are intentionally ignored)
// NOTE(review): listing line 427 (callee name and first argument) is missing; the
// symbol index lists get_row(buf, buf_end, entire_buf_end, ...), presumably called
// with row.c_str() as `buf` - confirm. Both end pointers are the end of the string.
428  row.c_str() + row.size(),
429  row.c_str() + row.size(),
430  copy_params,
431  &is_array,
432  fields,
433  tmp_buffers,
434  try_single_thread,
435  false // Don't filter empty lines
436  );
437  // Check we have right number of columns
438  validate_expected_column_count(fields, num_cols, point_cols, file_name);
439 }
440 
441 } // namespace csv_file_buffer_parser
442 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
SQLTypes
Definition: sqltypes.h:37
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
bool is_coordinate_scalar(const std::string_view datum)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
std::list< const ColumnDescriptor * > getColumns() const
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1114
std::string to_string(char const *&&v)
CONSTEXPR DEVICE bool is_null(const T &value)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
void parse_and_validate_expected_column_count(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
Definition: Importer.cpp:1459
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered)
specifies the content in-memory of a row in the column metadata table
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:907
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
#define IS_STRING(T)
Definition: sqltypes.h:244
ParseBufferRequest(const ParseBufferRequest &request)=delete
#define CHECK(condition)
Definition: Logger.h:197
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:488
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321
int8_t * numbersPtr
Definition: sqltypes.h:220
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:245