OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
21 #include "ImportExport/Importer.h"
22 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 
26 namespace {
27 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
28  return request.import_buffers[column_idx] == nullptr;
29 }
30 
32  std::unique_ptr<bool[]>& array_flags,
33  int& phys_cols,
34  int& point_cols,
35  const std::list<const ColumnDescriptor*>& columns) {
36  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
37  size_t i = 0;
38  for (const auto cd : columns) {
39  const auto& col_ti = cd->columnType;
40  phys_cols += col_ti.get_physical_cols();
41  if (cd->columnType.get_type() == kPOINT) {
42  point_cols++;
43  }
44 
45  if (cd->columnType.get_type() == kARRAY) {
46  array_flags.get()[i] = true;
47  } else {
48  array_flags.get()[i] = false;
49  }
50  i++;
51  }
52 }
53 
54 void validate_expected_column_count(std::vector<std::string_view>& row,
55  size_t num_cols,
56  int point_cols,
57  const std::string& file_name) {
58  // Each POINT could consume two separate coords instead of a single WKT
59  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
60  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
61  }
62 }
63 
64 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
65  const std::string& option_name) {
66  if (auto it = foreign_table->options.find(option_name);
67  it != foreign_table->options.end()) {
68  if (it->second.length() == 1) {
69  return it->second;
70  } else {
71  if (it->second == std::string("\\n")) {
72  return "\n";
73  } else if (it->second == std::string("\\t")) {
74  return "\t";
75  } else {
76  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
77  "\". Expected a single character, \"\\n\" or \"\\t\"."};
78  }
79  }
80  }
81  return "";
82 }
83 
84 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
85  const std::string& option_name,
86  const size_t expected_num_chars) {
87  if (auto it = foreign_table->options.find(option_name);
88  it != foreign_table->options.end()) {
89  if (it->second.length() != expected_num_chars) {
90  throw std::runtime_error{"Value of \"" + option_name +
91  "\" foreign table option has the wrong number of "
92  "characters. Expected " +
93  std::to_string(expected_num_chars) + " character(s)."};
94  }
95  return it->second;
96  }
97  return "";
98 }
99 
100 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
101  const std::string& option_name) {
102  if (auto it = foreign_table->options.find(option_name);
103  it != foreign_table->options.end()) {
104  if (boost::iequals(it->second, "TRUE")) {
105  return true;
106  } else if (boost::iequals(it->second, "FALSE")) {
107  return false;
108  } else {
109  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
110  "\" foreign table option. "
111  "Value must be either 'true' or 'false'."};
112  }
113  }
114  return std::nullopt;
115 }
116 } // namespace
117 
123  bool convert_data_blocks,
124  bool columns_are_pre_filtered,
125  bool skip_dict_encoding) const {
126  CHECK(request.buffer);
128  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
129  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
130  const char* thread_buf_end = request.buffer.get() + request.end_pos;
131  const char* buf_end = request.buffer.get() + request.buffer_size;
132 
133  std::vector<std::string_view> row;
134  size_t row_index_plus_one = 0;
135  const char* p = thread_buf;
136  bool try_single_thread = false;
137  int phys_cols = 0;
138  int point_cols = 0;
139  std::unique_ptr<bool[]> array_flags;
140 
142  array_flags,
143  phys_cols,
144  point_cols,
145  request.foreign_table_schema->getLogicalColumns());
146  auto num_cols = request.getColumns().size() - phys_cols;
147  if (columns_are_pre_filtered) {
148  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
149  if (skip_column_import(request, col_idx)) {
150  --num_cols;
151  }
152  }
153  }
154 
155  size_t current_row_id = 0;
156  size_t row_count = 0;
157  size_t remaining_row_count = request.process_row_count;
158  std::vector<size_t> row_offsets{};
159  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
160 
162 
163  std::string file_path = request.getFilePath();
164  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
165  row.clear();
166  current_row_id = row_count;
167  row_count++;
168  std::vector<std::unique_ptr<char[]>>
169  tmp_buffers; // holds string w/ removed escape chars, etc
170  const char* line_start = p;
171  row_index_plus_one++;
172  bool incorrect_column_count = false;
174  thread_buf_end,
175  buf_end,
176  request.copy_params,
177  array_flags.get(),
178  row,
179  tmp_buffers,
180  try_single_thread,
181  !columns_are_pre_filtered);
182  try {
183  validate_expected_column_count(row, num_cols, point_cols, file_path);
184  } catch (const ForeignStorageException& e) {
185  if (request.track_rejected_rows) {
186  result.rejected_rows.insert(current_row_id);
187  incorrect_column_count = true;
188  } else {
189  throw;
190  }
191  }
192 
193  size_t import_idx = 0;
194  size_t col_idx = 0;
195 
196  try {
197  auto columns = request.getColumns();
198  if (incorrect_column_count) {
199  auto cd_it = columns.begin();
200  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
201  continue;
202  }
203  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
204  auto cd = *cd_it;
205  const auto& col_ti = cd->columnType;
206  bool column_is_present =
207  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
208  CHECK(row.size() > import_idx || !column_is_present);
209  bool is_null = false;
210  try {
211  is_null = column_is_present
212  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
213  : true;
214  } catch (const std::exception& e) {
215  if (request.track_rejected_rows) {
216  result.rejected_rows.insert(current_row_id);
217  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
218  break; // skip rest of row
219  } else {
220  throw;
221  }
222  }
223  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
224  // everything but strings should be always trimmed
225  row[import_idx] = sv_strip(row[import_idx]);
226  }
227  if (col_ti.is_geometry()) {
228  if (!skip_column_import(request, col_idx)) {
229  auto starting_col_idx = col_idx;
230  try {
232  col_idx,
233  request.copy_params,
234  cd_it,
235  row,
236  import_idx,
237  is_null,
238  request.first_row_index,
239  row_index_plus_one,
240  request.getCatalog());
241  } catch (const std::exception& e) {
242  if (request.track_rejected_rows) {
243  result.rejected_rows.insert(current_row_id);
244  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
245  break; // skip rest of row
246  } else {
247  throw;
248  }
249  }
250  } else {
251  // update import/col idx according to types
252  if (!is_null && cd->columnType == kPOINT &&
253  isCoordinateScalar(row[import_idx])) {
254  if (!columns_are_pre_filtered) {
255  ++import_idx;
256  }
257  }
258  if (!columns_are_pre_filtered) {
259  ++import_idx;
260  }
261  ++col_idx;
262  col_idx += col_ti.get_physical_cols();
263  }
264  // skip remaining physical columns
265  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
266  ++cd_it;
267  }
268  } else {
269  if (!skip_column_import(request, col_idx)) {
270  try {
271  request.import_buffers[col_idx]->add_value(
272  cd, row[import_idx], is_null, request.copy_params);
273  } catch (const std::exception& e) {
274  if (request.track_rejected_rows) {
275  result.rejected_rows.insert(current_row_id);
276  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
277  break; // skip rest of row
278  } else {
279  throw;
280  }
281  }
282  }
283  if (column_is_present) {
284  ++import_idx;
285  }
286  ++col_idx;
287  }
288  }
289  } catch (const std::exception& e) {
290  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
291  "\" in row \"" + std::string(line_start, p) +
292  "\" in file \"" + file_path + "\"");
293  }
294  }
295  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
296 
297  result.row_offsets = row_offsets;
298  result.row_count = row_count;
299  if (convert_data_blocks) {
300  result.column_id_to_data_blocks_map =
301  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
302  }
303  return result;
304 }
310  const std::string& row,
311  const import_export::CopyParams& copy_params,
312  size_t num_cols,
313  int point_cols,
314  const std::string& file_name) const {
315  bool is_array = false;
316  bool try_single_thread = false;
317  std::vector<std::unique_ptr<char[]>> tmp_buffers;
318  std::vector<std::string_view> fields;
319  // parse columns in row into fields (other return values are intentionally ignored)
321  row.c_str() + row.size(),
322  row.c_str() + row.size(),
323  copy_params,
324  &is_array,
325  fields,
326  tmp_buffers,
327  try_single_thread,
328  false // Don't filter empty lines
329  );
330  // Check we have right number of columns
331  validate_expected_column_count(fields, num_cols, point_cols, file_name);
332 }
333 
335  const ForeignTable* foreign_table) const {
336  import_export::CopyParams copy_params{};
337  copy_params.plain_text = true;
338  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
339  !value.empty()) {
340  copy_params.delimiter = value[0];
341  }
342  if (auto it = foreign_table->options.find(NULLS_KEY);
343  it != foreign_table->options.end()) {
344  copy_params.null_str = it->second;
345  }
346  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
347  if (has_header.has_value()) {
348  if (has_header.value()) {
349  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
350  } else {
351  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
352  }
353  }
354  copy_params.quoted =
355  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
356  if (const auto& value =
358  !value.empty()) {
359  copy_params.quote = value[0];
360  }
361  if (const auto& value =
363  !value.empty()) {
364  copy_params.escape = value[0];
365  }
366  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
367  !value.empty()) {
368  copy_params.line_delim = value[0];
369  }
370  if (const auto& value =
372  !value.empty()) {
373  copy_params.array_delim = value[0];
374  }
375  if (const auto& value =
377  !value.empty()) {
378  copy_params.array_begin = value[0];
379  copy_params.array_end = value[1];
380  }
381  copy_params.lonlat =
382  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
383  copy_params.geo_explode_collections =
385  .value_or(copy_params.geo_explode_collections);
386  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
387  it != foreign_table->options.end()) {
388  copy_params.source_srid = std::stoi(it->second);
389  }
390 
391  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
392  it != foreign_table->options.end()) {
393  copy_params.buffer_size = std::stoi(it->second);
394  }
395  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
396  it != foreign_table->options.end()) {
397  copy_params.threads = std::stoi(it->second);
398  }
399  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
400  .value_or(copy_params.trim_spaces);
401  copy_params.geo_validate_geometry =
403  .value_or(copy_params.geo_validate_geometry);
404 
405  return copy_params;
406 }
407 
409  size_t& alloc_size,
410  std::unique_ptr<char[]>& buffer,
411  size_t& buffer_size,
412  const import_export::CopyParams& copy_params,
413  const size_t buffer_first_row_index,
414  unsigned int& num_rows_in_buffer,
415  foreign_storage::FileReader* file_reader) const {
417  buffer,
418  buffer_size,
419  copy_params,
420  buffer_first_row_index,
421  num_rows_in_buffer,
422  nullptr,
423  file_reader);
424 }
425 } // namespace foreign_storage
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
std::string_view sv_strip(std::string_view str)
return trimmed string_view
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
Definition: ForeignTable.h:49
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:291
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override