OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
21 #include "ImportExport/Importer.h"
22 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 
26 namespace {
27 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
28  return request.import_buffers[column_idx] == nullptr;
29 }
30 
32  std::unique_ptr<bool[]>& array_flags,
33  int& phys_cols,
34  int& point_cols,
35  const std::list<const ColumnDescriptor*>& columns) {
36  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
37  size_t i = 0;
38  for (const auto cd : columns) {
39  const auto& col_ti = cd->columnType;
40  phys_cols += col_ti.get_physical_cols();
41  if (cd->columnType.get_type() == kPOINT) {
42  point_cols++;
43  }
44 
45  if (cd->columnType.get_type() == kARRAY) {
46  array_flags.get()[i] = true;
47  } else {
48  array_flags.get()[i] = false;
49  }
50  i++;
51  }
52 }
53 
54 void validate_expected_column_count(std::vector<std::string_view>& row,
55  size_t num_cols,
56  int point_cols,
57  const std::string& file_name) {
58  // Each POINT could consume two separate coords instead of a single WKT
59  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
60  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
61  }
62 }
63 
64 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
65  const std::string& option_name) {
66  if (auto it = foreign_table->options.find(option_name);
67  it != foreign_table->options.end()) {
68  if (it->second.length() == 1) {
69  return it->second;
70  } else {
71  if (it->second == std::string("\\n")) {
72  return "\n";
73  } else if (it->second == std::string("\\t")) {
74  return "\t";
75  } else {
76  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
77  "\". Expected a single character, \"\\n\" or \"\\t\"."};
78  }
79  }
80  }
81  return "";
82 }
83 
84 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
85  const std::string& option_name,
86  const size_t expected_num_chars) {
87  if (auto it = foreign_table->options.find(option_name);
88  it != foreign_table->options.end()) {
89  if (it->second.length() != expected_num_chars) {
90  throw std::runtime_error{"Value of \"" + option_name +
91  "\" foreign table option has the wrong number of "
92  "characters. Expected " +
93  std::to_string(expected_num_chars) + " character(s)."};
94  }
95  return it->second;
96  }
97  return "";
98 }
99 
100 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
101  const std::string& option_name) {
102  if (auto it = foreign_table->options.find(option_name);
103  it != foreign_table->options.end()) {
104  if (boost::iequals(it->second, "TRUE")) {
105  return true;
106  } else if (boost::iequals(it->second, "FALSE")) {
107  return false;
108  } else {
109  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
110  "\" foreign table option. "
111  "Value must be either 'true' or 'false'."};
112  }
113  }
114  return std::nullopt;
115 }
116 } // namespace
117 
123  bool convert_data_blocks,
124  bool columns_are_pre_filtered,
125  bool skip_dict_encoding) const {
126  CHECK(request.buffer);
128  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
129  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
130  const char* thread_buf_end = request.buffer.get() + request.end_pos;
131  const char* buf_end = request.buffer.get() + request.buffer_size;
132 
133  std::vector<std::string_view> row;
134  size_t row_index_plus_one = 0;
135  const char* p = thread_buf;
136  bool try_single_thread = false;
137  int phys_cols = 0;
138  int point_cols = 0;
139  std::unique_ptr<bool[]> array_flags;
140 
142  array_flags,
143  phys_cols,
144  point_cols,
145  request.foreign_table_schema->getLogicalColumns());
146  auto num_cols = request.getColumns().size() - phys_cols;
147  if (columns_are_pre_filtered) {
148  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
149  if (skip_column_import(request, col_idx)) {
150  --num_cols;
151  }
152  }
153  }
154 
155  size_t current_row_id = 0;
156  size_t row_count = 0;
157  size_t remaining_row_count = request.process_row_count;
158  std::vector<size_t> row_offsets{};
159  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
160 
162 
163  std::string file_path = request.getFilePath();
164  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
165  row.clear();
166  current_row_id = row_count;
167  row_count++;
168  std::vector<std::unique_ptr<char[]>>
169  tmp_buffers; // holds string w/ removed escape chars, etc
170  const char* line_start = p;
172  thread_buf_end,
173  buf_end,
174  request.copy_params,
175  array_flags.get(),
176  row,
177  tmp_buffers,
178  try_single_thread,
179  !columns_are_pre_filtered);
180 
181  row_index_plus_one++;
182 
183  bool incorrect_column_count = false;
184  try {
185  validate_expected_column_count(row, num_cols, point_cols, file_path);
186  } catch (const ForeignStorageException& e) {
187  if (request.track_rejected_rows) {
188  result.rejected_rows.insert(current_row_id);
189  incorrect_column_count = true;
190  } else {
191  throw;
192  }
193  }
194 
195  size_t import_idx = 0;
196  size_t col_idx = 0;
197 
198  try {
199  auto columns = request.getColumns();
200  if (incorrect_column_count) {
201  auto cd_it = columns.begin();
202  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
203  continue;
204  }
205  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
206  auto cd = *cd_it;
207  const auto& col_ti = cd->columnType;
208  bool column_is_present =
209  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
210  CHECK(row.size() > import_idx || !column_is_present);
211  bool is_null = false;
212  try {
213  is_null = column_is_present
214  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
215  : true;
216  } catch (const std::exception& e) {
217  if (request.track_rejected_rows) {
218  result.rejected_rows.insert(current_row_id);
219  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
220  break; // skip rest of row
221  } else {
222  throw;
223  }
224  }
225  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
226  // everything but strings should be always trimmed
227  row[import_idx] = sv_strip(row[import_idx]);
228  }
229  if (col_ti.is_geometry()) {
230  if (!skip_column_import(request, col_idx)) {
231  auto starting_col_idx = col_idx;
232  try {
234  col_idx,
235  request.copy_params,
236  cd_it,
237  row,
238  import_idx,
239  is_null,
240  request.first_row_index,
241  row_index_plus_one,
242  request.getCatalog(),
243  request.render_group_analyzer_map);
244  } catch (const std::exception& e) {
245  if (request.track_rejected_rows) {
246  result.rejected_rows.insert(current_row_id);
247  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
248  break; // skip rest of row
249  } else {
250  throw;
251  }
252  }
253  } else {
254  // update import/col idx according to types
255  if (!is_null && cd->columnType == kPOINT &&
256  isCoordinateScalar(row[import_idx])) {
257  if (!columns_are_pre_filtered) {
258  ++import_idx;
259  }
260  }
261  if (!columns_are_pre_filtered) {
262  ++import_idx;
263  }
264  ++col_idx;
265  col_idx += col_ti.get_physical_cols();
266  }
267  // skip remaining physical columns
268  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
269  ++cd_it;
270  }
271  } else {
272  if (!skip_column_import(request, col_idx)) {
273  try {
274  request.import_buffers[col_idx]->add_value(
275  cd, row[import_idx], is_null, request.copy_params);
276  } catch (const std::exception& e) {
277  if (request.track_rejected_rows) {
278  result.rejected_rows.insert(current_row_id);
279  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
280  break; // skip rest of row
281  } else {
282  throw;
283  }
284  }
285  }
286  if (column_is_present) {
287  ++import_idx;
288  }
289  ++col_idx;
290  }
291  }
292  } catch (const std::exception& e) {
293  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
294  "\" in row \"" + std::string(line_start, p) +
295  "\" in file \"" + file_path + "\"");
296  }
297  }
298  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
299 
300  result.row_offsets = row_offsets;
301  result.row_count = row_count;
302  if (convert_data_blocks) {
303  result.column_id_to_data_blocks_map =
304  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
305  }
306  return result;
307 }
313  const std::string& row,
314  const import_export::CopyParams& copy_params,
315  size_t num_cols,
316  int point_cols,
317  const std::string& file_name) const {
318  bool is_array = false;
319  bool try_single_thread = false;
320  std::vector<std::unique_ptr<char[]>> tmp_buffers;
321  std::vector<std::string_view> fields;
322  // parse columns in row into fields (other return values are intentionally ignored)
324  row.c_str() + row.size(),
325  row.c_str() + row.size(),
326  copy_params,
327  &is_array,
328  fields,
329  tmp_buffers,
330  try_single_thread,
331  false // Don't filter empty lines
332  );
333  // Check we have right number of columns
334  validate_expected_column_count(fields, num_cols, point_cols, file_name);
335 }
336 
338  const ForeignTable* foreign_table) const {
339  import_export::CopyParams copy_params{};
340  copy_params.plain_text = true;
341  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
342  !value.empty()) {
343  copy_params.delimiter = value[0];
344  }
345  if (auto it = foreign_table->options.find(NULLS_KEY);
346  it != foreign_table->options.end()) {
347  copy_params.null_str = it->second;
348  }
349  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
350  if (has_header.has_value()) {
351  if (has_header.value()) {
352  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
353  } else {
354  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
355  }
356  }
357  copy_params.quoted =
358  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
359  if (const auto& value =
361  !value.empty()) {
362  copy_params.quote = value[0];
363  }
364  if (const auto& value =
366  !value.empty()) {
367  copy_params.escape = value[0];
368  }
369  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
370  !value.empty()) {
371  copy_params.line_delim = value[0];
372  }
373  if (const auto& value =
375  !value.empty()) {
376  copy_params.array_delim = value[0];
377  }
378  if (const auto& value =
380  !value.empty()) {
381  copy_params.array_begin = value[0];
382  copy_params.array_end = value[1];
383  }
384  copy_params.lonlat =
385  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
386  copy_params.geo_assign_render_groups =
388  .value_or(copy_params.geo_assign_render_groups);
389  copy_params.geo_explode_collections =
391  .value_or(copy_params.geo_explode_collections);
392  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
393  it != foreign_table->options.end()) {
394  copy_params.source_srid = std::stoi(it->second);
395  }
396 
397  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
398  it != foreign_table->options.end()) {
399  copy_params.buffer_size = std::stoi(it->second);
400  }
401  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
402  it != foreign_table->options.end()) {
403  copy_params.threads = std::stoi(it->second);
404  }
405  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
406  .value_or(copy_params.trim_spaces);
407 
408  return copy_params;
409 }
410 
412  size_t& alloc_size,
413  std::unique_ptr<char[]>& buffer,
414  size_t& buffer_size,
415  const import_export::CopyParams& copy_params,
416  const size_t buffer_first_row_index,
417  unsigned int& num_rows_in_buffer,
418  foreign_storage::FileReader* file_reader) const {
420  buffer,
421  buffer_size,
422  copy_params,
423  buffer_first_row_index,
424  num_rows_in_buffer,
425  nullptr,
426  file_reader);
427 }
428 } // namespace foreign_storage
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
std::string_view sv_strip(std::string_view str)
return trimmed string_view
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const import_export::CopyParams copy_params
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:289
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override