OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CsvFileBufferParser Class Reference

#include <CsvFileBufferParser.h>

+ Inheritance diagram for foreign_storage::CsvFileBufferParser:
+ Collaboration diagram for foreign_storage::CsvFileBufferParser:

Public Member Functions

ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
 
void validateExpectedColumnCount (const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Attributes

static const std::string DELIMITER_KEY = "DELIMITER"
 
static const std::string NULLS_KEY = "NULLS"
 
static const std::string HEADER_KEY = "HEADER"
 
static const std::string QUOTED_KEY = "QUOTED"
 
static const std::string QUOTE_KEY = "QUOTE"
 
static const std::string ESCAPE_KEY = "ESCAPE"
 
static const std::string LINE_DELIMITER_KEY = "LINE_DELIMITER"
 
static const std::string ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
 
static const std::string ARRAY_MARKER_KEY = "ARRAY_MARKER"
 
static const std::string LONLAT_KEY = "LONLAT"
 
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
 
static const std::string GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
 
static const std::string SOURCE_SRID_KEY = "SOURCE_SRID"
 
static const std::string TRIM_SPACES_KEY = "TRIM_SPACES"
 
- Static Public Attributes inherited from foreign_storage::TextFileBufferParser
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Additional Inherited Members

- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int,
DataBlockPtr
convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Detailed Description

Definition at line 22 of file CsvFileBufferParser.h.

Member Function Documentation

size_t foreign_storage::CsvFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
foreign_storage::FileReader *  file_reader 
) const
override virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 410 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::find_row_end_pos().

417  {
418  return import_export::delimited_parser::find_row_end_pos(alloc_size,
419  buffer,
420  buffer_size,
421  copy_params,
422  buffer_first_row_index,
423  num_rows_in_buffer,
424  nullptr,
425  file_reader);
426 }
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...

+ Here is the call graph for this function:

ParseBufferResult foreign_storage::CsvFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
override virtual

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 122 of file CsvFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, foreign_storage::ParseBufferRequest::buffer_size, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData(), import_export::delimited_parser::find_beginning(), foreign_storage::ParseBufferRequest::first_row_index, foreign_storage::ParseBufferRequest::foreign_table_schema, import_export::delimited_parser::get_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isCoordinateScalar(), foreign_storage::TextFileBufferParser::isNullDatum(), kPOINT, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), foreign_storage::ParseBufferRequest::render_group_analyzer_map, run_benchmark_import::result, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::set_array_flags_and_geo_columns_count(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::skip_column_import(), sv_strip(), foreign_storage::ParseBufferRequest::track_rejected_rows, import_export::CopyParams::trim_spaces, and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

124  {
125  CHECK(request.buffer);
126  size_t begin = import_export::delimited_parser::find_beginning(
127  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
128  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
129  const char* thread_buf_end = request.buffer.get() + request.end_pos;
130  const char* buf_end = request.buffer.get() + request.buffer_size;
131 
132  std::vector<std::string_view> row;
133  size_t row_index_plus_one = 0;
134  const char* p = thread_buf;
135  bool try_single_thread = false;
136  int phys_cols = 0;
137  int point_cols = 0;
138  std::unique_ptr<bool[]> array_flags;
139 
140  set_array_flags_and_geo_columns_count(
141  array_flags,
142  phys_cols,
143  point_cols,
144  request.foreign_table_schema->getLogicalColumns());
145  auto num_cols = request.getColumns().size() - phys_cols;
146  if (columns_are_pre_filtered) {
147  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
148  if (skip_column_import(request, col_idx)) {
149  --num_cols;
150  }
151  }
152  }
153 
154  size_t current_row_id = 0;
155  size_t row_count = 0;
156  size_t remaining_row_count = request.process_row_count;
157  std::vector<size_t> row_offsets{};
158  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
159 
160  ParseBufferResult result{};
161 
162  std::string file_path = request.getFilePath();
163  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
164  row.clear();
165  current_row_id = row_count;
166  row_count++;
167  std::vector<std::unique_ptr<char[]>>
168  tmp_buffers; // holds string w/ removed escape chars, etc
169  const char* line_start = p;
170  p = import_export::delimited_parser::get_row(p,
171  thread_buf_end,
172  buf_end,
173  request.copy_params,
174  array_flags.get(),
175  row,
176  tmp_buffers,
177  try_single_thread,
178  !columns_are_pre_filtered);
179 
180  row_index_plus_one++;
181 
182  bool incorrect_column_count = false;
183  try {
184  validate_expected_column_count(row, num_cols, point_cols, file_path);
185  } catch (const ForeignStorageException& e) {
186  if (request.track_rejected_rows) {
187  result.rejected_rows.insert(current_row_id);
188  incorrect_column_count = true;
189  } else {
190  throw;
191  }
192  }
193 
194  size_t import_idx = 0;
195  size_t col_idx = 0;
196 
197  try {
198  auto columns = request.getColumns();
199  if (incorrect_column_count) {
200  auto cd_it = columns.begin();
201  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
202  continue;
203  }
204  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
205  auto cd = *cd_it;
206  const auto& col_ti = cd->columnType;
207  bool column_is_present =
208  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
209  CHECK(row.size() > import_idx || !column_is_present);
210  bool is_null = false;
211  try {
212  is_null = column_is_present
213  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
214  : true;
215  } catch (const std::exception& e) {
216  if (request.track_rejected_rows) {
217  result.rejected_rows.insert(current_row_id);
218  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
219  break; // skip rest of row
220  } else {
221  throw;
222  }
223  }
224  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
225  // everything but strings should be always trimmed
226  row[import_idx] = sv_strip(row[import_idx]);
227  }
228  if (col_ti.is_geometry()) {
229  if (!skip_column_import(request, col_idx)) {
230  auto starting_col_idx = col_idx;
231  try {
232  processGeoColumn(request.import_buffers,
233  col_idx,
234  request.copy_params,
235  cd_it,
236  row,
237  import_idx,
238  is_null,
239  request.first_row_index,
240  row_index_plus_one,
241  request.getCatalog(),
242  request.render_group_analyzer_map);
243  } catch (const std::exception& e) {
244  if (request.track_rejected_rows) {
245  result.rejected_rows.insert(current_row_id);
246  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
247  break; // skip rest of row
248  } else {
249  throw;
250  }
251  }
252  } else {
253  // update import/col idx according to types
254  if (!is_null && cd->columnType == kPOINT &&
255  isCoordinateScalar(row[import_idx])) {
256  if (!columns_are_pre_filtered) {
257  ++import_idx;
258  }
259  }
260  if (!columns_are_pre_filtered) {
261  ++import_idx;
262  }
263  ++col_idx;
264  col_idx += col_ti.get_physical_cols();
265  }
266  // skip remaining physical columns
267  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
268  ++cd_it;
269  }
270  } else {
271  if (!skip_column_import(request, col_idx)) {
272  try {
273  request.import_buffers[col_idx]->add_value(
274  cd, row[import_idx], is_null, request.copy_params);
275  } catch (const std::exception& e) {
276  if (request.track_rejected_rows) {
277  result.rejected_rows.insert(current_row_id);
278  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
279  break; // skip rest of row
280  } else {
281  throw;
282  }
283  }
284  }
285  if (column_is_present) {
286  ++import_idx;
287  }
288  ++col_idx;
289  }
290  }
291  } catch (const std::exception& e) {
292  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
293  "\" in row \"" + std::string(line_start, p) +
294  "\" in file \"" + file_path + "\"");
295  }
296  }
297  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
298 
299  result.row_offsets = row_offsets;
300  result.row_count = row_count;
301  if (convert_data_blocks) {
302  result.column_id_to_data_blocks_map =
303  convertImportBuffersToDataBlocks(request.import_buffers);
304  }
305  return result;
306 }
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
std::string_view sv_strip(std::string_view str)
return trimmed string_view
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
CONSTEXPR DEVICE bool is_null(const T &value)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
bool g_enable_smem_group_by true
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:222
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::CsvFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 336 of file CsvFileBufferParser.cpp.

References ARRAY_DELIMITER_KEY, ARRAY_MARKER_KEY, foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY, DELIMITER_KEY, ESCAPE_KEY, GEO_ASSIGN_RENDER_GROUPS_KEY, GEO_EXPLODE_COLLECTIONS_KEY, HEADER_KEY, import_export::kHasHeader, import_export::kNoHeader, LINE_DELIMITER_KEY, LONLAT_KEY, NULLS_KEY, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, QUOTE_KEY, QUOTED_KEY, SOURCE_SRID_KEY, foreign_storage::AbstractFileStorageDataWrapper::THREADS_KEY, TRIM_SPACES_KEY, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_delimiter(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_string_with_length().

Referenced by foreign_storage::CsvDataWrapper::validateTableOptions().

337  {
338  import_export::CopyParams copy_params{};
339  copy_params.plain_text = true;
340  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
341  !value.empty()) {
342  copy_params.delimiter = value[0];
343  }
344  if (auto it = foreign_table->options.find(NULLS_KEY);
345  it != foreign_table->options.end()) {
346  copy_params.null_str = it->second;
347  }
348  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
349  if (has_header.has_value()) {
350  if (has_header.value()) {
351  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
352  } else {
353  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
354  }
355  }
356  copy_params.quoted =
357  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
358  if (const auto& value =
359  validate_and_get_string_with_length(foreign_table, QUOTE_KEY, 1);
360  !value.empty()) {
361  copy_params.quote = value[0];
362  }
363  if (const auto& value =
364  validate_and_get_string_with_length(foreign_table, ESCAPE_KEY, 1);
365  !value.empty()) {
366  copy_params.escape = value[0];
367  }
368  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
369  !value.empty()) {
370  copy_params.line_delim = value[0];
371  }
372  if (const auto& value =
373  validate_and_get_delimiter(foreign_table, ARRAY_DELIMITER_KEY);
374  !value.empty()) {
375  copy_params.array_delim = value[0];
376  }
377  if (const auto& value =
378  validate_and_get_string_with_length(foreign_table, ARRAY_MARKER_KEY, 2);
379  !value.empty()) {
380  copy_params.array_begin = value[0];
381  copy_params.array_end = value[1];
382  }
383  copy_params.lonlat =
384  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
385  copy_params.geo_assign_render_groups =
386  validate_and_get_bool_value(foreign_table, GEO_ASSIGN_RENDER_GROUPS_KEY)
387  .value_or(copy_params.geo_assign_render_groups);
388  copy_params.geo_explode_collections =
389  validate_and_get_bool_value(foreign_table, GEO_EXPLODE_COLLECTIONS_KEY)
390  .value_or(copy_params.geo_explode_collections);
391  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
392  it != foreign_table->options.end()) {
393  copy_params.source_srid = std::stoi(it->second);
394  }
395 
396  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
397  it != foreign_table->options.end()) {
398  copy_params.buffer_size = std::stoi(it->second);
399  }
400  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
401  it != foreign_table->options.end()) {
402  copy_params.threads = std::stoi(it->second);
403  }
404  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
405  .value_or(copy_params.trim_spaces);
406 
407  return copy_params;
408 }
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string LINE_DELIMITER_KEY
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
static const std::string ARRAY_DELIMITER_KEY
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvFileBufferParser::validateExpectedColumnCount ( const std::string &  row,
const import_export::CopyParams &  copy_params,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
) const

Parses a single row and verifies that its number of fields is valid for num_cols and point_cols (the number of point columns).

Definition at line 311 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::get_row(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

316  {
317  bool is_array = false;
318  bool try_single_thread = false;
319  std::vector<std::unique_ptr<char[]>> tmp_buffers;
320  std::vector<std::string_view> fields;
321  // parse columns in row into fields (other return values are intentionally ignored)
323  row.c_str() + row.size(),
324  row.c_str() + row.size(),
325  copy_params,
326  &is_array,
327  fields,
328  tmp_buffers,
329  try_single_thread,
330  false // Don't filter empty lines
331  );
332  // Check we have right number of columns
333  validate_expected_column_count(fields, num_cols, point_cols, file_name);
334 }
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)

+ Here is the call graph for this function:

void foreign_storage::CsvFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
inline override virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 45 of file CsvFileBufferParser.h.

46  {};

Member Data Documentation

const std::string foreign_storage::CsvFileBufferParser::ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ARRAY_MARKER_KEY = "ARRAY_MARKER"
inline static
const std::string foreign_storage::CsvFileBufferParser::DELIMITER_KEY = "DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ESCAPE_KEY = "ESCAPE"
inline static
const std::string foreign_storage::CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY
inline static
Initial value:
=
"GEO_ASSIGN_RENDER_GROUPS"

Definition at line 59 of file CsvFileBufferParser.h.

Referenced by foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(), and validateAndGetCopyParams().

const std::string foreign_storage::CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
inline static
const std::string foreign_storage::CsvFileBufferParser::HEADER_KEY = "HEADER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LINE_DELIMITER_KEY = "LINE_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LONLAT_KEY = "LONLAT"
inline static
const std::string foreign_storage::CsvFileBufferParser::NULLS_KEY = "NULLS"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTE_KEY = "QUOTE"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTED_KEY = "QUOTED"
inline static
const std::string foreign_storage::CsvFileBufferParser::SOURCE_SRID_KEY = "SOURCE_SRID"
inline static
const std::string foreign_storage::CsvFileBufferParser::TRIM_SPACES_KEY = "TRIM_SPACES"
inline static

The documentation for this class was generated from the following files: