OmniSciDB  2e3a973ef4
foreign_storage::csv_file_buffer_parser Namespace Reference

Classes

struct  ParseBufferRequest
 
struct  ParseBufferResult
 

Functions

void set_array_flags_and_geo_columns_count (std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor *> &columns)
 
void validate_expected_column_count (std::vector< std::string_view > &row, size_t num_cols, int point_cols)
 
bool is_coordinate_scalar (const std::string_view datum)
 
bool set_coordinates_from_separate_lon_lat_columns (const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
 
bool is_null_datum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 
void process_geo_column (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor *>::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
bool skip_column_import (ParseBufferRequest &request, int column_idx)
 
ParseBufferResult parse_buffer (ParseBufferRequest &request)
 

Variables

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true
 

Function Documentation

◆ convert_import_buffers_to_data_blocks()

std::map<int, DataBlockPtr> foreign_storage::csv_file_buffer_parser::convert_import_buffers_to_data_blocks ( const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers)

Definition at line 185 of file CsvFileBufferParser.h.

References DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by parse_buffer().

187  {
188  std::map<int, DataBlockPtr> result;
189  std::vector<std::pair<const size_t, std::future<int8_t*>>>
190  encoded_data_block_ptrs_futures;
191  // make all async calls to string dictionary here and then continue execution
192  for (const auto& import_buffer : import_buffers) {
193  if (import_buffer == nullptr)
194  continue;
195  DataBlockPtr p;
196  if (import_buffer->getTypeInfo().is_number() ||
197  import_buffer->getTypeInfo().is_time() ||
198  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
199  p.numbersPtr = import_buffer->getAsBytes();
200  } else if (import_buffer->getTypeInfo().is_string()) {
201  auto string_payload_ptr = import_buffer->getStringBuffer();
202  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
203  p.stringsPtr = string_payload_ptr;
204  } else {
205  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
206  p.numbersPtr = nullptr;
207 
208  auto column_id = import_buffer->getColumnDesc()->columnId;
209  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
210  column_id,
211  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
212  import_buffer->addDictEncodedString(*string_payload_ptr);
213  return import_buffer->getStringDictBuffer();
214  })));
215  }
216  } else if (import_buffer->getTypeInfo().is_geometry()) {
217  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
218  p.stringsPtr = geo_payload_ptr;
219  } else {
220  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
221  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
222  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
223  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
224  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
225  } else {
226  p.arraysPtr = import_buffer->getArrayBuffer();
227  }
228  }
229  result[import_buffer->getColumnDesc()->columnId] = p;
230  }
231 
232  // wait for the async requests we made for string dictionary
233  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
234  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
235  }
236  return result;
237 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
#define IS_STRING(T)
Definition: sqltypes.h:173
#define CHECK(condition)
Definition: Logger.h:197
int8_t * numbersPtr
Definition: sqltypes.h:149
+ Here is the caller graph for this function:

◆ is_coordinate_scalar()

bool foreign_storage::csv_file_buffer_parser::is_coordinate_scalar ( const std::string_view  datum)

Definition at line 48 of file CsvFileBufferParser.h.

Referenced by parse_buffer(), process_geo_column(), and set_coordinates_from_separate_lon_lat_columns().

48  {
49  // field looks like a scalar numeric value (and not a hex blob)
50  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
51  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
52 }
+ Here is the caller graph for this function:

◆ is_null_datum()

bool foreign_storage::csv_file_buffer_parser::is_null_datum ( const std::string_view  datum,
const ColumnDescriptor *  column,
const std::string &  null_indicator 
)

Definition at line 84 of file CsvFileBufferParser.h.

References ColumnDescriptor::columnName, ColumnDescriptor::columnType, SQLTypeInfo::get_notnull(), anonymous_namespace{TypedDataAccessors.h}::is_null(), and SQLTypeInfo::is_string().

Referenced by parse_buffer().

86  {
87  bool is_null = (datum == null_indicator);
88 
89  // Treating empty as NULL
90  if (!column->columnType.is_string() && datum.empty()) {
91  is_null = true;
92  }
93 
94  if (is_null && column->columnType.get_notnull()) {
95  throw std::runtime_error("NULL value provided for column (" + column->columnName +
96  ") with NOT NULL constraint.");
97  }
98  return is_null;
99 }
bool is_string() const
Definition: sqltypes.h:417
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:266
bool is_null(const T &v, const SQLTypeInfo &t)
SQLTypeInfo columnType
std::string columnName
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ parse_buffer()

ParseBufferResult foreign_storage::csv_file_buffer_parser::parse_buffer ( ParseBufferRequest &  request)

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Definition at line 276 of file CsvFileBufferParser.h.

References foreign_storage::csv_file_buffer_parser::ParseBufferRequest::begin_pos, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_size, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::catalog, CHECK, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::columns, convert_import_buffers_to_data_blocks(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::copy_params, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::end_pos, logger::ERROR, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::file_offset, import_export::delimited_parser::find_beginning(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::first_row_index, import_export::delimited_parser::get_row(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers, is_coordinate_scalar(), anonymous_namespace{TypedDataAccessors.h}::is_null(), is_null_datum(), kPOINT, LOG, import_export::CopyParams::null_str, shared::printContainer(), process_geo_column(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::process_row_count, run_benchmark_import::result, set_array_flags_and_geo_columns_count(), skip_column_import(), and validate_expected_column_count().

Referenced by foreign_storage::parse_file_regions(), and foreign_storage::scan_metadata().

276  {
277  CHECK(request.buffer);
278  size_t begin = import_export::delimited_parser::find_beginning(
279  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
280  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
281  const char* thread_buf_end = request.buffer.get() + request.end_pos;
282  const char* buf_end = request.buffer.get() + request.buffer_size;
283 
284  std::vector<std::string_view> row;
285  size_t row_index_plus_one = 0;
286  const char* p = thread_buf;
287  bool try_single_thread = false;
288  int phys_cols = 0;
289  int point_cols = 0;
290  std::unique_ptr<bool[]> array_flags;
291 
292  set_array_flags_and_geo_columns_count(
293  array_flags, phys_cols, point_cols, request.columns);
294  auto num_cols = request.columns.size() - phys_cols;
295 
296  size_t row_count = 0;
297  size_t remaining_row_count = request.process_row_count;
298  std::vector<size_t> row_offsets{};
299  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
300 
301  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
302  row.clear();
303  row_count++;
304  std::vector<std::unique_ptr<char[]>>
305  tmp_buffers; // holds string w/ removed escape chars, etc
306 
307  p = import_export::delimited_parser::get_row(p,
308  thread_buf_end,
309  buf_end,
310  request.copy_params,
311  array_flags.get(),
312  row,
313  tmp_buffers,
314  try_single_thread);
315 
316  row_index_plus_one++;
317  validate_expected_column_count(row, num_cols, point_cols);
318 
319  size_t import_idx = 0;
320  size_t col_idx = 0;
321  try {
322  for (auto cd_it = request.columns.begin(); cd_it != request.columns.end();
323  cd_it++) {
324  auto cd = *cd_it;
325  const auto& col_ti = cd->columnType;
326  bool is_null = is_null_datum(row[import_idx], cd, request.copy_params.null_str);
327 
328  if (col_ti.is_geometry()) {
329  if (!skip_column_import(request, col_idx)) {
330  process_geo_column(request.import_buffers,
331  col_idx,
332  request.copy_params,
333  cd_it,
334  row,
335  import_idx,
336  is_null,
337  request.first_row_index,
338  row_index_plus_one,
339  request.catalog);
340  } else {
341  // update import/col idx according to types
342  if (!is_null && cd->columnType == kPOINT &&
343  is_coordinate_scalar(row[import_idx])) {
344  ++import_idx;
345  }
346  ++import_idx;
347  ++col_idx;
348  col_idx += col_ti.get_physical_cols();
349  }
350  // skip remaining physical columns
351  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
352  ++cd_it;
353  }
354  } else {
355  if (!skip_column_import(request, col_idx)) {
356  request.import_buffers[col_idx]->add_value(
357  cd, row[import_idx], is_null, request.copy_params);
358  }
359  ++import_idx;
360  ++col_idx;
361  }
362  }
363  } catch (const std::exception& e) {
364  // TODO: Appropriate error handling for FSI
365  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
366  request.import_buffers[col_idx_to_pop]->pop_value();
367  }
368  LOG(ERROR) << "Input exception thrown: " << e.what()
369  << ". Row discarded. Data: " << shared::printContainer(row);
370  }
371  }
372  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
373 
374  ParseBufferResult result{};
375  result.row_offsets = row_offsets;
376  result.row_count = row_count;
377  result.column_id_to_data_blocks_map =
378  convert_import_buffers_to_data_blocks(request.import_buffers);
379  return result;
380 }
bool is_coordinate_scalar(const std::string_view datum)
#define LOG(tag)
Definition: Logger.h:188
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor *> &columns)
std::map< int, DataBlockPtr > convert_import_buffers_to_data_blocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
bool is_null_datum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
bool is_null(const T &v, const SQLTypeInfo &t)
#define CHECK(condition)
Definition: Logger.h:197
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread)
Parses the first row in the given buffer and inserts fields into given vector.
void process_geo_column(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor *>::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ process_geo_column()

void foreign_storage::csv_file_buffer_parser::process_geo_column ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
std::list< const ColumnDescriptor *>::iterator &  cd_it,
std::vector< std::string_view > &  row,
size_t &  import_idx,
bool  is_null,
size_t  first_row_index,
size_t  row_index_plus_one,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog 
)

Definition at line 101 of file CsvFileBufferParser.h.

References CHECK, Geospatial::GeoTypesFactory::getGeoColumns(), Geospatial::GeoTypesFactory::getNullGeoColumns(), is_coordinate_scalar(), IS_GEO, kMULTIPOLYGON, kPOINT, kPOLYGON, import_export::CopyParams::lonlat, import_export::CopyParams::null_str, set_coordinates_from_separate_lon_lat_columns(), import_export::Importer::set_geo_physical_import_buffer(), and to_string().

Referenced by parse_buffer().

111  {
112  auto cd = *cd_it;
113  auto col_ti = cd->columnType;
114  SQLTypes col_type = col_ti.get_type();
115  CHECK(IS_GEO(col_type));
116 
117  // store null string in the base column
118  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
119 
120  auto const& geo_string = row[import_idx];
121  ++import_idx;
122  ++col_idx;
123 
124  std::vector<double> coords;
125  std::vector<double> bounds;
126  std::vector<int> ring_sizes;
127  std::vector<int> poly_rings;
128  int render_group = 0;
129 
130  if (!is_null && col_type == kPOINT && is_coordinate_scalar(geo_string)) {
131  if (!set_coordinates_from_separate_lon_lat_columns(
132  geo_string, row[import_idx], coords, copy_params.lonlat)) {
133  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
134  cd->columnName);
135  }
136  ++import_idx;
137  } else {
138  SQLTypeInfo import_ti{col_ti};
139  if (is_null) {
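140  Geospatial::GeoTypesFactory::getNullGeoColumns(import_ti,
141  coords,
142  bounds,
143  ring_sizes,
144  poly_rings,
145  PROMOTE_POLYGON_TO_MULTIPOLYGON);
146  } else {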
147  // extract geometry directly from WKT
148  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
149  import_ti,
150  coords,
151  bounds,
152  ring_sizes,
153  poly_rings,
154  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
155  std::string msg = "Failed to extract valid geometry from row " +
156  std::to_string(first_row_index + row_index_plus_one) +
157  " for column " + cd->columnName;
158  throw std::runtime_error(msg);
159  }
160 
161  // validate types
162  if (col_type != import_ti.get_type()) {
163  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
164  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
165  col_type == SQLTypes::kMULTIPOLYGON)) {
166  throw std::runtime_error("Imported geometry doesn't match the type of column " +
167  cd->columnName);
168  }
169  }
170  }
171  }
172 
173  // import extracted geo
174  import_export::Importer::set_geo_physical_import_buffer(*catalog,
175  cd,
176  import_buffers,
177  col_idx,
178  coords,
179  bounds,
180  ring_sizes,
181  poly_rings,
182  render_group);
183 }
SQLTypes
Definition: sqltypes.h:40
bool is_coordinate_scalar(const std::string_view datum)
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:908
std::string to_string(char const *&&v)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1421
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, std::vector< double > &coords, const bool is_lon_lat_order)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:701
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
bool is_null(const T &v, const SQLTypeInfo &t)
#define CHECK(condition)
Definition: Logger.h:197
#define IS_GEO(T)
Definition: sqltypes.h:174
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_array_flags_and_geo_columns_count()

void foreign_storage::csv_file_buffer_parser::set_array_flags_and_geo_columns_count ( std::unique_ptr< bool[]> &  array_flags,
int &  phys_cols,
int &  point_cols,
const std::list< const ColumnDescriptor *> &  columns 
)

Definition at line 11 of file CsvFileBufferParser.h.

References kARRAY, and kPOINT.

Referenced by parse_buffer().

15  {
16  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
17  size_t i = 0;
18  for (const auto cd : columns) {
19  const auto& col_ti = cd->columnType;
20  phys_cols += col_ti.get_physical_cols();
21  if (cd->columnType.get_type() == kPOINT) {
22  point_cols++;
23  }
24 
25  if (cd->columnType.get_type() == kARRAY) {
26  array_flags.get()[i] = true;
27  } else {
28  array_flags.get()[i] = false;
29  }
30  i++;
31  }
32 }
+ Here is the caller graph for this function:

◆ set_coordinates_from_separate_lon_lat_columns()

bool foreign_storage::csv_file_buffer_parser::set_coordinates_from_separate_lon_lat_columns ( const std::string_view  lon_str,
const std::string_view  lat_str,
std::vector< double > &  coords,
const bool  is_lon_lat_order 
)

Definition at line 54 of file CsvFileBufferParser.h.

References is_coordinate_scalar().

Referenced by process_geo_column().

57  {
58  double lon = std::atof(std::string(lon_str).c_str());
59  double lat = NAN;
60 
61  if (is_coordinate_scalar(lat_str)) {
62  lat = std::atof(std::string(lat_str).c_str());
63  }
64 
65  // Swap coordinates if this table uses a reverse order: lat/lon
66  if (!is_lon_lat_order) {
67  std::swap(lat, lon);
68  }
69 
70  // TODO: should check if POINT column should have been declared with
71  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
72  // throw std::runtime_error("POINT column " + cd->columnName + " is
73  // not WGS84, cannot insert lon/lat");
74  // }
75 
76  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
77  return false;
78  }
79  coords.push_back(lon);
80  coords.push_back(lat);
81  return true;
82 }
bool is_coordinate_scalar(const std::string_view datum)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ skip_column_import()

bool foreign_storage::csv_file_buffer_parser::skip_column_import ( ParseBufferRequest &  request,
int  column_idx 
)

Definition at line 268 of file CsvFileBufferParser.h.

References foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers.

Referenced by parse_buffer().

268  {
269  return request.import_buffers[column_idx] == nullptr;
270 }
+ Here is the caller graph for this function:

◆ validate_expected_column_count()

void foreign_storage::csv_file_buffer_parser::validate_expected_column_count ( std::vector< std::string_view > &  row,
size_t  num_cols,
int  point_cols 
)

Definition at line 34 of file CsvFileBufferParser.h.

References logger::ERROR, LOG, and shared::printContainer().

Referenced by parse_buffer().

36  {
37  // Each POINT could consume two separate coords instead of a single WKT
38  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
39  std::stringstream string_stream;
40  string_stream << "Mismatched number of logical columns: (expected " << num_cols
41  << " columns, has " << row.size()
42  << "): " << shared::printContainer(row);
43  LOG(ERROR) << string_stream.str();
44  throw std::runtime_error{string_stream.str()};
45  }
46 }
#define LOG(tag)
Definition: Logger.h:188
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:64
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Variable Documentation

◆ PROMOTE_POLYGON_TO_MULTIPOLYGON

constexpr bool foreign_storage::csv_file_buffer_parser::PROMOTE_POLYGON_TO_MULTIPOLYGON = true
static

Definition at line 9 of file CsvFileBufferParser.h.