OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvFileBufferParser.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
21 #include "Geospatial/Types.h"
23 #include "Shared/misc.h"
24 
25 namespace foreign_storage {
26 namespace csv_file_buffer_parser {
// Forwarded as the `promote_poly_to_mpoly` flag to the Geospatial::GeoTypesFactory
// helpers, so POLYGON input is accepted for MULTIPOLYGON columns.
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON{true};
28 
30  std::unique_ptr<bool[]>& array_flags,
31  int& phys_cols,
32  int& point_cols,
33  const std::list<const ColumnDescriptor*>& columns) {
34  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
35  size_t i = 0;
36  for (const auto cd : columns) {
37  const auto& col_ti = cd->columnType;
38  phys_cols += col_ti.get_physical_cols();
39  if (cd->columnType.get_type() == kPOINT) {
40  point_cols++;
41  }
42 
43  if (cd->columnType.get_type() == kARRAY) {
44  array_flags.get()[i] = true;
45  } else {
46  array_flags.get()[i] = false;
47  }
48  i++;
49  }
50 }
51 
52 void validate_expected_column_count(std::vector<std::string_view>& row,
53  size_t num_cols,
54  int point_cols,
55  const std::string& file_name) {
56  // Each POINT could consume two separate coords instead of a single WKT
57  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
58  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
59  }
60 }
61 
/**
 * Returns true when a field looks like a scalar numeric coordinate rather than
 * a WKT string or a hex-encoded WKB blob.
 *
 * The field must start with '.', '-' or a digit, and must contain no hex letter
 * (A-F, a-f). As a consequence, scientific notation such as "1e5" is rejected.
 */
inline bool is_coordinate_scalar(const std::string_view datum) {
  if (datum.empty()) {
    return false;
  }
  // isdigit() has undefined behavior for negative char values, so widen via
  // unsigned char first.
  const auto first = static_cast<unsigned char>(datum[0]);
  const bool looks_numeric = first == '.' || first == '-' || isdigit(first) != 0;
  return looks_numeric && datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
}
67 
68 bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str,
69  const std::string_view lat_str,
70  std::vector<double>& coords,
71  const bool is_lon_lat_order) {
72  double lon = std::atof(std::string(lon_str).c_str());
73  double lat = NAN;
74 
75  if (is_coordinate_scalar(lat_str)) {
76  lat = std::atof(std::string(lat_str).c_str());
77  }
78 
79  // Swap coordinates if this table uses a reverse order: lat/lon
80  if (!is_lon_lat_order) {
81  std::swap(lat, lon);
82  }
83 
84  // TODO: should check if POINT column should have been declared with
85  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
86  // throw std::runtime_error("POINT column " + cd->columnName + " is
87  // not WGS84, cannot insert lon/lat");
88  // }
89 
90  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
91  return false;
92  }
93  coords.push_back(lon);
94  coords.push_back(lat);
95  return true;
96 }
97 
98 bool is_null_datum(const std::string_view datum,
99  const ColumnDescriptor* column,
100  const std::string& null_indicator) {
101  bool is_null = (datum == null_indicator);
102 
103  // Treating empty as NULL
104  if (!column->columnType.is_string() && datum.empty()) {
105  is_null = true;
106  }
107 
108  if (is_null && column->columnType.get_notnull()) {
109  throw std::runtime_error("NULL value provided for column (" + column->columnName +
110  ") with NOT NULL constraint.");
111  }
112  return is_null;
113 }
114 
116  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
117  size_t& col_idx,
118  const import_export::CopyParams& copy_params,
119  std::list<const ColumnDescriptor*>::iterator& cd_it,
120  std::vector<std::string_view>& row,
121  size_t& import_idx,
122  bool is_null,
123  size_t first_row_index,
124  size_t row_index_plus_one,
125  std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
126  auto cd = *cd_it;
127  auto col_ti = cd->columnType;
128  SQLTypes col_type = col_ti.get_type();
129  CHECK(IS_GEO(col_type));
130 
131  // store null string in the base column
132  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
133 
134  auto const& geo_string = row[import_idx];
135  ++import_idx;
136  ++col_idx;
137 
138  std::vector<double> coords;
139  std::vector<double> bounds;
140  std::vector<int> ring_sizes;
141  std::vector<int> poly_rings;
142  int render_group = 0;
143 
144  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
146  geo_string, row[import_idx], coords, copy_params.lonlat)) {
147  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
148  cd->columnName);
149  }
150  ++import_idx;
151  } else {
152  SQLTypeInfo import_ti{col_ti};
153  if (is_null) {
155  coords,
156  bounds,
157  ring_sizes,
158  poly_rings,
160  } else {
161  // extract geometry directly from WKT
162  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
163  import_ti,
164  coords,
165  bounds,
166  ring_sizes,
167  poly_rings,
169  std::string msg = "Failed to extract valid geometry from row " +
170  std::to_string(first_row_index + row_index_plus_one) +
171  " for column " + cd->columnName;
172  throw std::runtime_error(msg);
173  }
174 
175  // validate types
176  if (col_type != import_ti.get_type()) {
178  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
179  col_type == SQLTypes::kMULTIPOLYGON)) {
180  throw std::runtime_error("Imported geometry doesn't match the type of column " +
181  cd->columnName);
182  }
183  }
184  }
185  }
186 
187  // import extracted geo
189  cd,
190  import_buffers,
191  col_idx,
192  coords,
193  bounds,
194  ring_sizes,
195  poly_rings,
196  render_group);
197 }
198 
199 std::map<int, DataBlockPtr> convert_import_buffers_to_data_blocks(
200  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
201  import_buffers) {
202  std::map<int, DataBlockPtr> result;
203  std::vector<std::pair<const size_t, std::future<int8_t*>>>
204  encoded_data_block_ptrs_futures;
205  // make all async calls to string dictionary here and then continue execution
206  for (const auto& import_buffer : import_buffers) {
207  if (import_buffer == nullptr)
208  continue;
209  DataBlockPtr p;
210  if (import_buffer->getTypeInfo().is_number() ||
211  import_buffer->getTypeInfo().is_time() ||
212  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
213  p.numbersPtr = import_buffer->getAsBytes();
214  } else if (import_buffer->getTypeInfo().is_string()) {
215  auto string_payload_ptr = import_buffer->getStringBuffer();
216  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
217  p.stringsPtr = string_payload_ptr;
218  } else {
219  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
220  p.numbersPtr = nullptr;
221 
222  auto column_id = import_buffer->getColumnDesc()->columnId;
223  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
224  column_id,
225  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
226  import_buffer->addDictEncodedString(*string_payload_ptr);
227  return import_buffer->getStringDictBuffer();
228  })));
229  }
230  } else if (import_buffer->getTypeInfo().is_geometry()) {
231  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
232  p.stringsPtr = geo_payload_ptr;
233  } else {
234  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
235  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
236  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
237  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
238  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
239  } else {
240  p.arraysPtr = import_buffer->getArrayBuffer();
241  }
242  }
243  result[import_buffer->getColumnDesc()->columnId] = p;
244  }
245 
246  // wait for the async requests we made for string dictionary
247  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
248  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
249  }
250  return result;
251 }
252 
254  ParseBufferRequest(const ParseBufferRequest& request) = delete;
255  ParseBufferRequest(ParseBufferRequest&& request) = default;
258  int db_id,
259  const ForeignTable* foreign_table,
260  std::set<int> column_filter_set)
261  : buffer(std::make_unique<char[]>(buffer_size))
262  , buffer_size(buffer_size)
263  , buffer_alloc_size(buffer_size)
264  , copy_params(copy_params)
265  , db_id(db_id)
266  , foreign_table_schema(std::make_unique<ForeignTableSchema>(db_id, foreign_table)) {
267  // initialize import buffers from columns.
268  for (const auto column : getColumns()) {
269  if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
270  import_buffers.emplace_back(nullptr);
271  } else {
272  StringDictionary* string_dictionary = nullptr;
273  if (column->columnType.is_dict_encoded_string() ||
274  (column->columnType.is_array() &&
275  IS_STRING(column->columnType.get_subtype()) &&
276  column->columnType.get_compression() == kENCODING_DICT)) {
277  auto dict_descriptor = getCatalog()->getMetadataForDictUnlocked(
278  column->columnType.get_comp_param(), true);
279  string_dictionary = dict_descriptor->stringDict.get();
280  }
281  import_buffers.emplace_back(std::make_unique<import_export::TypedImportBuffer>(
282  column, string_dictionary));
283  }
284  }
285  }
286 
287  inline std::shared_ptr<Catalog_Namespace::Catalog> getCatalog() const {
289  }
290 
291  inline std::list<const ColumnDescriptor*> getColumns() const {
292  return foreign_table_schema->getLogicalAndPhysicalColumns();
293  }
294 
295  inline int32_t getTableId() const {
296  return foreign_table_schema->getForeignTable()->tableId;
297  }
298 
299  inline std::string getTableName() const {
300  return foreign_table_schema->getForeignTable()->tableName;
301  }
302 
303  inline size_t getMaxFragRows() const {
304  return foreign_table_schema->getForeignTable()->maxFragRows;
305  }
306 
307  inline std::string getFilePath() const {
308  return foreign_table_schema->getForeignTable()->getFullFilePath();
309  }
310 
311  // These must be initialized at construction (before parsing).
312  std::unique_ptr<char[]> buffer;
313  size_t buffer_size;
316  const int db_id;
317  std::unique_ptr<ForeignTableSchema> foreign_table_schema;
318  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
319 
320  // These are set during parsing.
322  size_t begin_pos;
323  size_t end_pos;
325  size_t file_offset;
327 };
328 
330  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
331  size_t row_count;
332  std::vector<size_t> row_offsets;
333 };
334 
335 bool skip_column_import(ParseBufferRequest& request, int column_idx) {
336  return request.import_buffers[column_idx] == nullptr;
337 }
338 
343 ParseBufferResult parse_buffer(ParseBufferRequest& request, bool convert_data_blocks) {
344  CHECK(request.buffer);
346  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
347  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
348  const char* thread_buf_end = request.buffer.get() + request.end_pos;
349  const char* buf_end = request.buffer.get() + request.buffer_size;
350 
351  std::vector<std::string_view> row;
352  size_t row_index_plus_one = 0;
353  const char* p = thread_buf;
354  bool try_single_thread = false;
355  int phys_cols = 0;
356  int point_cols = 0;
357  std::unique_ptr<bool[]> array_flags;
358 
360  array_flags, phys_cols, point_cols, request.getColumns());
361  auto num_cols = request.getColumns().size() - phys_cols;
362 
363  size_t row_count = 0;
364  size_t remaining_row_count = request.process_row_count;
365  std::vector<size_t> row_offsets{};
366  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
367 
368  std::string file_path = request.getFilePath();
369  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
370  row.clear();
371  row_count++;
372  std::vector<std::unique_ptr<char[]>>
373  tmp_buffers; // holds string w/ removed escape chars, etc
374 
376  thread_buf_end,
377  buf_end,
378  request.copy_params,
379  array_flags.get(),
380  row,
381  tmp_buffers,
382  try_single_thread);
383 
384  row_index_plus_one++;
385  validate_expected_column_count(row, num_cols, point_cols, file_path);
386 
387  size_t import_idx = 0;
388  size_t col_idx = 0;
389  try {
390  auto columns = request.getColumns();
391  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
392  auto cd = *cd_it;
393  const auto& col_ti = cd->columnType;
394  bool is_null = is_null_datum(row[import_idx], cd, request.copy_params.null_str);
395 
396  if (col_ti.is_geometry()) {
397  if (!skip_column_import(request, col_idx)) {
399  col_idx,
400  request.copy_params,
401  cd_it,
402  row,
403  import_idx,
404  is_null,
405  request.first_row_index,
406  row_index_plus_one,
407  request.getCatalog());
408  } else {
409  // update import/col idx according to types
410  if (!is_null && cd->columnType == kPOINT &&
411  is_coordinate_scalar(row[import_idx])) {
412  ++import_idx;
413  }
414  ++import_idx;
415  ++col_idx;
416  col_idx += col_ti.get_physical_cols();
417  }
418  // skip remaining physical columns
419  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
420  ++cd_it;
421  }
422  } else {
423  if (!skip_column_import(request, col_idx)) {
424  request.import_buffers[col_idx]->add_value(
425  cd, row[import_idx], is_null, request.copy_params);
426  }
427  ++import_idx;
428  ++col_idx;
429  }
430  }
431  } catch (const std::exception& e) {
432  // TODO: Appropriate error handling for FSI
433  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
434  request.import_buffers[col_idx_to_pop]->pop_value();
435  }
436  LOG(ERROR) << "Input exception thrown: " << e.what()
437  << ". Row discarded. Data: " << shared::printContainer(row);
438  }
439  }
440  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
441 
443  result.row_offsets = row_offsets;
444  result.row_count = row_count;
445  if (convert_data_blocks) {
446  result.column_id_to_data_blocks_map =
448  }
449  return result;
450 }
451 
452 } // namespace csv_file_buffer_parser
453 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
SQLTypes
Definition: sqltypes.h:37
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:218
bool is_coordinate_scalar(const std::string_view datum)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:219
std::list< const ColumnDescriptor * > getColumns() const
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
#define LOG(tag)
Definition: Logger.h:188
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1114
std::string to_string(char const *&&v)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1433
static SysCatalog & instance()
Definition: SysCatalog.h:288
CONSTEXPR DEVICE bool is_null(const T &value)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
ParseBufferRequest(size_t buffer_size, const import_export::CopyParams &copy_params, int db_id, const ForeignTable *foreign_table, std::set< int > column_filter_set)
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
specifies the content in-memory of a row in the column metadata table
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:907
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
#define IS_STRING(T)
Definition: sqltypes.h:241
ParseBufferRequest(const ParseBufferRequest &request)=delete
#define CHECK(condition)
Definition: Logger.h:197
SQLTypeInfo columnType
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
bool is_string() const
Definition: sqltypes.h:478
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:318
int8_t * numbersPtr
Definition: sqltypes.h:217
std::string columnName
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
#define IS_GEO(T)
Definition: sqltypes.h:242
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts fields into given vector.