OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::csv_file_buffer_parser Namespace Reference

Classes

struct  ParseBufferRequest
 
struct  ParseBufferResult
 

Functions

void set_array_flags_and_geo_columns_count (std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
 
void validate_expected_column_count (std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
 
bool is_coordinate_scalar (const std::string_view datum)
 
bool set_coordinates_from_separate_lon_lat_columns (const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
 
bool is_null_datum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 
void process_geo_column (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
bool skip_column_import (ParseBufferRequest &request, int column_idx)
 
ParseBufferResult parse_buffer (ParseBufferRequest &request, bool convert_data_blocks)
 

Variables

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true
 

Function Documentation

std::map<int, DataBlockPtr> foreign_storage::csv_file_buffer_parser::convert_import_buffers_to_data_blocks ( const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers)

Definition at line 199 of file CsvFileBufferParser.h.

References DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by parse_buffer().

201  {
202  std::map<int, DataBlockPtr> result;
203  std::vector<std::pair<const size_t, std::future<int8_t*>>>
204  encoded_data_block_ptrs_futures;
205  // make all async calls to string dictionary here and then continue execution
206  for (const auto& import_buffer : import_buffers) {
207  if (import_buffer == nullptr)
208  continue;
209  DataBlockPtr p;
210  if (import_buffer->getTypeInfo().is_number() ||
211  import_buffer->getTypeInfo().is_time() ||
212  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
213  p.numbersPtr = import_buffer->getAsBytes();
214  } else if (import_buffer->getTypeInfo().is_string()) {
215  auto string_payload_ptr = import_buffer->getStringBuffer();
216  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
217  p.stringsPtr = string_payload_ptr;
218  } else {
219  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
220  p.numbersPtr = nullptr;
221 
222  auto column_id = import_buffer->getColumnDesc()->columnId;
223  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
224  column_id,
225  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
226  import_buffer->addDictEncodedString(*string_payload_ptr);
227  return import_buffer->getStringDictBuffer();
228  })));
229  }
230  } else if (import_buffer->getTypeInfo().is_geometry()) {
231  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
232  p.stringsPtr = geo_payload_ptr;
233  } else {
234  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
235  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
236  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
237  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
238  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
239  } else {
240  p.arraysPtr = import_buffer->getArrayBuffer();
241  }
242  }
243  result[import_buffer->getColumnDesc()->columnId] = p;
244  }
245 
246  // wait for the async requests we made for string dictionary
247  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
248  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
249  }
250  return result;
251 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:218
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:219
#define IS_STRING(T)
Definition: sqltypes.h:241
#define CHECK(condition)
Definition: Logger.h:197
int8_t * numbersPtr
Definition: sqltypes.h:217

+ Here is the caller graph for this function:

bool foreign_storage::csv_file_buffer_parser::is_coordinate_scalar ( const std::string_view  datum)

Definition at line 62 of file CsvFileBufferParser.h.

Referenced by parse_buffer(), process_geo_column(), and set_coordinates_from_separate_lon_lat_columns().

62  {
63  // field looks like a scalar numeric value (and not a hex blob)
64  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
65  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
66 }

+ Here is the caller graph for this function:

bool foreign_storage::csv_file_buffer_parser::is_null_datum ( const std::string_view  datum,
const ColumnDescriptor *  column,
const std::string &  null_indicator 
)

Definition at line 98 of file CsvFileBufferParser.h.

References ColumnDescriptor::columnName, ColumnDescriptor::columnType, SQLTypeInfo::get_notnull(), is_null(), and SQLTypeInfo::is_string().

Referenced by parse_buffer().

100  {
101  bool is_null = (datum == null_indicator);
102 
103  // Treating empty as NULL
104  if (!column->columnType.is_string() && datum.empty()) {
105  is_null = true;
106  }
107 
108  if (is_null && column->columnType.get_notnull()) {
109  throw std::runtime_error("NULL value provided for column (" + column->columnName +
110  ") with NOT NULL constraint.");
111  }
112  return is_null;
113 }
CONSTEXPR DEVICE bool is_null(const T &value)
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:478
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:318
std::string columnName

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParseBufferResult foreign_storage::csv_file_buffer_parser::parse_buffer ( ParseBufferRequest &  request,
bool  convert_data_blocks 
)

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Definition at line 343 of file CsvFileBufferParser.h.

References foreign_storage::csv_file_buffer_parser::ParseBufferRequest::begin_pos, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_size, CHECK, convert_import_buffers_to_data_blocks(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::copy_params, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::end_pos, logger::ERROR, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::file_offset, import_export::delimited_parser::find_beginning(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::first_row_index, import_export::delimited_parser::get_row(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::getCatalog(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::getColumns(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::getFilePath(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers, is_coordinate_scalar(), is_null(), is_null_datum(), kPOINT, LOG, import_export::CopyParams::null_str, shared::printContainer(), process_geo_column(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::process_row_count, run_benchmark_import::result, set_array_flags_and_geo_columns_count(), skip_column_import(), and validate_expected_column_count().

Referenced by foreign_storage::parse_file_regions(), and foreign_storage::scan_metadata().

343  {
344  CHECK(request.buffer);
345  size_t begin = import_export::delimited_parser::find_beginning(
346  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
347  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
348  const char* thread_buf_end = request.buffer.get() + request.end_pos;
349  const char* buf_end = request.buffer.get() + request.buffer_size;
350 
351  std::vector<std::string_view> row;
352  size_t row_index_plus_one = 0;
353  const char* p = thread_buf;
354  bool try_single_thread = false;
355  int phys_cols = 0;
356  int point_cols = 0;
357  std::unique_ptr<bool[]> array_flags;
358 
359  set_array_flags_and_geo_columns_count(
360  array_flags, phys_cols, point_cols, request.getColumns());
361  auto num_cols = request.getColumns().size() - phys_cols;
362 
363  size_t row_count = 0;
364  size_t remaining_row_count = request.process_row_count;
365  std::vector<size_t> row_offsets{};
366  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
367 
368  std::string file_path = request.getFilePath();
369  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
370  row.clear();
371  row_count++;
372  std::vector<std::unique_ptr<char[]>>
373  tmp_buffers; // holds string w/ removed escape chars, etc
374 
375  p = import_export::delimited_parser::get_row(p,
376  thread_buf_end,
377  buf_end,
378  request.copy_params,
379  array_flags.get(),
380  row,
381  tmp_buffers,
382  try_single_thread);
383 
384  row_index_plus_one++;
385  validate_expected_column_count(row, num_cols, point_cols, file_path);
386 
387  size_t import_idx = 0;
388  size_t col_idx = 0;
389  try {
390  auto columns = request.getColumns();
391  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
392  auto cd = *cd_it;
393  const auto& col_ti = cd->columnType;
394  bool is_null = is_null_datum(row[import_idx], cd, request.copy_params.null_str);
395 
396  if (col_ti.is_geometry()) {
397  if (!skip_column_import(request, col_idx)) {
398  process_geo_column(request.import_buffers,
399  col_idx,
400  request.copy_params,
401  cd_it,
402  row,
403  import_idx,
404  is_null,
405  request.first_row_index,
406  row_index_plus_one,
407  request.getCatalog());
408  } else {
409  // update import/col idx according to types
410  if (!is_null && cd->columnType == kPOINT &&
411  is_coordinate_scalar(row[import_idx])) {
412  ++import_idx;
413  }
414  ++import_idx;
415  ++col_idx;
416  col_idx += col_ti.get_physical_cols();
417  }
418  // skip remaining physical columns
419  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
420  ++cd_it;
421  }
422  } else {
423  if (!skip_column_import(request, col_idx)) {
424  request.import_buffers[col_idx]->add_value(
425  cd, row[import_idx], is_null, request.copy_params);
426  }
427  ++import_idx;
428  ++col_idx;
429  }
430  }
431  } catch (const std::exception& e) {
432  // TODO: Appropriate error handling for FSI
433  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
434  request.import_buffers[col_idx_to_pop]->pop_value();
435  }
436  LOG(ERROR) << "Input exception thrown: " << e.what()
437  << ". Row discarded. Data: " << shared::printContainer(row);
438  }
439  }
440  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
441 
442  ParseBufferResult result{};
443  result.row_offsets = row_offsets;
444  result.row_count = row_count;
445  if (convert_data_blocks) {
446  result.column_id_to_data_blocks_map =
447  convert_import_buffers_to_data_blocks(request.import_buffers);
448  }
449  return result;
450 }
bool is_coordinate_scalar(const std::string_view datum)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
#define LOG(tag)
Definition: Logger.h:188
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
CONSTEXPR DEVICE bool is_null(const T &value)
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:197
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts fields into given vector.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::csv_file_buffer_parser::process_geo_column ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
std::list< const ColumnDescriptor * >::iterator &  cd_it,
std::vector< std::string_view > &  row,
size_t &  import_idx,
bool  is_null,
size_t  first_row_index,
size_t  row_index_plus_one,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog 
)

Definition at line 115 of file CsvFileBufferParser.h.

References CHECK, Geospatial::GeoTypesFactory::getGeoColumns(), Geospatial::GeoTypesFactory::getNullGeoColumns(), is_coordinate_scalar(), IS_GEO, kMULTIPOLYGON, kPOINT, kPOLYGON, import_export::CopyParams::lonlat, import_export::CopyParams::null_str, PROMOTE_POLYGON_TO_MULTIPOLYGON, set_coordinates_from_separate_lon_lat_columns(), import_export::Importer::set_geo_physical_import_buffer(), and to_string().

Referenced by parse_buffer().

125  {
126  auto cd = *cd_it;
127  auto col_ti = cd->columnType;
128  SQLTypes col_type = col_ti.get_type();
129  CHECK(IS_GEO(col_type));
130 
131  // store null string in the base column
132  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
133 
134  auto const& geo_string = row[import_idx];
135  ++import_idx;
136  ++col_idx;
137 
138  std::vector<double> coords;
139  std::vector<double> bounds;
140  std::vector<int> ring_sizes;
141  std::vector<int> poly_rings;
142  int render_group = 0;
143 
144  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
145  if (!set_coordinates_from_separate_lon_lat_columns(
146  geo_string, row[import_idx], coords, copy_params.lonlat)) {
147  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
148  cd->columnName);
149  }
150  ++import_idx;
151  } else {
152  SQLTypeInfo import_ti{col_ti};
153  if (is_null) {
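154  Geospatial::GeoTypesFactory::getNullGeoColumns(import_ti,
155  coords,
156  bounds,
157  ring_sizes,
158  poly_rings,
159  PROMOTE_POLYGON_TO_MULTIPOLYGON);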
160  } else {
161  // extract geometry directly from WKT
162  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
163  import_ti,
164  coords,
165  bounds,
166  ring_sizes,
167  poly_rings,
168  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
169  std::string msg = "Failed to extract valid geometry from row " +
170  std::to_string(first_row_index + row_index_plus_one) +
171  " for column " + cd->columnName;
172  throw std::runtime_error(msg);
173  }
174 
175  // validate types
176  if (col_type != import_ti.get_type()) {
177  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
178  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
179  col_type == SQLTypes::kMULTIPOLYGON)) {
180  throw std::runtime_error("Imported geometry doesn't match the type of column " +
181  cd->columnName);
182  }
183  }
184  }
185  }
186 
187  // import extracted geo
188  import_export::Importer::set_geo_physical_import_buffer(*catalog,
189  cd,
190  import_buffers,
191  col_idx,
192  coords,
193  bounds,
194  ring_sizes,
195  poly_rings,
196  render_group);
197 }
SQLTypes
Definition: sqltypes.h:37
bool is_coordinate_scalar(const std::string_view datum)
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1114
std::string to_string(char const *&&v)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1433
CONSTEXPR DEVICE bool is_null(const T &value)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:907
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
#define CHECK(condition)
Definition: Logger.h:197
#define IS_GEO(T)
Definition: sqltypes.h:242

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::csv_file_buffer_parser::set_array_flags_and_geo_columns_count ( std::unique_ptr< bool[]> &  array_flags,
int &  phys_cols,
int &  point_cols,
const std::list< const ColumnDescriptor * > &  columns 
)

Definition at line 29 of file CsvFileBufferParser.h.

References kARRAY, and kPOINT.

Referenced by parse_buffer().

33  {
34  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
35  size_t i = 0;
36  for (const auto cd : columns) {
37  const auto& col_ti = cd->columnType;
38  phys_cols += col_ti.get_physical_cols();
39  if (cd->columnType.get_type() == kPOINT) {
40  point_cols++;
41  }
42 
43  if (cd->columnType.get_type() == kARRAY) {
44  array_flags.get()[i] = true;
45  } else {
46  array_flags.get()[i] = false;
47  }
48  i++;
49  }
50 }

+ Here is the caller graph for this function:

bool foreign_storage::csv_file_buffer_parser::set_coordinates_from_separate_lon_lat_columns ( const std::string_view  lon_str,
const std::string_view  lat_str,
std::vector< double > &  coords,
const bool  is_lon_lat_order 
)

Definition at line 68 of file CsvFileBufferParser.h.

References is_coordinate_scalar(), and gpu_enabled::swap().

Referenced by process_geo_column().

71  {
72  double lon = std::atof(std::string(lon_str).c_str());
73  double lat = NAN;
74 
75  if (is_coordinate_scalar(lat_str)) {
76  lat = std::atof(std::string(lat_str).c_str());
77  }
78 
79  // Swap coordinates if this table uses a reverse order: lat/lon
80  if (!is_lon_lat_order) {
81  std::swap(lat, lon);
82  }
83 
84  // TODO: should check if POINT column should have been declared with
85  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
86  // throw std::runtime_error("POINT column " + cd->columnName + " is
87  // not WGS84, cannot insert lon/lat");
88  // }
89 
90  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
91  return false;
92  }
93  coords.push_back(lon);
94  coords.push_back(lat);
95  return true;
96 }
bool is_coordinate_scalar(const std::string_view datum)
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::csv_file_buffer_parser::skip_column_import ( ParseBufferRequest &  request,
int  column_idx 
)

Definition at line 335 of file CsvFileBufferParser.h.

References foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers.

Referenced by parse_buffer().

335  {
336  return request.import_buffers[column_idx] == nullptr;
337 }

+ Here is the caller graph for this function:

void foreign_storage::csv_file_buffer_parser::validate_expected_column_count ( std::vector< std::string_view > &  row,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
)

Definition at line 52 of file CsvFileBufferParser.h.

References foreign_storage::throw_number_of_columns_mismatch_error().

Referenced by parse_buffer().

55  {
56  // Each POINT could consume two separate coords instead of a single WKT
57  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
58  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
59  }
60 }
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Variable Documentation

constexpr bool foreign_storage::csv_file_buffer_parser::PROMOTE_POLYGON_TO_MULTIPOLYGON = true
static

Definition at line 27 of file CsvFileBufferParser.h.

Referenced by process_geo_column().