OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CsvFileBufferParser Class Reference

#include <CsvFileBufferParser.h>

+ Inheritance diagram for foreign_storage::CsvFileBufferParser:
+ Collaboration diagram for foreign_storage::CsvFileBufferParser:

Public Member Functions

ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
 
void validateExpectedColumnCount (const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Static Public Attributes

static const std::string DELIMITER_KEY = "DELIMITER"
 
static const std::string NULLS_KEY = "NULLS"
 
static const std::string HEADER_KEY = "HEADER"
 
static const std::string QUOTED_KEY = "QUOTED"
 
static const std::string QUOTE_KEY = "QUOTE"
 
static const std::string ESCAPE_KEY = "ESCAPE"
 
static const std::string LINE_DELIMITER_KEY = "LINE_DELIMITER"
 
static const std::string ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
 
static const std::string ARRAY_MARKER_KEY = "ARRAY_MARKER"
 
static const std::string LONLAT_KEY = "LONLAT"
 
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
 
static const std::string GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
 
static const std::string SOURCE_SRID_KEY = "SOURCE_SRID"
 
- Static Public Attributes inherited from foreign_storage::TextFileBufferParser
static const std::string THREADS_KEY = "THREADS"
 
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Additional Inherited Members

- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Detailed Description

Definition at line 22 of file CsvFileBufferParser.h.

Member Function Documentation

size_t foreign_storage::CsvFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
foreign_storage::FileReader *  file_reader 
) const
override virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 401 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::find_row_end_pos().

408  {
410  buffer,
411  buffer_size,
412  copy_params,
413  buffer_first_row_index,
414  num_rows_in_buffer,
415  nullptr,
416  file_reader);
417 }
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...

+ Here is the call graph for this function:

ParseBufferResult foreign_storage::CsvFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
override virtual

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 121 of file CsvFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, foreign_storage::ParseBufferRequest::buffer_size, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData(), import_export::delimited_parser::find_beginning(), foreign_storage::ParseBufferRequest::first_row_index, import_export::delimited_parser::get_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isCoordinateScalar(), foreign_storage::TextFileBufferParser::isNullDatum(), kPOINT, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), foreign_storage::ParseBufferRequest::render_group_analyzer_map, run_benchmark_import::result, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::set_array_flags_and_geo_columns_count(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::skip_column_import(), foreign_storage::ParseBufferRequest::track_rejected_rows, and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

123  {
124  CHECK(request.buffer);
126  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
127  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
128  const char* thread_buf_end = request.buffer.get() + request.end_pos;
129  const char* buf_end = request.buffer.get() + request.buffer_size;
130 
131  std::vector<std::string_view> row;
132  size_t row_index_plus_one = 0;
133  const char* p = thread_buf;
134  bool try_single_thread = false;
135  int phys_cols = 0;
136  int point_cols = 0;
137  std::unique_ptr<bool[]> array_flags;
138 
140  array_flags, phys_cols, point_cols, request.getColumns());
141  auto num_cols = request.getColumns().size() - phys_cols;
142  if (columns_are_pre_filtered) {
143  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
144  if (skip_column_import(request, col_idx)) {
145  --num_cols;
146  }
147  }
148  }
149 
150  size_t current_row_id = 0;
151  size_t row_count = 0;
152  size_t remaining_row_count = request.process_row_count;
153  std::vector<size_t> row_offsets{};
154  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
155 
156  ParseBufferResult result{};
157 
158  std::string file_path = request.getFilePath();
159  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
160  row.clear();
161  current_row_id = row_count;
162  row_count++;
163  std::vector<std::unique_ptr<char[]>>
164  tmp_buffers; // holds string w/ removed escape chars, etc
165  const char* line_start = p;
167  thread_buf_end,
168  buf_end,
169  request.copy_params,
170  array_flags.get(),
171  row,
172  tmp_buffers,
173  try_single_thread,
174  !columns_are_pre_filtered);
175 
176  row_index_plus_one++;
177 
178  bool incorrect_column_count = false;
179  try {
180  validate_expected_column_count(row, num_cols, point_cols, file_path);
181  } catch (const ForeignStorageException& e) {
182  if (request.track_rejected_rows) {
183  result.rejected_rows.insert(current_row_id);
184  incorrect_column_count = true;
185  } else {
186  throw;
187  }
188  }
189 
190  size_t import_idx = 0;
191  size_t col_idx = 0;
192 
193  try {
194  auto columns = request.getColumns();
195  if (incorrect_column_count) {
196  auto cd_it = columns.begin();
197  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
198  continue;
199  }
200  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
201  auto cd = *cd_it;
202  const auto& col_ti = cd->columnType;
203  bool column_is_present =
204  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
205  CHECK(row.size() > import_idx || !column_is_present);
206  bool is_null = false;
207  try {
208  is_null = column_is_present
209  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
210  : true;
211  } catch (const std::exception& e) {
212  if (request.track_rejected_rows) {
213  result.rejected_rows.insert(current_row_id);
214  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
215  break; // skip rest of row
216  } else {
217  throw;
218  }
219  }
220 
221  if (col_ti.is_geometry()) {
222  if (!skip_column_import(request, col_idx)) {
223  auto starting_col_idx = col_idx;
224  try {
225  processGeoColumn(request.import_buffers,
226  col_idx,
227  request.copy_params,
228  cd_it,
229  row,
230  import_idx,
231  is_null,
232  request.first_row_index,
233  row_index_plus_one,
234  request.getCatalog(),
235  request.render_group_analyzer_map);
236  } catch (const std::exception& e) {
237  if (request.track_rejected_rows) {
238  result.rejected_rows.insert(current_row_id);
239  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
240  break; // skip rest of row
241  } else {
242  throw;
243  }
244  }
245  } else {
246  // update import/col idx according to types
247  if (!is_null && cd->columnType == kPOINT &&
248  isCoordinateScalar(row[import_idx])) {
249  if (!columns_are_pre_filtered) {
250  ++import_idx;
251  }
252  }
253  if (!columns_are_pre_filtered) {
254  ++import_idx;
255  }
256  ++col_idx;
257  col_idx += col_ti.get_physical_cols();
258  }
259  // skip remaining physical columns
260  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
261  ++cd_it;
262  }
263  } else {
264  if (!skip_column_import(request, col_idx)) {
265  try {
266  request.import_buffers[col_idx]->add_value(
267  cd, row[import_idx], is_null, request.copy_params);
268  } catch (const std::exception& e) {
269  if (request.track_rejected_rows) {
270  result.rejected_rows.insert(current_row_id);
271  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
272  break; // skip rest of row
273  } else {
274  throw;
275  }
276  }
277  }
278  if (column_is_present) {
279  ++import_idx;
280  }
281  ++col_idx;
282  }
283  }
284  } catch (const std::exception& e) {
285  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
286  "\" in row \"" + std::string(line_start, p) +
287  "\" in file \"" + file_path + "\"");
288  }
289  }
290  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
291 
292  result.row_offsets = row_offsets;
293  result.row_count = row_count;
294  if (convert_data_blocks) {
295  result.column_id_to_data_blocks_map =
296  convertImportBuffersToDataBlocks(request.import_buffers);
297  }
298  return result;
299 }
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
CONSTEXPR DEVICE bool is_null(const T &value)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
bool g_enable_smem_group_by true
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:223
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::CsvFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 329 of file CsvFileBufferParser.cpp.

References ARRAY_DELIMITER_KEY, ARRAY_MARKER_KEY, foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY, DELIMITER_KEY, ESCAPE_KEY, GEO_ASSIGN_RENDER_GROUPS_KEY, GEO_EXPLODE_COLLECTIONS_KEY, HEADER_KEY, import_export::kHasHeader, import_export::kNoHeader, LINE_DELIMITER_KEY, LONLAT_KEY, NULLS_KEY, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, QUOTE_KEY, QUOTED_KEY, SOURCE_SRID_KEY, foreign_storage::TextFileBufferParser::THREADS_KEY, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_delimiter(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_string_with_length().

Referenced by foreign_storage::CsvDataWrapper::validateTableOptions().

330  {
331  import_export::CopyParams copy_params{};
332  copy_params.plain_text = true;
333  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
334  !value.empty()) {
335  copy_params.delimiter = value[0];
336  }
337  if (auto it = foreign_table->options.find(NULLS_KEY);
338  it != foreign_table->options.end()) {
339  copy_params.null_str = it->second;
340  }
341  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
342  if (has_header.has_value()) {
343  if (has_header.value()) {
344  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
345  } else {
346  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
347  }
348  }
349  copy_params.quoted =
350  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
351  if (const auto& value =
353  !value.empty()) {
354  copy_params.quote = value[0];
355  }
356  if (const auto& value =
358  !value.empty()) {
359  copy_params.escape = value[0];
360  }
361  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
362  !value.empty()) {
363  copy_params.line_delim = value[0];
364  }
365  if (const auto& value =
367  !value.empty()) {
368  copy_params.array_delim = value[0];
369  }
370  if (const auto& value =
372  !value.empty()) {
373  copy_params.array_begin = value[0];
374  copy_params.array_end = value[1];
375  }
376  copy_params.lonlat =
377  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
378  copy_params.geo_assign_render_groups =
380  .value_or(copy_params.geo_assign_render_groups);
381  copy_params.geo_explode_collections =
383  .value_or(copy_params.geo_explode_collections);
384  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
385  it != foreign_table->options.end()) {
386  copy_params.source_srid = std::stoi(it->second);
387  }
388 
389  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
390  it != foreign_table->options.end()) {
391  copy_params.buffer_size = std::stoi(it->second);
392  }
393  if (auto it = foreign_table->options.find(THREADS_KEY);
394  it != foreign_table->options.end()) {
395  copy_params.threads = std::stoi(it->second);
396  }
397 
398  return copy_params;
399 }
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const std::string ARRAY_MARKER_KEY
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string LINE_DELIMITER_KEY
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
static const std::string ARRAY_DELIMITER_KEY
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvFileBufferParser::validateExpectedColumnCount ( const std::string &  row,
const import_export::CopyParams &  copy_params,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
) const

Takes a single row and verifies that its number of columns is valid given num_cols (the expected number of columns) and point_cols (the number of point columns).

Definition at line 304 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::get_row(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

309  {
310  bool is_array = false;
311  bool try_single_thread = false;
312  std::vector<std::unique_ptr<char[]>> tmp_buffers;
313  std::vector<std::string_view> fields;
314  // parse columns in row into fields (other return values are intentionally ignored)
316  row.c_str() + row.size(),
317  row.c_str() + row.size(),
318  copy_params,
319  &is_array,
320  fields,
321  tmp_buffers,
322  try_single_thread,
323  false // Don't filter empty lines
324  );
325  // Check we have right number of columns
326  validate_expected_column_count(fields, num_cols, point_cols, file_name);
327 }
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)

+ Here is the call graph for this function:

void foreign_storage::CsvFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
inline override virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 45 of file CsvFileBufferParser.h.

46  {};

Member Data Documentation

const std::string foreign_storage::CsvFileBufferParser::ARRAY_DELIMITER_KEY = "ARRAY_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ARRAY_MARKER_KEY = "ARRAY_MARKER"
inline static
const std::string foreign_storage::CsvFileBufferParser::DELIMITER_KEY = "DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::ESCAPE_KEY = "ESCAPE"
inline static
const std::string foreign_storage::CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY
inline static
Initial value:
= "GEO_ASSIGN_RENDER_GROUPS"

Definition at line 59 of file CsvFileBufferParser.h.

Referenced by foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(), and validateAndGetCopyParams().

const std::string foreign_storage::CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY = "GEO_EXPLODE_COLLECTIONS"
inline static
const std::string foreign_storage::CsvFileBufferParser::HEADER_KEY = "HEADER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LINE_DELIMITER_KEY = "LINE_DELIMITER"
inline static
const std::string foreign_storage::CsvFileBufferParser::LONLAT_KEY = "LONLAT"
inline static
const std::string foreign_storage::CsvFileBufferParser::NULLS_KEY = "NULLS"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTE_KEY = "QUOTE"
inline static
const std::string foreign_storage::CsvFileBufferParser::QUOTED_KEY = "QUOTED"
inline static
const std::string foreign_storage::CsvFileBufferParser::SOURCE_SRID_KEY = "SOURCE_SRID"
inline static

The documentation for this class was generated from the following files: