OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 #include "ImportExport/Importer.h"
21 #include "Shared/StringTransform.h"
22 
23 namespace foreign_storage {
24 
25 namespace {
26 inline bool skip_column_import(ParseBufferRequest& request, int column_idx) {
27  return request.import_buffers[column_idx] == nullptr;
28 }
29 
31  std::unique_ptr<bool[]>& array_flags,
32  int& phys_cols,
33  int& point_cols,
34  const std::list<const ColumnDescriptor*>& columns) {
35  array_flags = std::unique_ptr<bool[]>(new bool[columns.size()]);
36  size_t i = 0;
37  for (const auto cd : columns) {
38  const auto& col_ti = cd->columnType;
39  phys_cols += col_ti.get_physical_cols();
40  if (cd->columnType.get_type() == kPOINT) {
41  point_cols++;
42  }
43 
44  if (cd->columnType.get_type() == kARRAY) {
45  array_flags.get()[i] = true;
46  } else {
47  array_flags.get()[i] = false;
48  }
49  i++;
50  }
51 }
52 
53 void validate_expected_column_count(std::vector<std::string_view>& row,
54  size_t num_cols,
55  int point_cols,
56  const std::string& file_name) {
57  // Each POINT could consume two separate coords instead of a single WKT
58  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
59  throw_number_of_columns_mismatch_error(num_cols, row.size(), file_name);
60  }
61 }
62 
63 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
64  const std::string& option_name) {
65  if (auto it = foreign_table->options.find(option_name);
66  it != foreign_table->options.end()) {
67  if (it->second.length() == 1) {
68  return it->second;
69  } else {
70  if (it->second == std::string("\\n")) {
71  return "\n";
72  } else if (it->second == std::string("\\t")) {
73  return "\t";
74  } else {
75  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
76  "\". Expected a single character, \"\\n\" or \"\\t\"."};
77  }
78  }
79  }
80  return "";
81 }
82 
83 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
84  const std::string& option_name,
85  const size_t expected_num_chars) {
86  if (auto it = foreign_table->options.find(option_name);
87  it != foreign_table->options.end()) {
88  if (it->second.length() != expected_num_chars) {
89  throw std::runtime_error{"Value of \"" + option_name +
90  "\" foreign table option has the wrong number of "
91  "characters. Expected " +
92  std::to_string(expected_num_chars) + " character(s)."};
93  }
94  return it->second;
95  }
96  return "";
97 }
98 
99 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
100  const std::string& option_name) {
101  if (auto it = foreign_table->options.find(option_name);
102  it != foreign_table->options.end()) {
103  if (boost::iequals(it->second, "TRUE")) {
104  return true;
105  } else if (boost::iequals(it->second, "FALSE")) {
106  return false;
107  } else {
108  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
109  "\" foreign table option. "
110  "Value must be either 'true' or 'false'."};
111  }
112  }
113  return std::nullopt;
114 }
115 } // namespace
116 
122  bool convert_data_blocks,
123  bool columns_are_pre_filtered) const {
124  CHECK(request.buffer);
126  request.buffer.get(), request.begin_pos, request.end_pos, request.copy_params);
127  const char* thread_buf = request.buffer.get() + request.begin_pos + begin;
128  const char* thread_buf_end = request.buffer.get() + request.end_pos;
129  const char* buf_end = request.buffer.get() + request.buffer_size;
130 
131  std::vector<std::string_view> row;
132  size_t row_index_plus_one = 0;
133  const char* p = thread_buf;
134  bool try_single_thread = false;
135  int phys_cols = 0;
136  int point_cols = 0;
137  std::unique_ptr<bool[]> array_flags;
138 
140  array_flags, phys_cols, point_cols, request.getColumns());
141  auto num_cols = request.getColumns().size() - phys_cols;
142  if (columns_are_pre_filtered) {
143  for (size_t col_idx = 0; col_idx < request.getColumns().size(); ++col_idx) {
144  if (skip_column_import(request, col_idx)) {
145  --num_cols;
146  }
147  }
148  }
149 
150  size_t row_count = 0;
151  size_t remaining_row_count = request.process_row_count;
152  std::vector<size_t> row_offsets{};
153  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
154 
155  std::string file_path = request.getFilePath();
156  for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
157  row.clear();
158  row_count++;
159  std::vector<std::unique_ptr<char[]>>
160  tmp_buffers; // holds string w/ removed escape chars, etc
161  const char* line_start = p;
163  thread_buf_end,
164  buf_end,
165  request.copy_params,
166  array_flags.get(),
167  row,
168  tmp_buffers,
169  try_single_thread,
170  !columns_are_pre_filtered);
171 
172  row_index_plus_one++;
173  validate_expected_column_count(row, num_cols, point_cols, file_path);
174 
175  size_t import_idx = 0;
176  size_t col_idx = 0;
177  try {
178  auto columns = request.getColumns();
179  for (auto cd_it = columns.begin(); cd_it != columns.end(); cd_it++) {
180  auto cd = *cd_it;
181  const auto& col_ti = cd->columnType;
182  bool column_is_present =
183  !(skip_column_import(request, col_idx) && columns_are_pre_filtered);
184  CHECK(row.size() > import_idx || !column_is_present);
185  bool is_null =
186  column_is_present
187  ? isNullDatum(row[import_idx], cd, request.copy_params.null_str)
188  : true;
189 
190  if (col_ti.is_geometry()) {
191  if (!skip_column_import(request, col_idx)) {
193  col_idx,
194  request.copy_params,
195  cd_it,
196  row,
197  import_idx,
198  is_null,
199  request.first_row_index,
200  row_index_plus_one,
201  request.getCatalog());
202  } else {
203  // update import/col idx according to types
204  if (!is_null && cd->columnType == kPOINT &&
205  isCoordinateScalar(row[import_idx])) {
206  if (!columns_are_pre_filtered) {
207  ++import_idx;
208  }
209  }
210  if (!columns_are_pre_filtered) {
211  ++import_idx;
212  }
213  ++col_idx;
214  col_idx += col_ti.get_physical_cols();
215  }
216  // skip remaining physical columns
217  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
218  ++cd_it;
219  }
220  } else {
221  if (!skip_column_import(request, col_idx)) {
222  request.import_buffers[col_idx]->add_value(
223  cd, row[import_idx], is_null, request.copy_params);
224  }
225  if (column_is_present) {
226  ++import_idx;
227  }
228  ++col_idx;
229  }
230  }
231  } catch (const std::exception& e) {
232  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
233  "\" in row \"" + std::string(line_start, p) +
234  "\" in file \"" + file_path + "\"");
235  }
236  }
237  row_offsets.emplace_back(request.file_offset + (p - request.buffer.get()));
238 
240  result.row_offsets = row_offsets;
241  result.row_count = row_count;
242  if (convert_data_blocks) {
243  result.column_id_to_data_blocks_map =
245  }
246  return result;
247 }
253  const std::string& row,
254  const import_export::CopyParams& copy_params,
255  size_t num_cols,
256  int point_cols,
257  const std::string& file_name) const {
258  bool is_array = false;
259  bool try_single_thread = false;
260  std::vector<std::unique_ptr<char[]>> tmp_buffers;
261  std::vector<std::string_view> fields;
262  // parse columns in row into fields (other return values are intentionally ignored)
264  row.c_str() + row.size(),
265  row.c_str() + row.size(),
266  copy_params,
267  &is_array,
268  fields,
269  tmp_buffers,
270  try_single_thread,
271  false // Don't filter empty lines
272  );
273  // Check we have right number of columns
274  validate_expected_column_count(fields, num_cols, point_cols, file_name);
275 }
276 
278  const ForeignTable* foreign_table) const {
279  import_export::CopyParams copy_params{};
280  copy_params.plain_text = true;
281  if (const auto& value =
282  validate_and_get_string_with_length(foreign_table, "ARRAY_DELIMITER", 1);
283  !value.empty()) {
284  copy_params.array_delim = value[0];
285  }
286  if (const auto& value =
287  validate_and_get_string_with_length(foreign_table, "ARRAY_MARKER", 2);
288  !value.empty()) {
289  copy_params.array_begin = value[0];
290  copy_params.array_end = value[1];
291  }
292  if (auto it = foreign_table->options.find("BUFFER_SIZE");
293  it != foreign_table->options.end()) {
294  copy_params.buffer_size = std::stoi(it->second);
295  }
296  if (const auto& value = validate_and_get_delimiter(foreign_table, "DELIMITER");
297  !value.empty()) {
298  copy_params.delimiter = value[0];
299  }
300  if (const auto& value = validate_and_get_string_with_length(foreign_table, "ESCAPE", 1);
301  !value.empty()) {
302  copy_params.escape = value[0];
303  }
304  auto has_header = validate_and_get_bool_value(foreign_table, "HEADER");
305  if (has_header.has_value()) {
306  if (has_header.value()) {
307  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
308  } else {
309  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
310  }
311  }
312  if (const auto& value = validate_and_get_delimiter(foreign_table, "LINE_DELIMITER");
313  !value.empty()) {
314  copy_params.line_delim = value[0];
315  }
316  copy_params.lonlat =
317  validate_and_get_bool_value(foreign_table, "LONLAT").value_or(copy_params.lonlat);
318 
319  if (auto it = foreign_table->options.find("NULLS");
320  it != foreign_table->options.end()) {
321  copy_params.null_str = it->second;
322  }
323  if (const auto& value = validate_and_get_string_with_length(foreign_table, "QUOTE", 1);
324  !value.empty()) {
325  copy_params.quote = value[0];
326  }
327  copy_params.quoted =
328  validate_and_get_bool_value(foreign_table, "QUOTED").value_or(copy_params.quoted);
329  return copy_params;
330 }
331 
333  size_t& alloc_size,
334  std::unique_ptr<char[]>& buffer,
335  size_t& buffer_size,
336  const import_export::CopyParams& copy_params,
337  const size_t buffer_first_row_index,
338  unsigned int& num_rows_in_buffer,
339  foreign_storage::FileReader* file_reader) const {
341  buffer,
342  buffer_size,
343  copy_params,
344  buffer_first_row_index,
345  num_rows_in_buffer,
346  nullptr,
347  file_reader);
348 }
349 } // namespace foreign_storage
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const override
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
std::string to_string(char const *&&v)
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
bool skip_column_import(ParseBufferRequest &request, int column_idx)
#define CHECK(condition)
Definition: Logger.h:211
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)