OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 #include "ImportExport/Importer.h"
21 #include "Shared/StringTransform.h"
22 
23 namespace foreign_storage {
24 
25 namespace {
26 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
27  return request.import_buffers[column_idx] == nullptr;
28 }
29 
31  std::unique_ptr<bool[]>& array_flags,
32  int& phys_cols,
33  int& point_cols,
34  const std::list<const ColumnDescriptor*>& columns) {
35  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
36  size_t i = 0;
37  for (const auto cd : columns) {
38  const auto& col_ti = cd->columnType;
39  phys_cols += col_ti.get_physical_cols();
40  if (cd->columnType.get_type() == kPOINT) {
41  point_cols++;
42  }
43 
44  if (cd->columnType.get_type() == kARRAY) {
45  array_flags.get()[i] = true;
46  } else {
47  array_flags.get()[i] = false;
48  }
49  i++;
50  }
51 }
52 
53 void validate_expected_column_count(std::vector<std::string_view>& row,
54  size_t num_cols,
55  int point_cols,
56  const std::string& file_name) {
57  // Each POINT could consume two separate coords instead of a single WKT
58  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
59  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
60  }
61 }
62 
63 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
64  const std::string& option_name) {
65  if (auto it = foreign_table->options.find(option_name);
66  it != foreign_table->options.end()) {
67  if (it->second.length() == 1) {
68  return it->second;
69  } else {
70  if (it->second == std::string("\\n")) {
71  return "\n";
72  } else if (it->second == std::string("\\t")) {
73  return "\t";
74  } else {
75  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
76  "\". Expected a single character, \"\\n\" or \"\\t\"."};
77  }
78  }
79  }
80  return "";
81 }
82 
83 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
84  const std::string& option_name,
85  const size_t expected_num_chars) {
86  if (auto it = foreign_table->options.find(option_name);
87  it != foreign_table->options.end()) {
88  if (it->second.length() != expected_num_chars) {
89  throw std::runtime_error{"Value of \"" + option_name +
90  "\" foreign table option has the wrong number of "
91  "characters. Expected " +
92  std::to_string(expected_num_chars) + " character(s)."};
93  }
94  return it->second;
95  }
96  return "";
97 }
98 
99 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
100  const std::string& option_name) {
101  if (auto it = foreign_table->options.find(option_name);
102  it != foreign_table->options.end()) {
103  if (boost::iequals(it->second, "TRUE")) {
104  return true;
105  } else if (boost::iequals(it->second, "FALSE")) {
106  return false;
107  } else {
108  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
109  "\" foreign table option. "
110  "Value must be either 'true' or 'false'."};
111  }
112  }
113  return std::nullopt;
114 }
115 } // namespace
116 
// Row parsing routine: walks the rows located between thread_buf and thread_buf_end,
// adds each field to the matching entry of request.import_buffers and returns `result`
// carrying the row offsets, the row count and (when request.track_rejected_rows is set)
// the ids of rejected rows.
// NOTE(review): the opening of this signature and the heads of several calls below are
// missing from this listing; comments on those spots are hedged accordingly.
122  bool convert_data_blocks,
123  bool columns_are_pre_filtered) const {
124  CHECK(request.buffer);
  // NOTE(review): the head of this statement is missing; it presumably defines `begin`,
  // which is used as an offset on the next line -- confirm.
126  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
127  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
128  const char* thread_buf_end = request.buffer.get() + request.end_pos;
129  const char* buf_end = request.buffer.get() + request.buffer_size;
130 
131  std::vector<std::string_view> row;
132  size_t row_index_plus_one = 0;
133  const char* p = thread_buf;
134  bool try_single_thread = false;
135  int phys_cols = 0;
136  int point_cols = 0;
137  std::unique_ptr<bool[]> array_flags;
138 
  // NOTE(review): callee line missing; the arguments match
  // set_array_flags_and_geo_columns_count() -- confirm.
140  array_flags, phys_cols, point_cols, request.getColumns());
  // Number of fields expected per row: all columns minus the physical columns and, when
  // the columns are pre-filtered, minus every column without an import buffer.
141  auto num_cols = request.getColumns().size() - phys_cols;
142  if (columns_are_pre_filtered) {
143  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
144  if (skip_column_import(request, col_idx)) {
145  --num_cols;
146  }
147  }
148  }
149 
150  size_t current_row_id = 0;
151  size_t row_count = 0;
152  size_t remaining_row_count = request.process_row_count;
153  std::vector<size_t> row_offsets{};
  // First offset: file position of the first row handled by this call.
154  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
155 
  // NOTE(review): `result` is used below but its declaration is not part of this listing.
157 
158  std::string file_path = request.getFilePath();
  // One iteration per row; stops at the end of this thread's range or once
  // request.process_row_count rows have been handled.
159  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
160  row.clear();
161  current_row_id = row_count;
162  row_count++;
163  std::vector<std::unique_ptr<char[]>>
164  tmp_buffers; // holds string w/ removed escape chars, etc
165  const char* line_start = p;
  // NOTE(review): head of this call is missing; presumably a get_row(...) call that
  // fills `row` and advances `p` -- confirm.
167  thread_buf_end,
168  buf_end,
169  request.copy_params,
170  array_flags.get(),
171  row,
172  tmp_buffers,
173  try_single_thread,
174  !columns_are_pre_filtered);
175 
176  row_index_plus_one++;
177 
  // A wrong field count either rejects the row (when rejected rows are tracked) or
  // propagates the exception to the caller.
178  bool incorrect_column_count = false;
179  try {
180  validate_expected_column_count(row, num_cols, point_cols, file_path);
181  } catch (const ForeignStorageException& e) {
182  if (request.track_rejected_rows) {
183  result.rejected_rows.insert(current_row_id);
184  incorrect_column_count = true;
185  } else {
186  throw;
187  }
188  }
189 
  // import_idx indexes the fields of `row`, col_idx indexes request.import_buffers.
190  size_t import_idx = 0;
191  size_t col_idx = 0;
192 
193  try {
  // NOTE(review): getColumns() appears to return the list by value, which would make
  // this a per-row copy -- confirm.
194  auto columns = request.getColumns();
195  if (incorrect_column_count) {
  // Rejected row: fill every column starting at the first one and move on to the next row.
196  auto cd_it = columns.begin();
197  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
198  continue;
199  }
200  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
201  auto cd = *cd_it;
202  const auto& col_ti = cd->columnType;
  // A column is absent from the row only when the columns are pre-filtered and the
  // column has no import buffer.
203  bool column_is_present =
204  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
205  CHECK(row.size() > import_idx || !column_is_present);
206  bool is_null = false;
207  try {
208  is_null = column_is_present
209  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
210  : true;
211  } catch (const std::exception& e) {
212  if (request.track_rejected_rows) {
213  result.rejected_rows.insert(current_row_id);
214  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
215  break; // skip rest of row
216  } else {
217  throw;
218  }
219  }
  // NOTE(review): row[import_idx] is accessed here even when column_is_present is
  // false, a case in which the CHECK above allows import_idx >= row.size() -- confirm
  // this cannot index past the end of `row`.
220  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
221  // everything but strings should be always trimmed
222  row[import_idx] = sv_strip(row[import_idx]);
223  }
224  if (col_ti.is_geometry()) {
225  if (!skip_column_import(request, col_idx)) {
  // Remember where the geo column starts so that a failure can be filled in from there.
226  auto starting_col_idx = col_idx;
227  try {
  // NOTE(review): head of this call is missing; presumably processGeoColumn(...),
  // which takes col_idx and import_idx by reference -- confirm.
229  col_idx,
230  request.copy_params,
231  cd_it,
232  row,
233  import_idx,
234  is_null,
235  request.first_row_index,
236  row_index_plus_one,
237  request.getCatalog(),
238  request.render_group_analyzer_map);
239  } catch (const std::exception& e) {
240  if (request.track_rejected_rows) {
241  result.rejected_rows.insert(current_row_id);
242  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
243  break; // skip rest of row
244  } else {
245  throw;
246  }
247  }
248  } else {
  // Skipped geo column: nothing is written, only the indexes advance.
249  // update import/col idx according to types
250  if (!is_null && cd->columnType == kPOINT &&
251  isCoordinateScalar(row[import_idx])) {
252  if (!columns_are_pre_filtered) {
253  ++import_idx;
254  }
255  }
256  if (!columns_are_pre_filtered) {
257  ++import_idx;
258  }
259  ++col_idx;
260  col_idx += col_ti.get_physical_cols();
261  }
262  // skip remaining physical columns
263  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
264  ++cd_it;
265  }
266  } else {
267  if (!skip_column_import(request, col_idx)) {
268  try {
269  request.import_buffers[col_idx]->add_value(
270  cd, row[import_idx], is_null, request.copy_params);
271  } catch (const std::exception& e) {
272  if (request.track_rejected_rows) {
273  result.rejected_rows.insert(current_row_id);
274  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
275  break; // skip rest of row
276  } else {
277  throw;
278  }
279  }
280  }
  // Only columns that are present in the row consume a field.
281  if (column_is_present) {
282  ++import_idx;
283  }
284  ++col_idx;
285  }
286  }
287  } catch (const std::exception& e) {
  // Any exception that escapes the per-row handling is re-thrown with the row text
  // and the file path attached.
288  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
289  "\" in row \"" + std::string(line_start, p) +
290  "\" in file \"" + file_path + "\"");
291  }
292  }
  // Closing offset: file position just past the last row that was handled.
293  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
294 
295  result.row_offsets = row_offsets;
296  result.row_count = row_count;
297  if (convert_data_blocks) {
  // NOTE(review): the right-hand side of this assignment is missing from this listing.
298  result.column_id_to_data_blocks_map =
300  }
301  return result;
302 }
// Splits the text of a single row into fields and checks the number of fields with
// validate_expected_column_count(); that helper reports a mismatch for `file_name`.
// NOTE(review): the opening of this signature and the head of the parsing call below
// are missing from this listing.
308  const std::string& row,
309  const import_export::CopyParams& copy_params,
310  size_t num_cols,
311  int point_cols,
312  const std::string& file_name) const {
  // A single flag is passed (by address) where the row parser takes its array flags.
313  bool is_array = false;
314  bool try_single_thread = false;
315  std::vector<std::unique_ptr<char[]>> tmp_buffers;
316  std::vector<std::string_view> fields;
317  // parse columns in row into fields (other return values are intentionally ignored)
  // Both end pointers passed below point just past the last character of `row`.
319  row.c_str() + row.size(),
320  row.c_str() + row.size(),
321  copy_params,
322  &is_array,
323  fields,
324  tmp_buffers,
325  try_single_thread,
326  false // Don't filter empty lines
327  );
328  // Check we have right number of columns
329  validate_expected_column_count(fields, num_cols, point_cols, file_name);
330 }
331 
// Builds a CopyParams object from the options of `foreign_table`. Options that are not
// set leave the corresponding default constructed CopyParams field untouched.
// NOTE(review): the opening of this signature and the heads of several validating calls
// below are missing from this listing.
333  const ForeignTable* foreign_table) const {
334  import_export::CopyParams copy_params{};
335  copy_params.plain_text = true;
336  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
337  !value.empty()) {
338  copy_params.delimiter = value[0];
339  }
  // The null string is taken verbatim, without validation.
340  if (auto it = foreign_table->options.find(NULLS_KEY);
341  it != foreign_table->options.end()) {
342  copy_params.null_str = it->second;
343  }
  // has_header is only overwritten when the option is explicitly set.
344  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
345  if (has_header.has_value()) {
346  if (has_header.value()) {
347  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
348  } else {
349  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
350  }
351  }
352  copy_params.quoted =
353  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
  // NOTE(review): initializer missing; presumably a length-checked lookup of the quote
  // option -- confirm.
354  if (const auto& value =
356  !value.empty()) {
357  copy_params.quote = value[0];
358  }
  // NOTE(review): initializer missing; presumably a length-checked lookup of the escape
  // option -- confirm.
359  if (const auto& value =
361  !value.empty()) {
362  copy_params.escape = value[0];
363  }
364  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
365  !value.empty()) {
366  copy_params.line_delim = value[0];
367  }
  // NOTE(review): initializer missing; presumably a lookup of the array delimiter
  // option -- confirm.
368  if (const auto& value =
370  !value.empty()) {
371  copy_params.array_delim = value[0];
372  }
  // NOTE(review): initializer missing; value[1] is read below, so the value presumably
  // has been validated to hold two characters -- confirm.
373  if (const auto& value =
375  !value.empty()) {
376  copy_params.array_begin = value[0];
377  copy_params.array_end = value[1];
378  }
379  copy_params.lonlat =
380  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
  // NOTE(review): the lookup expressions for the next two assignments are missing.
381  copy_params.geo_assign_render_groups =
383  .value_or(copy_params.geo_assign_render_groups);
384  copy_params.geo_explode_collections =
386  .value_or(copy_params.geo_explode_collections);
  // std::stoi throws std::invalid_argument / std::out_of_range for values it cannot
  // convert; those exceptions are not caught in this function.
387  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
388  it != foreign_table->options.end()) {
389  copy_params.source_srid = std::stoi(it->second);
390  }
391 
392  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
393  it != foreign_table->options.end()) {
394  copy_params.buffer_size = std::stoi(it->second);
395  }
396  if (auto it = foreign_table->options.find(THREADS_KEY);
397  it != foreign_table->options.end()) {
398  copy_params.threads = std::stoi(it->second);
399  }
400  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
401  .value_or(copy_params.trim_spaces);
402 
403  return copy_params;
404 }
405 
// Thin forwarding wrapper: passes its arguments on unchanged and inserts a nullptr in
// front of file_reader.
// NOTE(review): the opening of this signature and the callee line are missing from this
// listing; the argument list matches find_row_end_pos(...), whose extra parameter is a
// FILE* -- confirm.
407  size_t& alloc_size,
408  std::unique_ptr<char[]>& buffer,
409  size_t& buffer_size,
410  const import_export::CopyParams& copy_params,
411  const size_t buffer_first_row_index,
412  unsigned int& num_rows_in_buffer,
413  foreign_storage::FileReader* file_reader) const {
415  buffer,
416  buffer_size,
417  copy_params,
418  buffer_first_row_index,
419  num_rows_in_buffer,
420  nullptr,
421  file_reader);
422 }
423 } // namespace foreign_storage
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
std::string_view sv_strip(std::string_view str)
Returns the trimmed string_view.
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const import_export::CopyParams copy_params
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:222
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY