OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CsvFileBufferParser Class Reference

#include <CsvFileBufferParser.h>

+ Inheritance diagram for foreign_storage::CsvFileBufferParser:
+ Collaboration diagram for foreign_storage::CsvFileBufferParser:

Public Member Functions

ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
 
void validateExpectedColumnCount (const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Attributes

static const std::string DELIMITER_KEY = "DELIMITER"
 
static const std::string NULLS_KEY = "NULLS"
 
static const std::string HEADER_KEY = "HEADER"
 
static const std::string QUOTED_KEY = "QUOTED"
 
static const std::string QUOTE_KEY = "QUOTE"
 
static const std::string ESCAPE_KEY = "ESCAPE"
 
static const std::string LINE_DELIMITER_KEY = "LINE_DELIMITER"
 
static const std::string ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
 
static const std::string ARRAY_MARKER_KEY = "ARRAY_MARKER"
 
static const std::string LONLAT_KEY = "LONLAT"
 
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
 
static const std::string GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
 
static const std::string SOURCE_SRID_KEY = "SOURCE_SRID"
 
static const std::string TRIM_SPACES_KEY = "TRIM_SPACES"
 
- Static Public Attributes inherited from foreign_storage::TextFileBufferParser
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Additional Inherited Members

- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int,
DataBlockPtr >
convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Detailed Description

Definition at line 22 of file CsvFileBufferParser.h.

Member Function Documentation

size_t foreign_storage::CsvFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
foreign_storage::FileReader *  file_reader 
) const
override virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 411 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::find_row_end_pos().

418  {
420  buffer,
421  buffer_size,
422  copy_params,
423  buffer_first_row_index,
424  num_rows_in_buffer,
425  nullptr,
426  file_reader);
427 }
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...

+ Here is the call graph for this function:

ParseBufferResult foreign_storage::CsvFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false,
bool  skip_dict_encoding = false 
) const
override virtual

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 122 of file CsvFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, foreign_storage::ParseBufferRequest::buffer_size, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData(), import_export::delimited_parser::find_beginning(), foreign_storage::ParseBufferRequest::first_row_index, foreign_storage::ParseBufferRequest::foreign_table_schema, import_export::delimited_parser::get_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isCoordinateScalar(), foreign_storage::TextFileBufferParser::isNullDatum(), kPOINT, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), foreign_storage::ParseBufferRequest::render_group_analyzer_map, run_benchmark_import::result, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::set_array_flags_and_geo_columns_count(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::skip_column_import(), sv_strip(), foreign_storage::ParseBufferRequest::track_rejected_rows, import_export::CopyParams::trim_spaces, and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

125  {
126  CHECK(request.buffer);
128  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
129  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
130  const char* thread_buf_end = request.buffer.get() + request.end_pos;
131  const char* buf_end = request.buffer.get() + request.buffer_size;
132 
133  std::vector<std::string_view> row;
134  size_t row_index_plus_one = 0;
135  const char* p = thread_buf;
136  bool try_single_thread = false;
137  int phys_cols = 0;
138  int point_cols = 0;
139  std::unique_ptr<bool[]> array_flags;
140 
142  array_flags,
143  phys_cols,
144  point_cols,
145  request.foreign_table_schema->getLogicalColumns());
146  auto num_cols = request.getColumns().size() - phys_cols;
147  if (columns_are_pre_filtered) {
148  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
149  if (skip_column_import(request, col_idx)) {
150  --num_cols;
151  }
152  }
153  }
154 
155  size_t current_row_id = 0;
156  size_t row_count = 0;
157  size_t remaining_row_count = request.process_row_count;
158  std::vector<size_t> row_offsets{};
159  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
160 
161  ParseBufferResult result{};
162 
163  std::string file_path = request.getFilePath();
164  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
165  row.clear();
166  current_row_id = row_count;
167  row_count++;
168  std::vector<std::unique_ptr<char[]>>
169  tmp_buffers; // holds string w/ removed escape chars, etc
170  const char* line_start = p;
172  thread_buf_end,
173  buf_end,
174  request.copy_params,
175  array_flags.get(),
176  row,
177  tmp_buffers,
178  try_single_thread,
179  !columns_are_pre_filtered);
180 
181  row_index_plus_one++;
182 
183  bool incorrect_column_count = false;
184  try {
185  validate_expected_column_count(row, num_cols, point_cols, file_path);
186  } catch (const ForeignStorageException& e) {
187  if (request.track_rejected_rows) {
188  result.rejected_rows.insert(current_row_id);
189  incorrect_column_count = true;
190  } else {
191  throw;
192  }
193  }
194 
195  size_t import_idx = 0;
196  size_t col_idx = 0;
197 
198  try {
199  auto columns = request.getColumns();
200  if (incorrect_column_count) {
201  auto cd_it = columns.begin();
202  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
203  continue;
204  }
205  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
206  auto cd = *cd_it;
207  const auto& col_ti = cd->columnType;
208  bool column_is_present =
209  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
210  CHECK(row.size() > import_idx || !column_is_present);
211  bool is_null = false;
212  try {
213  is_null = column_is_present
214  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
215  : true;
216  } catch (const std::exception& e) {
217  if (request.track_rejected_rows) {
218  result.rejected_rows.insert(current_row_id);
219  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
220  break; // skip rest of row
221  } else {
222  throw;
223  }
224  }
225  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
226  // everything but strings should be always trimmed
227  row[import_idx] = sv_strip(row[import_idx]);
228  }
229  if (col_ti.is_geometry()) {
230  if (!skip_column_import(request, col_idx)) {
231  auto starting_col_idx = col_idx;
232  try {
233  processGeoColumn(request.import_buffers,
234  col_idx,
235  request.copy_params,
236  cd_it,
237  row,
238  import_idx,
239  is_null,
240  request.first_row_index,
241  row_index_plus_one,
242  request.getCatalog(),
243  request.render_group_analyzer_map);
244  } catch (const std::exception& e) {
245  if (request.track_rejected_rows) {
246  result.rejected_rows.insert(current_row_id);
247  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
248  break; // skip rest of row
249  } else {
250  throw;
251  }
252  }
253  } else {
254  // update import/col idx according to types
255  if (!is_null && cd->columnType == kPOINT &&
256  isCoordinateScalar(row[import_idx])) {
257  if (!columns_are_pre_filtered) {
258  ++import_idx;
259  }
260  }
261  if (!columns_are_pre_filtered) {
262  ++import_idx;
263  }
264  ++col_idx;
265  col_idx += col_ti.get_physical_cols();
266  }
267  // skip remaining physical columns
268  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
269  ++cd_it;
270  }
271  } else {
272  if (!skip_column_import(request, col_idx)) {
273  try {
274  request.import_buffers[col_idx]->add_value(
275  cd, row[import_idx], is_null, request.copy_params);
276  } catch (const std::exception& e) {
277  if (request.track_rejected_rows) {
278  result.rejected_rows.insert(current_row_id);
279  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
280  break; // skip rest of row
281  } else {
282  throw;
283  }
284  }
285  }
286  if (column_is_present) {
287  ++import_idx;
288  }
289  ++col_idx;
290  }
291  }
292  } catch (const std::exception& e) {
293  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
294  "\" in row \"" + std::string(line_start, p) +
295  "\" in file \"" + file_path + "\"");
296  }
297  }
298  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
299 
300  result.row_offsets = row_offsets;
301  result.row_count = row_count;
302  if (convert_data_blocks) {
303  result.column_id_to_data_blocks_map =
304  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
305  }
306  return result;
307 }
std::string_view sv_strip(std::string_view str)
return trimmed string_view
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
CONSTEXPR DEVICE bool is_null(const T &value)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
bool g_enable_smem_group_by true
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:291
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::CsvFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 337 of file CsvFileBufferParser.cpp.

References ARRAY_DELIMITER_KEY, ARRAY_MARKER_KEY, foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY, DELIMITER_KEY, ESCAPE_KEY, GEO_ASSIGN_RENDER_GROUPS_KEY, GEO_EXPLODE_COLLECTIONS_KEY, HEADER_KEY, import_export::kHasHeader, import_export::kNoHeader, LINE_DELIMITER_KEY, LONLAT_KEY, NULLS_KEY, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, QUOTE_KEY, QUOTED_KEY, SOURCE_SRID_KEY, foreign_storage::AbstractFileStorageDataWrapper::THREADS_KEY, TRIM_SPACES_KEY, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_delimiter(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_string_with_length().

Referenced by foreign_storage::CsvDataWrapper::validateTableOptions().

338  {
339  import_export::CopyParams copy_params{};
340  copy_params.plain_text = true;
341  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
342  !value.empty()) {
343  copy_params.delimiter = value[0];
344  }
345  if (auto it = foreign_table->options.find(NULLS_KEY);
346  it != foreign_table->options.end()) {
347  copy_params.null_str = it->second;
348  }
349  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
350  if (has_header.has_value()) {
351  if (has_header.value()) {
352  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
353  } else {
354  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
355  }
356  }
357  copy_params.quoted =
358  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
359  if (const auto& value =
361  !value.empty()) {
362  copy_params.quote = value[0];
363  }
364  if (const auto& value =
366  !value.empty()) {
367  copy_params.escape = value[0];
368  }
369  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
370  !value.empty()) {
371  copy_params.line_delim = value[0];
372  }
373  if (const auto& value =
375  !value.empty()) {
376  copy_params.array_delim = value[0];
377  }
378  if (const auto& value =
380  !value.empty()) {
381  copy_params.array_begin = value[0];
382  copy_params.array_end = value[1];
383  }
384  copy_params.lonlat =
385  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
386  copy_params.geo_assign_render_groups =
388  .value_or(copy_params.geo_assign_render_groups);
389  copy_params.geo_explode_collections =
391  .value_or(copy_params.geo_explode_collections);
392  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
393  it != foreign_table->options.end()) {
394  copy_params.source_srid = std::stoi(it->second);
395  }
396 
397  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
398  it != foreign_table->options.end()) {
399  copy_params.buffer_size = std::stoi(it->second);
400  }
401  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
402  it != foreign_table->options.end()) {
403  copy_params.threads = std::stoi(it->second);
404  }
405  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
406  .value_or(copy_params.trim_spaces);
407 
408  return copy_params;
409 }
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string LINE_DELIMITER_KEY
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
static const std::string ARRAY_DELIMITER_KEY
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvFileBufferParser::validateExpectedColumnCount ( const std::string &  row,
const import_export::CopyParams &  copy_params,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
) const

Takes a single row and verifies that its number of columns is valid for num_cols and point_cols (the number of point columns).

Definition at line 312 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::get_row(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

317  {
318  bool is_array = false;
319  bool try_single_thread = false;
320  std::vector<std::unique_ptr<char[]>> tmp_buffers;
321  std::vector<std::string_view> fields;
322  // parse columns in row into fields (other return values are intentionally ignored)
324  row.c_str() + row.size(),
325  row.c_str() + row.size(),
326  copy_params,
327  &is_array,
328  fields,
329  tmp_buffers,
330  try_single_thread,
331  false // Don't filter empty lines
332  );
333  // Check we have right number of columns
334  validate_expected_column_count(fields, num_cols, point_cols, file_name);
335 }
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)

+ Here is the call graph for this function:

void foreign_storage::CsvFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
inline override virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 46 of file CsvFileBufferParser.h.

47  {};

Member Data Documentation

const std::string foreign_storage::CsvFileBufferParser::ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ARRAY_MARKER_KEY = "ARRAY_MARKER"
inline static
const std::string foreign_storage::CsvFileBufferParser::DELIMITER_KEY = "DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ESCAPE_KEY = "ESCAPE"
inline static
const std::string foreign_storage::CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY
inline static
Initial value:
=
"GEO_ASSIGN_RENDER_GROUPS"

Definition at line 60 of file CsvFileBufferParser.h.

Referenced by foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(), and validateAndGetCopyParams().

const std::string foreign_storage::CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
inline static
const std::string foreign_storage::CsvFileBufferParser::HEADER_KEY = "HEADER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LINE_DELIMITER_KEY = "LINE_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LONLAT_KEY = "LONLAT"
inline static
const std::string foreign_storage::CsvFileBufferParser::NULLS_KEY = "NULLS"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTE_KEY = "QUOTE"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTED_KEY = "QUOTED"
inline static
const std::string foreign_storage::CsvFileBufferParser::SOURCE_SRID_KEY = "SOURCE_SRID"
inline static
const std::string foreign_storage::CsvFileBufferParser::TRIM_SPACES_KEY = "TRIM_SPACES"
inline static

The documentation for this class was generated from the following files: