OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::CsvFileBufferParser Class Reference

#include <CsvFileBufferParser.h>

+ Inheritance diagram for foreign_storage::CsvFileBufferParser:
+ Collaboration diagram for foreign_storage::CsvFileBufferParser:

Public Member Functions

ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
 
import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const override
 
size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
 
void validateExpectedColumnCount (const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
 
void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const override
 

Additional Inherited Members

- Static Public Member Functions inherited from foreign_storage::TextFileBufferParser
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Detailed Description

Definition at line 22 of file CsvFileBufferParser.h.

Member Function Documentation

size_t foreign_storage::CsvFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
foreign_storage::FileReader *  file_reader 
) const
override virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implements foreign_storage::TextFileBufferParser.

Definition at line 332 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::find_row_end_pos().

339  {
340  return import_export::delimited_parser::find_row_end_pos(alloc_size,
341  buffer,
342  buffer_size,
343  copy_params,
344  buffer_first_row_index,
345  num_rows_in_buffer,
346  nullptr,
347  file_reader);
348 }
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...

+ Here is the call graph for this function:

ParseBufferResult foreign_storage::CsvFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
override virtual

Parses a given CSV file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Implements foreign_storage::TextFileBufferParser.

Definition at line 121 of file CsvFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::begin_pos, foreign_storage::ParseBufferRequest::buffer, foreign_storage::ParseBufferRequest::buffer_size, CHECK, foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks(), foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::end_pos, foreign_storage::ParseBufferRequest::file_offset, import_export::delimited_parser::find_beginning(), foreign_storage::ParseBufferRequest::first_row_index, import_export::delimited_parser::get_row(), foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::getColumns(), foreign_storage::ParseBufferRequest::getFilePath(), i, foreign_storage::ParseBufferRequest::import_buffers, is_null(), foreign_storage::TextFileBufferParser::isCoordinateScalar(), foreign_storage::TextFileBufferParser::isNullDatum(), kPOINT, import_export::CopyParams::null_str, foreign_storage::ParseBufferRequest::process_row_count, foreign_storage::TextFileBufferParser::processGeoColumn(), run_benchmark_import::result, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::set_array_flags_and_geo_columns_count(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::skip_column_import(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

123  {
124  CHECK(request.buffer);
125  size_t begin = import_export::delimited_parser::find_beginning(
126  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
127  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
128  const char* thread_buf_end = request.buffer.get() + request.end_pos;
129  const char* buf_end = request.buffer.get() + request.buffer_size;
130 
131  std::vector<std::string_view> row;
132  size_t row_index_plus_one = 0;
133  const char* p = thread_buf;
134  bool try_single_thread = false;
135  int phys_cols = 0;
136  int point_cols = 0;
137  std::unique_ptr<bool[]> array_flags;
138 
139  set_array_flags_and_geo_columns_count(
140  array_flags, phys_cols, point_cols, request.getColumns());
141  auto num_cols = request.getColumns().size() - phys_cols;
142  if (columns_are_pre_filtered) {
143  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
144  if (skip_column_import(request, col_idx)) {
145  --num_cols;
146  }
147  }
148  }
149 
150  size_t row_count = 0;
151  size_t remaining_row_count = request.process_row_count;
152  std::vector<size_t> row_offsets{};
153  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
154 
155  std::string file_path = request.getFilePath();
156  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
157  row.clear();
158  row_count++;
159  std::vector<std::unique_ptr<char[]>>
160  tmp_buffers; // holds string w/ removed escape chars, etc
161  const char* line_start = p;
162  p = import_export::delimited_parser::get_row(p,
163  thread_buf_end,
164  buf_end,
165  request.copy_params,
166  array_flags.get(),
167  row,
168  tmp_buffers,
169  try_single_thread,
170  !columns_are_pre_filtered);
171 
172  row_index_plus_one++;
173  validate_expected_column_count(row, num_cols, point_cols, file_path);
174 
175  size_t import_idx = 0;
176  size_t col_idx = 0;
177  try {
178  auto columns = request.getColumns();
179  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
180  auto cd = *cd_it;
181  const auto& col_ti = cd->columnType;
182  bool column_is_present =
183  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
184  CHECK(row.size() > import_idx || !column_is_present);
185  bool is_null =
186  column_is_present
187  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
188  : true;
189 
190  if (col_ti.is_geometry()) {
191  if (!skip_column_import(request, col_idx)) {
192  processGeoColumn(request.import_buffers,
193  col_idx,
194  request.copy_params,
195  cd_it,
196  row,
197  import_idx,
198  is_null,
199  request.first_row_index,
200  row_index_plus_one,
201  request.getCatalog());
202  } else {
203  // update import/col idx according to types
204  if (!is_null && cd->columnType == kPOINT &&
205  isCoordinateScalar(row[import_idx])) {
206  if (!columns_are_pre_filtered) {
207  ++import_idx;
208  }
209  }
210  if (!columns_are_pre_filtered) {
211  ++import_idx;
212  }
213  ++col_idx;
214  col_idx += col_ti.get_physical_cols();
215  }
216  // skip remaining physical columns
217  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
218  ++cd_it;
219  }
220  } else {
221  if (!skip_column_import(request, col_idx)) {
222  request.import_buffers[col_idx]->add_value(
223  cd, row[import_idx], is_null, request.copy_params);
224  }
225  if (column_is_present) {
226  ++import_idx;
227  }
228  ++col_idx;
229  }
230  }
231  } catch (const std::exception& e) {
232  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
233  "\" in row \"" + std::string(line_start, p) +
234  "\" in file \"" + file_path + "\"");
235  }
236  }
237  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
238 
239  ParseBufferResult result{};
240  result.row_offsets = row_offsets;
241  result.row_count = row_count;
242  if (convert_data_blocks) {
243  result.column_id_to_data_blocks_map =
244  convertImportBuffersToDataBlocks(request.import_buffers);
245  }
246  return result;
247 }
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
CONSTEXPR DEVICE bool is_null(const T &value)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
bool g_enable_smem_group_by true
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:209
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::CsvFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
override virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implements foreign_storage::TextFileBufferParser.

Definition at line 277 of file CsvFileBufferParser.cpp.

References import_export::HAS_HEADER, import_export::NO_HEADER, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_bool_value(), foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_delimiter(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_and_get_string_with_length().

Referenced by foreign_storage::CsvDataWrapper::validateTableOptions().

278  {
279  import_export::CopyParams copy_params{};
280  copy_params.plain_text = true;
281  if (const auto& value =
282  validate_and_get_string_with_length(foreign_table, "ARRAY_DELIMITER", 1);
283  !value.empty()) {
284  copy_params.array_delim = value[0];
285  }
286  if (const auto& value =
287  validate_and_get_string_with_length(foreign_table, "ARRAY_MARKER", 2);
288  !value.empty()) {
289  copy_params.array_begin = value[0];
290  copy_params.array_end = value[1];
291  }
292  if (auto it = foreign_table->options.find("BUFFER_SIZE");
293  it != foreign_table->options.end()) {
294  copy_params.buffer_size = std::stoi(it->second);
295  }
296  if (const auto& value = validate_and_get_delimiter(foreign_table, "DELIMITER");
297  !value.empty()) {
298  copy_params.delimiter = value[0];
299  }
300  if (const auto& value = validate_and_get_string_with_length(foreign_table, "ESCAPE", 1);
301  !value.empty()) {
302  copy_params.escape = value[0];
303  }
304  auto has_header = validate_and_get_bool_value(foreign_table, "HEADER");
305  if (has_header.has_value()) {
306  if (has_header.value()) {
307  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
308  } else {
309  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
310  }
311  }
312  if (const auto& value = validate_and_get_delimiter(foreign_table, "LINE_DELIMITER");
313  !value.empty()) {
314  copy_params.line_delim = value[0];
315  }
316  copy_params.lonlat =
317  validate_and_get_bool_value(foreign_table, "LONLAT").value_or(copy_params.lonlat);
318 
319  if (auto it = foreign_table->options.find("NULLS");
320  it != foreign_table->options.end()) {
321  copy_params.null_str = it->second;
322  }
323  if (const auto& value = validate_and_get_string_with_length(foreign_table, "QUOTE", 1);
324  !value.empty()) {
325  copy_params.quote = value[0];
326  }
327  copy_params.quoted =
328  validate_and_get_bool_value(foreign_table, "QUOTED").value_or(copy_params.quoted);
329  return copy_params;
330 }
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvFileBufferParser::validateExpectedColumnCount ( const std::string &  row,
const import_export::CopyParams &  copy_params,
size_t  num_cols,
int  point_cols,
const std::string &  file_name 
) const

Takes a single row and verifies that its number of columns is valid for the given num_cols (expected number of columns) and point_cols (number of point columns).

Definition at line 252 of file CsvFileBufferParser.cpp.

References import_export::delimited_parser::get_row(), and foreign_storage::anonymous_namespace{CsvFileBufferParser.cpp}::validate_expected_column_count().

257  {
258  bool is_array = false;
259  bool try_single_thread = false;
260  std::vector<std::unique_ptr<char[]>> tmp_buffers;
261  std::vector<std::string_view> fields;
262  // parse columns in row into fields (other return values are intentionally ignored)
263  import_export::delimited_parser::get_row(row.c_str(),
264  row.c_str() + row.size(),
265  row.c_str() + row.size(),
266  copy_params,
267  &is_array,
268  fields,
269  tmp_buffers,
270  try_single_thread,
271  false // Don't filter empty lines
272  );
273  // Check we have right number of columns
274  validate_expected_column_count(fields, num_cols, point_cols, file_name);
275 }
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)

+ Here is the call graph for this function:

void foreign_storage::CsvFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
inline override virtual

Performs basic validation of files to be parsed.

Implements foreign_storage::TextFileBufferParser.

Definition at line 45 of file CsvFileBufferParser.h.

46  {};

The documentation for this class was generated from the following files: