OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CsvFileBufferParser Class Reference

#include <CsvFileBufferParser.h>

+ Inheritance diagram for foreign_storage::CsvFileBufferParser:
+ Collaboration diagram for foreign_storage::CsvFileBufferParser:

Public Member Functions

ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
 
void validateExpectedColumnCount (const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Attributes

static const std::string DELIMITER_KEY = "DELIMITER"
 
static const std::string NULLS_KEY = "NULLS"
 
static const std::string HEADER_KEY = "HEADER"
 
static const std::string QUOTED_KEY = "QUOTED"
 
static const std::string QUOTE_KEY = "QUOTE"
 
static const std::string ESCAPE_KEY = "ESCAPE"
 
static const std::string LINE_DELIMITER_KEY = "LINE_DELIMITER"
 
static const std::string ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
 
static const std::string ARRAY_MARKER_KEY = "ARRAY_MARKER"
 
static const std::string LONLAT_KEY = "LONLAT"
 
static const std::string GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
 
static const std::string SOURCE_SRID_KEY = "SOURCE_SRID"
 
static const std::string TRIM_SPACES_KEY = "TRIM_SPACES"
 
- Static Public Attributes inherited from foreign_storage::TextFileBufferParser
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Additional Inherited Members

- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Detailed Description

Definition at line 22 of file CsvFileBufferParser.h.

Member Function Documentation

size_t foreign_storage::CsvFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
foreign_storage::FileReader *  file_reader 
) const
override virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 408 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::find_row_end_pos().

415  {
417  buffer,
418  buffer_size,
419  copy_params,
420  buffer_first_row_index,
421  num_rows_in_buffer,
422  nullptr,
423  file_reader);
424 }
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...

+ Here is the call graph for this function:

ParseBufferResult foreign_storage::CsvFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false,
bool  skip_dict_encoding = false 
) const
override virtual

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 122 of file CsvFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, foreign_storage::ParseBufferRequest::buffer_size, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData(), import_export::delimited_parser::find_beginning(), foreign_storage::ParseBufferRequest::first_row_index, foreign_storage::ParseBufferRequest::foreign_table_schema, import_export::delimited_parser::get_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isCoordinateScalar(), foreign_storage::TextFileBufferParser::isNullDatum(), kPOINT, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), run_benchmark_import::result, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::set_array_flags_and_geo_columns_count(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::skip_column_import(), sv_strip(), foreign_storage::ParseBufferRequest::track_rejected_rows, import_export::CopyParams::trim_spaces, and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

125  {
126  CHECK(request.buffer);
128  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
129  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
130  const char* thread_buf_end = request.buffer.get() + request.end_pos;
131  const char* buf_end = request.buffer.get() + request.buffer_size;
132 
133  std::vector<std::string_view> row;
134  size_t row_index_plus_one = 0;
135  const char* p = thread_buf;
136  bool try_single_thread = false;
137  int phys_cols = 0;
138  int point_cols = 0;
139  std::unique_ptr<bool[]> array_flags;
140 
142  array_flags,
143  phys_cols,
144  point_cols,
145  request.foreign_table_schema->getLogicalColumns());
146  auto num_cols = request.getColumns().size() - phys_cols;
147  if (columns_are_pre_filtered) {
148  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
149  if (skip_column_import(request, col_idx)) {
150  --num_cols;
151  }
152  }
153  }
154 
155  size_t current_row_id = 0;
156  size_t row_count = 0;
157  size_t remaining_row_count = request.process_row_count;
158  std::vector<size_t> row_offsets{};
159  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
160 
161  ParseBufferResult result{};
162 
163  std::string file_path = request.getFilePath();
164  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
165  row.clear();
166  current_row_id = row_count;
167  row_count++;
168  std::vector<std::unique_ptr<char[]>>
169  tmp_buffers; // holds string w/ removed escape chars, etc
170  const char* line_start = p;
171  row_index_plus_one++;
172  bool incorrect_column_count = false;
174  thread_buf_end,
175  buf_end,
176  request.copy_params,
177  array_flags.get(),
178  row,
179  tmp_buffers,
180  try_single_thread,
181  !columns_are_pre_filtered);
182  try {
183  validate_expected_column_count(row, num_cols, point_cols, file_path);
184  } catch (const ForeignStorageException& e) {
185  if (request.track_rejected_rows) {
186  result.rejected_rows.insert(current_row_id);
187  incorrect_column_count = true;
188  } else {
189  throw;
190  }
191  }
192 
193  size_t import_idx = 0;
194  size_t col_idx = 0;
195 
196  try {
197  auto columns = request.getColumns();
198  if (incorrect_column_count) {
199  auto cd_it = columns.begin();
200  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
201  continue;
202  }
203  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
204  auto cd = *cd_it;
205  const auto& col_ti = cd->columnType;
206  bool column_is_present =
207  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
208  CHECK(row.size() > import_idx || !column_is_present);
209  bool is_null = false;
210  try {
211  is_null = column_is_present
212  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
213  : true;
214  } catch (const std::exception& e) {
215  if (request.track_rejected_rows) {
216  result.rejected_rows.insert(current_row_id);
217  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
218  break; // skip rest of row
219  } else {
220  throw;
221  }
222  }
223  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
224  // everything but strings should be always trimmed
225  row[import_idx] = sv_strip(row[import_idx]);
226  }
227  if (col_ti.is_geometry()) {
228  if (!skip_column_import(request, col_idx)) {
229  auto starting_col_idx = col_idx;
230  try {
231  processGeoColumn(request.import_buffers,
232  col_idx,
233  request.copy_params,
234  cd_it,
235  row,
236  import_idx,
237  is_null,
238  request.first_row_index,
239  row_index_plus_one,
240  request.getCatalog());
241  } catch (const std::exception& e) {
242  if (request.track_rejected_rows) {
243  result.rejected_rows.insert(current_row_id);
244  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
245  break; // skip rest of row
246  } else {
247  throw;
248  }
249  }
250  } else {
251  // update import/col idx according to types
252  if (!is_null && cd->columnType == kPOINT &&
253  isCoordinateScalar(row[import_idx])) {
254  if (!columns_are_pre_filtered) {
255  ++import_idx;
256  }
257  }
258  if (!columns_are_pre_filtered) {
259  ++import_idx;
260  }
261  ++col_idx;
262  col_idx += col_ti.get_physical_cols();
263  }
264  // skip remaining physical columns
265  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
266  ++cd_it;
267  }
268  } else {
269  if (!skip_column_import(request, col_idx)) {
270  try {
271  request.import_buffers[col_idx]->add_value(
272  cd, row[import_idx], is_null, request.copy_params);
273  } catch (const std::exception& e) {
274  if (request.track_rejected_rows) {
275  result.rejected_rows.insert(current_row_id);
276  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
277  break; // skip rest of row
278  } else {
279  throw;
280  }
281  }
282  }
283  if (column_is_present) {
284  ++import_idx;
285  }
286  ++col_idx;
287  }
288  }
289  } catch (const std::exception& e) {
290  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
291  "\" in row \"" + std::string(line_start, p) +
292  "\" in file \"" + file_path + "\"");
293  }
294  }
295  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
296 
297  result.row_offsets = row_offsets;
298  result.row_count = row_count;
299  if (convert_data_blocks) {
300  result.column_id_to_data_blocks_map =
301  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
302  }
303  return result;
304 }
std::string_view sv_strip(std::string_view str)
Returns the trimmed string_view.
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
CONSTEXPR DEVICE bool is_null(const T &value)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
bool g_enable_smem_group_by true
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:291
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::CsvFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 334 of file CsvFileBufferParser.cpp.

References ARRAY_DELIMITER_KEY, ARRAY_MARKER_KEY, foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY, DELIMITER_KEY, ESCAPE_KEY, GEO_EXPLODE_COLLECTIONS_KEY, foreign_storage::ForeignTable::GEO_VALIDATE_GEOMETRY_KEY, HEADER_KEY, import_export::kHasHeader, import_export::kNoHeader, LINE_DELIMITER_KEY, LONLAT_KEY, NULLS_KEY, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, QUOTE_KEY, QUOTED_KEY, SOURCE_SRID_KEY, foreign_storage::AbstractFileStorageDataWrapper::THREADS_KEY, TRIM_SPACES_KEY, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_delimiter(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_string_with_length().

Referenced by foreign_storage::CsvDataWrapper::validateTableOptions().

335  {
336  import_export::CopyParams copy_params{};
337  copy_params.plain_text = true;
338  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
339  !value.empty()) {
340  copy_params.delimiter = value[0];
341  }
342  if (auto it = foreign_table->options.find(NULLS_KEY);
343  it != foreign_table->options.end()) {
344  copy_params.null_str = it->second;
345  }
346  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
347  if (has_header.has_value()) {
348  if (has_header.value()) {
349  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
350  } else {
351  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
352  }
353  }
354  copy_params.quoted =
355  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
356  if (const auto& value =
358  !value.empty()) {
359  copy_params.quote = value[0];
360  }
361  if (const auto& value =
363  !value.empty()) {
364  copy_params.escape = value[0];
365  }
366  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
367  !value.empty()) {
368  copy_params.line_delim = value[0];
369  }
370  if (const auto& value =
372  !value.empty()) {
373  copy_params.array_delim = value[0];
374  }
375  if (const auto& value =
377  !value.empty()) {
378  copy_params.array_begin = value[0];
379  copy_params.array_end = value[1];
380  }
381  copy_params.lonlat =
382  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
383  copy_params.geo_explode_collections =
385  .value_or(copy_params.geo_explode_collections);
386  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
387  it != foreign_table->options.end()) {
388  copy_params.source_srid = std::stoi(it->second);
389  }
390 
391  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
392  it != foreign_table->options.end()) {
393  copy_params.buffer_size = std::stoi(it->second);
394  }
395  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
396  it != foreign_table->options.end()) {
397  copy_params.threads = std::stoi(it->second);
398  }
399  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
400  .value_or(copy_params.trim_spaces);
401  copy_params.geo_validate_geometry =
403  .value_or(copy_params.geo_validate_geometry);
404 
405  return copy_params;
406 }
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string LINE_DELIMITER_KEY
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
Definition: ForeignTable.h:49
static const std::string ARRAY_DELIMITER_KEY

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvFileBufferParser::validateExpectedColumnCount ( const std::string &  row,
const import_export::CopyParams &  copy_params,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
) const

Takes a single row and verifies that its number of columns is valid for num_cols and point_cols (the number of point columns).

Definition at line 309 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::get_row(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

314  {
315  bool is_array = false;
316  bool try_single_thread = false;
317  std::vector<std::unique_ptr<char[]>> tmp_buffers;
318  std::vector<std::string_view> fields;
319  // parse columns in row into fields (other return values are intentionally ignored)
321  row.c_str() + row.size(),
322  row.c_str() + row.size(),
323  copy_params,
324  &is_array,
325  fields,
326  tmp_buffers,
327  try_single_thread,
328  false // Don't filter empty lines
329  );
330  // Check we have right number of columns
331  validate_expected_column_count(fields, num_cols, point_cols, file_name);
332 }
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)

+ Here is the call graph for this function:

void foreign_storage::CsvFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
inline override virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 46 of file CsvFileBufferParser.h.

47  {};

Member Data Documentation

const std::string foreign_storage::CsvFileBufferParser::ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ARRAY_MARKER_KEY = "ARRAY_MARKER"
inline static
const std::string foreign_storage::CsvFileBufferParser::DELIMITER_KEY = "DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ESCAPE_KEY = "ESCAPE"
inline static
const std::string foreign_storage::CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
inline static
const std::string foreign_storage::CsvFileBufferParser::HEADER_KEY = "HEADER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LINE_DELIMITER_KEY = "LINE_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LONLAT_KEY = "LONLAT"
inline static
const std::string foreign_storage::CsvFileBufferParser::NULLS_KEY = "NULLS"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTE_KEY = "QUOTE"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTED_KEY = "QUOTED"
inline static
const std::string foreign_storage::CsvFileBufferParser::SOURCE_SRID_KEY = "SOURCE_SRID"
inline static
const std::string foreign_storage::CsvFileBufferParser::TRIM_SPACES_KEY = "TRIM_SPACES"
inline static

The documentation for this class was generated from the following files: