OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
21 #include "ImportExport/Importer.h"
22 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 
26 namespace {
27 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
28  return request.import_buffers[column_idx] == nullptr;
29 }
30 
// Parameter list and body of set_array_flags_and_geo_columns_count(). The line
// carrying the return type and function name is absent from this listing; the
// full signature appears in the symbol index at the end of the page.
//
// array_flags: replaced with a newly allocated bool array holding one entry per
//              element of `columns`; an entry is true when that column's type is
//              kARRAY and false otherwise.
// phys_cols:   increased by get_physical_cols() of every column (added to the
//              incoming value, which is not reset here).
// point_cols:  increased by one for every column whose type is kPOINT.
// columns:     column descriptors to inspect, visited in list order.
32  std::unique_ptr<bool[]>& array_flags,
33  int& phys_cols,
34  int& point_cols,
35  const std::list<const ColumnDescriptor*>& columns) {
36  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
// Index into array_flags; advanced in step with the list iteration.
37  size_t i = 0;
38  for (const auto cd : columns) {
39  const auto& col_ti = cd->columnType;
40  phys_cols += col_ti.get_physical_cols();
41  if (cd->columnType.get_type() == kPOINT) {
42  point_cols++;
43  }
44 
45  if (cd->columnType.get_type() == kARRAY) {
46  array_flags.get()[i] = true;
47  } else {
48  array_flags.get()[i] = false;
49  }
50  i++;
51  }
52 }
53 
54 void validate_expected_column_count(std::vector<std::string_view>& row,
55  size_t num_cols,
56  int point_cols,
57  const std::string& file_name) {
58  // Each POINT could consume two separate coords instead of a single WKT
59  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
60  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
61  }
62 }
63 
64 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
65  const std::string& option_name) {
66  if (auto it = foreign_table->options.find(option_name);
67  it != foreign_table->options.end()) {
68  if (it->second.length() == 1) {
69  return it->second;
70  } else {
71  if (it->second == std::string("\\n")) {
72  return "\n";
73  } else if (it->second == std::string("\\t")) {
74  return "\t";
75  } else {
76  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
77  "\". Expected a single character, \"\\n\" or \"\\t\"."};
78  }
79  }
80  }
81  return "";
82 }
83 
84 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
85  const std::string& option_name,
86  const size_t expected_num_chars) {
87  if (auto it = foreign_table->options.find(option_name);
88  it != foreign_table->options.end()) {
89  if (it->second.length() != expected_num_chars) {
90  throw std::runtime_error{"Value of \"" + option_name +
91  "\" foreign table option has the wrong number of "
92  "characters. Expected " +
93  std::to_string(expected_num_chars) + " character(s)."};
94  }
95  return it->second;
96  }
97  return "";
98 }
99 
100 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
101  const std::string& option_name) {
102  if (auto it = foreign_table->options.find(option_name);
103  it != foreign_table->options.end()) {
104  if (boost::iequals(it->second, "TRUE")) {
105  return true;
106  } else if (boost::iequals(it->second, "FALSE")) {
107  return false;
108  } else {
109  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
110  "\" foreign table option. "
111  "Value must be either 'true' or 'false'."};
112  }
113  }
114  return std::nullopt;
115 }
116 } // namespace
117 
// Tail of the parameter list and the body of parseBuffer(). The line(s) with the
// return type, function name and the `request` parameter are absent from this
// listing (the symbol index at the end of the page shows the full signature).
// Several call-opening lines inside the body are absent as well, so a few
// argument lists below appear without their callee.
//
// What the visible code does: it walks rows between `thread_buf` and
// `thread_buf_end` (at most request.process_row_count of them), checks each
// row's field count, and adds every field to the matching entry of
// request.import_buffers. It records a file offset before the first row and one
// after the last processed row, and returns them together with the row count.
//
// Error handling: when request.track_rejected_rows is set, a failing row is
// recorded in result.rejected_rows and handed to
// fillRejectedRowWithInvalidData(); otherwise the exception propagates. Errors
// raised inside the per-column try block are rethrown as
// ForeignStorageException carrying the row text and the file path.
123  bool convert_data_blocks,
124  bool columns_are_pre_filtered) const {
125  CHECK(request.buffer);
// (listing line 126, which opens this call and presumably declares `begin`, is
// absent; likely find_beginning() — TODO confirm)
127  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
128  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
129  const char* thread_buf_end = request.buffer.get() + request.end_pos;
130  const char* buf_end = request.buffer.get() + request.buffer_size;
131 
132  std::vector<std::string_view> row;
133  size_t row_index_plus_one = 0;
134  const char* p = thread_buf;
135  bool try_single_thread = false;
136  int phys_cols = 0;
137  int point_cols = 0;
138  std::unique_ptr<bool[]> array_flags;
139 
// (listing line 140 is absent; the arguments match
// set_array_flags_and_geo_columns_count() — TODO confirm)
141  array_flags,
142  phys_cols,
143  point_cols,
144  request.foreign_table_schema->getLogicalColumns());
// Expected number of fields per row: all request columns minus phys_cols.
145  auto num_cols = request.getColumns().size() - phys_cols;
// With pre-filtered columns, every column lacking an import buffer also lowers
// the expected field count.
146  if (columns_are_pre_filtered) {
147  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
148  if (skip_column_import(request, col_idx)) {
149  --num_cols;
150  }
151  }
152  }
153 
154  size_t current_row_id = 0;
155  size_t row_count = 0;
156  size_t remaining_row_count = request.process_row_count;
157  std::vector<size_t> row_offsets{};
// First recorded offset: request.file_offset plus the distance of `p` from the
// start of the buffer.
158  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
159 
// (listing line 160 is absent; `result`, used below, is presumably declared
// there — TODO confirm)
161 
162  std::string file_path = request.getFilePath();
163  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
164  row.clear();
// Row id used for rejected-row bookkeeping: zero-based index within this call.
165  current_row_id = row_count;
166  row_count++;
167  std::vector<std::unique_ptr<char[]>>
168  tmp_buffers; // holds string w/ removed escape chars, etc
169  const char* line_start = p;
// (listing line 170 is absent; presumably `p = ...get_row(p,` which fills `row`
// and advances `p` — TODO confirm)
171  thread_buf_end,
172  buf_end,
173  request.copy_params,
174  array_flags.get(),
175  row,
176  tmp_buffers,
177  try_single_thread,
178  !columns_are_pre_filtered);
179 
180  row_index_plus_one++;
181 
// A field-count mismatch either marks the row as rejected or is rethrown
// unchanged (it is outside the wrapping try block below).
182  bool incorrect_column_count = false;
183  try {
184  validate_expected_column_count(row, num_cols, point_cols, file_path);
185  } catch (const ForeignStorageException& e) {
186  if (request.track_rejected_rows) {
187  result.rejected_rows.insert(current_row_id);
188  incorrect_column_count = true;
189  } else {
190  throw;
191  }
192  }
193 
// import_idx indexes into `row` (fields of the line); col_idx indexes into
// request.import_buffers.
194  size_t import_idx = 0;
195  size_t col_idx = 0;
196 
197  try {
// NOTE(review): getColumns() is declared as returning std::list by value, so
// this copies the list once per row — consider hoisting it out of the loop.
198  auto columns = request.getColumns();
199  if (incorrect_column_count) {
200  auto cd_it = columns.begin();
201  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
// Moves on to the next row (the for-loop increment still runs).
202  continue;
203  }
204  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
205  auto cd = *cd_it;
206  const auto& col_ti = cd->columnType;
// A column is absent from the row only when columns are pre-filtered and it
// has no import buffer.
207  bool column_is_present =
208  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
209  CHECK(row.size() > import_idx || !column_is_present);
210  bool is_null = false;
211  try {
212  is_null = column_is_present
213  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
214  : true;
215  } catch (const std::exception& e) {
216  if (request.track_rejected_rows) {
217  result.rejected_rows.insert(current_row_id);
218  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
219  break; // skip rest of row
220  } else {
221  throw;
222  }
223  }
// NOTE(review): row[import_idx] is accessed here regardless of
// column_is_present; the CHECK above allows import_idx >= row.size() when the
// column is absent, which looks like a possible out-of-range access — confirm.
224  if (!col_ti.is_string() && !request.copy_params.trim_spaces) {
225  // everything but strings should be always trimmed
226  row[import_idx] = sv_strip(row[import_idx]);
227  }
228  if (col_ti.is_geometry()) {
229  if (!skip_column_import(request, col_idx)) {
// Remembered so a rejected row can be filled starting at the geo column's
// first buffer even if col_idx was advanced before the failure.
230  auto starting_col_idx = col_idx;
231  try {
// (listing line 232 is absent; the arguments match processGeoColumn(), which
// takes col_idx and import_idx by reference — TODO confirm)
233  col_idx,
234  request.copy_params,
235  cd_it,
236  row,
237  import_idx,
238  is_null,
239  request.first_row_index,
240  row_index_plus_one,
241  request.getCatalog(),
242  request.render_group_analyzer_map);
243  } catch (const std::exception& e) {
244  if (request.track_rejected_rows) {
245  result.rejected_rows.insert(current_row_id);
246  fillRejectedRowWithInvalidData(columns, cd_it, starting_col_idx, request);
247  break; // skip rest of row
248  } else {
249  throw;
250  }
251  }
252  } else {
253  // update import/col idx according to types
// A non-null POINT given as a coordinate scalar occupies one extra field.
254  if (!is_null && cd->columnType == kPOINT &&
255  isCoordinateScalar(row[import_idx])) {
256  if (!columns_are_pre_filtered) {
257  ++import_idx;
258  }
259  }
260  if (!columns_are_pre_filtered) {
261  ++import_idx;
262  }
263  ++col_idx;
264  col_idx += col_ti.get_physical_cols();
265  }
266  // skip remaining physical columns
267  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
268  ++cd_it;
269  }
270  } else {
271  if (!skip_column_import(request, col_idx)) {
272  try {
273  request.import_buffers[col_idx]->add_value(
274  cd, row[import_idx], is_null, request.copy_params);
275  } catch (const std::exception& e) {
276  if (request.track_rejected_rows) {
277  result.rejected_rows.insert(current_row_id);
278  fillRejectedRowWithInvalidData(columns, cd_it, col_idx, request);
279  break; // skip rest of row
280  } else {
281  throw;
282  }
283  }
284  }
285  if (column_is_present) {
286  ++import_idx;
287  }
288  ++col_idx;
289  }
290  }
291  } catch (const std::exception& e) {
// Wraps any failure from the per-column processing with the raw row text
// (line_start up to the current `p`) and the file path.
292  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
293  "\" in row \"" + std::string(line_start, p) +
294  "\" in file \"" + file_path + "\"");
295  }
296  }
// Second recorded offset: position of `p` after the last processed row.
297  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
298 
299  result.row_offsets = row_offsets;
300  result.row_count = row_count;
301  if (convert_data_blocks) {
// (listing line 303, the right-hand side of this assignment, is absent;
// presumably convertImportBuffersToDataBlocks(request.import_buffers) — TODO
// confirm)
302  result.column_id_to_data_blocks_map =
304  }
305  return result;
306 }
// Parameter list and body of validateExpectedColumnCount(). The line with the
// return type and function name is absent from this listing (the symbol index
// at the end of the page shows the full signature).
//
// Splits the single text row `row` into fields using `copy_params` and checks
// the field count via validate_expected_column_count(), which reports a
// mismatch for `file_name` when the count is below `num_cols` or above
// `num_cols + point_cols`.
312  const std::string& row,
313  const import_export::CopyParams& copy_params,
314  size_t num_cols,
315  int point_cols,
316  const std::string& file_name) const {
// Single false flag passed where the row parser takes a `const bool*` array of
// per-column array flags.
317  bool is_array = false;
318  bool try_single_thread = false;
319  std::vector<std::unique_ptr<char[]>> tmp_buffers;
320  std::vector<std::string_view> fields;
321  // parse columns in row into fields (other return values are intentionally ignored)
// (listing line 322, which opens this call, is absent; presumably
// get_row(row.c_str(), ... — TODO confirm)
323  row.c_str() + row.size(),
324  row.c_str() + row.size(),
325  copy_params,
326  &is_array,
327  fields,
328  tmp_buffers,
329  try_single_thread,
330  false // Don't filter empty lines
331  );
332  // Check we have right number of columns
333  validate_expected_column_count(fields, num_cols, point_cols, file_name);
334 }
335 
// Parameter list and body of validateAndGetCopyParams(). The line with the
// return type and function name is absent from this listing, as are several
// initializer lines inside the body (listing lines 359, 364, 373, 378, 386,
// 389), so a few statements below appear without their right-hand side.
//
// Builds an import_export::CopyParams from the options of `foreign_table`.
// Options that are not set leave the corresponding default-constructed field
// untouched. Validation helpers used here throw std::runtime_error on invalid
// values.
337  const ForeignTable* foreign_table) const {
338  import_export::CopyParams copy_params{};
339  copy_params.plain_text = true;
340  if (const auto& value = validate_and_get_delimiter(foreign_table, DELIMITER_KEY);
341  !value.empty()) {
342  copy_params.delimiter = value[0];
343  }
// The null string is taken verbatim, without validation.
344  if (auto it = foreign_table->options.find(NULLS_KEY);
345  it != foreign_table->options.end()) {
346  copy_params.null_str = it->second;
347  }
348  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
349  if (has_header.has_value()) {
350  if (has_header.value()) {
351  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
352  } else {
353  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
354  }
355  }
356  copy_params.quoted =
357  validate_and_get_bool_value(foreign_table, QUOTED_KEY).value_or(copy_params.quoted);
// (initializer on listing line 359 is absent)
358  if (const auto& value =
360  !value.empty()) {
361  copy_params.quote = value[0];
362  }
// (initializer on listing line 364 is absent)
363  if (const auto& value =
365  !value.empty()) {
366  copy_params.escape = value[0];
367  }
368  if (const auto& value = validate_and_get_delimiter(foreign_table, LINE_DELIMITER_KEY);
369  !value.empty()) {
370  copy_params.line_delim = value[0];
371  }
// (initializer on listing line 373 is absent)
372  if (const auto& value =
374  !value.empty()) {
375  copy_params.array_delim = value[0];
376  }
// (initializer on listing line 378 is absent; value[1] below assumes the value
// was validated to hold two characters — TODO confirm)
377  if (const auto& value =
379  !value.empty()) {
380  copy_params.array_begin = value[0];
381  copy_params.array_end = value[1];
382  }
383  copy_params.lonlat =
384  validate_and_get_bool_value(foreign_table, LONLAT_KEY).value_or(copy_params.lonlat);
// (listing lines 386 and 389, the expressions these .value_or() calls apply to,
// are absent)
385  copy_params.geo_assign_render_groups =
387  .value_or(copy_params.geo_assign_render_groups);
388  copy_params.geo_explode_collections =
390  .value_or(copy_params.geo_explode_collections);
// NOTE(review): the three std::stoi conversions below are not guarded here;
// std::stoi throws std::invalid_argument / std::out_of_range on bad input, and
// negative values are not rejected in this function — confirm whether the
// options are validated elsewhere.
391  if (auto it = foreign_table->options.find(SOURCE_SRID_KEY);
392  it != foreign_table->options.end()) {
393  copy_params.source_srid = std::stoi(it->second);
394  }
395 
396  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
397  it != foreign_table->options.end()) {
398  copy_params.buffer_size = std::stoi(it->second);
399  }
400  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
401  it != foreign_table->options.end()) {
402  copy_params.threads = std::stoi(it->second);
403  }
404  copy_params.trim_spaces = validate_and_get_bool_value(foreign_table, TRIM_SPACES_KEY)
405  .value_or(copy_params.trim_spaces);
406 
407  return copy_params;
408 }
409 
// Parameter list and body of findRowEndPosition(). The line with the return
// type and function name, and the line opening the forwarded call (listing
// line 418), are absent from this listing.
//
// The body forwards all of its arguments unchanged to a single call and passes
// nullptr in the second-to-last position. The callee is presumably
// find_row_end_pos(), whose declaration in the symbol index has a `FILE* file`
// parameter in that slot — TODO confirm.
411  size_t& alloc_size,
412  std::unique_ptr<char[]>& buffer,
413  size_t& buffer_size,
414  const import_export::CopyParams& copy_params,
415  const size_t buffer_first_row_index,
416  unsigned int& num_rows_in_buffer,
417  foreign_storage::FileReader* file_reader) const {
419  buffer,
420  buffer_size,
421  copy_params,
422  buffer_first_row_index,
423  num_rows_in_buffer,
424  nullptr,
425  file_reader);
426 }
427 } // namespace foreign_storage
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
std::string_view sv_strip(std::string_view str)
return trimmed string_view
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const import_export::CopyParams copy_params
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:222
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
const RenderGroupAnalyzerMap * render_group_analyzer_map
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY