OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StreamImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #include <boost/algorithm/string.hpp>
25 #include <boost/algorithm/string/trim.hpp>
26 #include <cstring>
27 #include <iostream>
28 #include <iterator>
29 #include <string>
30 
31 #include "Logger/Logger.h"
32 #include "RowToColumnLoader.h"
33 #include "Shared/ThriftClient.h"
35 #include "Shared/sqltypes.h"
36 
37 #include <chrono>
38 #include <thread>
39 
40 #include <boost/program_options.hpp>
41 
42 #define MAX_FIELD_LEN 20000
43 
44 bool print_error_data = false;
45 bool print_transformation = false;
46 
47 // reads copy_params.delimiter delimited rows from std::cin and load them to
48 // table_name in batches of size copy_params.batch_size until EOF
50  RowToColumnLoader& row_loader,
51  const std::map<std::string,
52  std::pair<std::unique_ptr<boost::regex>,
53  std::unique_ptr<std::string>>>& transformations,
54  const import_export::CopyParams& copy_params,
55  const bool remove_quotes) {
56  std::ios_base::sync_with_stdio(false);
57  std::istream_iterator<char> eos;
58  std::cin >> std::noskipws;
59  std::istream_iterator<char> iit(std::cin);
60 
61  char field[MAX_FIELD_LEN];
62  size_t field_i = 0;
63 
64  int nrows = 0;
65  int nskipped = 0;
66  bool backEscape = false;
67 
68  auto row_desc = row_loader.get_row_descriptor();
69 
70  std::vector<
71  const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
72  xforms(row_desc.size());
73  for (size_t i = 0; i < row_desc.size(); i++) {
74  auto it = transformations.find(row_desc[i].col_name);
75  if (it != transformations.end()) {
76  xforms[i] = &(it->second);
77  } else {
78  xforms[i] = nullptr;
79  }
80  }
81 
82  std::vector<TStringValue> row; // used to store each row as we move through the stream
83 
84  int read_rows = 0;
85  while (iit != eos) {
86  // construct a row
87  while (iit != eos) {
88  if (*iit == copy_params.delimiter || *iit == copy_params.line_delim) {
89  bool end_of_field = (*iit == copy_params.delimiter);
90  bool end_of_row;
91  if (end_of_field) {
92  end_of_row = false;
93  } else {
94  end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
95  (row.size() == row_desc.size() - 1);
96  if (!end_of_row) {
97  size_t l = copy_params.null_str.size();
98  if (field_i >= l &&
99  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
100  end_of_row = true;
101  }
102  }
103  }
104  if (!end_of_field && !end_of_row) {
105  // not enough columns yet and it is a string column
106  // treat the line delimiter as part of the string
107  field[field_i++] = *iit;
108  } else {
109  field[field_i] = '\0';
110  field_i = 0;
111  TStringValue ts;
112  ts.str_val = std::string(field);
113  ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
114  auto xform = row.size() < row_desc.size() ? xforms[row.size()] : nullptr;
115  if (!ts.is_null && xform != nullptr) {
116  if (print_transformation) {
117  std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
118  }
119  ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
120  if (ts.str_val.empty()) {
121  ts.is_null = true;
122  }
123  if (print_transformation) {
124  std::cout << ts.str_val << std::endl;
125  }
126  }
127 
128  row.push_back(ts); // add column value to row
129  if (end_of_row || (row.size() > row_desc.size())) {
130  break; // found row
131  }
132  }
133  } else {
134  if (*iit == '\\') {
135  backEscape = true;
136  } else if (backEscape || !remove_quotes || *iit != '\"') {
137  field[field_i++] = *iit;
138  backEscape = false;
139  }
140  // else if unescaped double-quote, continue without adding the
141  // character to the field string.
142  }
143  if (field_i >= MAX_FIELD_LEN) {
144  field[MAX_FIELD_LEN - 1] = '\0';
145  std::cerr << "String too long for buffer." << std::endl;
146  if (print_error_data) {
147  std::cerr << field << std::endl;
148  }
149  field_i = 0;
150  break;
151  }
152  ++iit;
153  }
154  if (row.size() == row_desc.size()) {
155  // add the new data in the column format
156  bool record_loaded = row_loader.convert_string_to_column(row, copy_params);
157  read_rows++;
158  if (!record_loaded) {
159  // record could not be parsed correctly conside rit skipped
160  ++nskipped;
161  }
162  row.clear();
163  if (read_rows % copy_params.batch_size == 0) {
164  row_loader.do_load(nrows, nskipped, copy_params);
165  }
166  } else {
167  ++nskipped;
168  if (print_error_data) {
169  std::cerr << "Incorrect number of columns for row: ";
170  std::cerr << row_loader.print_row_with_delim(row, copy_params) << std::endl;
171  }
172  if (row.size() > row_desc.size()) {
173  // skip to the next line delimiter
174  while (*iit != copy_params.line_delim) {
175  ++iit;
176  }
177  }
178  row.clear();
179  }
180  ++iit;
181  }
182  // load remaining rows if any
183  if (read_rows % copy_params.batch_size != 0) {
184  LOG(INFO) << " read_rows " << read_rows;
185  row_loader.do_load(nrows, nskipped, copy_params);
186  }
187 }
188 
189 int main(int argc, char** argv) {
190  std::string server_host("localhost"); // default to localhost
191  int port = 6274; // default port number
192  bool http = false;
193  bool https = false;
194  bool skip_host_verify = false;
195  std::string ca_cert_name{""};
196  std::string table_name;
197  std::string db_name;
198  std::string user_name;
199  std::string passwd;
200  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
201  size_t batch_size = 10000;
202  size_t retry_count = 10;
203  size_t retry_wait = 5;
204  bool remove_quotes = false;
205  std::vector<std::string> xforms;
206  std::map<std::string,
207  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
208  transformations;
209  ThriftConnectionType conn_type;
210 
211  namespace po = boost::program_options;
212 
213  po::options_description desc("Options");
214  desc.add_options()("help,h", "Print help messages ");
215  desc.add_options()(
216  "table", po::value<std::string>(&table_name)->required(), "Table Name");
217  desc.add_options()(
218  "database", po::value<std::string>(&db_name)->required(), "Database Name");
219  desc.add_options()(
220  "user,u", po::value<std::string>(&user_name)->required(), "User Name");
221  desc.add_options()(
222  "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
223  desc.add_options()("host",
224  po::value<std::string>(&server_host)->default_value(server_host),
225  "HeavyDB Server Hostname");
226  desc.add_options()(
227  "port", po::value<int>(&port)->default_value(port), "HeavyDB Server Port Number");
228  desc.add_options()("http",
229  po::bool_switch(&http)->default_value(http)->implicit_value(true),
230  "Use HTTP transport");
231  desc.add_options()("https",
232  po::bool_switch(&https)->default_value(https)->implicit_value(true),
233  "Use HTTPS transport");
234  desc.add_options()("skip-verify",
235  po::bool_switch(&skip_host_verify)
236  ->default_value(skip_host_verify)
237  ->implicit_value(true),
238  "Don't verify SSL certificate validity");
239  desc.add_options()(
240  "ca-cert",
241  po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
242  "Path to trusted server certificate. Initiates an encrypted connection");
243  desc.add_options()("delim",
244  po::value<std::string>(&delim_str)->default_value(delim_str),
245  "Field delimiter");
246  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
247  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
248  desc.add_options()(
249  "quoted",
250  po::value<std::string>(&quoted),
251  "Whether the source contains quoted fields (true/false, default false)");
252  desc.add_options()("batch",
253  po::value<size_t>(&batch_size)->default_value(batch_size),
254  "Insert batch size");
255  desc.add_options()("retry_count",
256  po::value<size_t>(&retry_count)->default_value(retry_count),
257  "Number of time to retry an insert");
258  desc.add_options()("retry_wait",
259  po::value<size_t>(&retry_wait)->default_value(retry_wait),
260  "wait in secs between retries");
261  desc.add_options()("transform,t",
262  po::value<std::vector<std::string>>(&xforms)->multitoken(),
263  "Column Transformations");
264  desc.add_options()("print_error", "Print Error Rows");
265  desc.add_options()("print_transform", "Print Transformations");
266 
267  po::positional_options_description positionalOptions;
268  positionalOptions.add("table", 1);
269  positionalOptions.add("database", 1);
270 
271  logger::LogOptions log_options(argv[0]);
272  log_options.max_files_ = 0; // stderr only
273  desc.add(log_options.get_options());
274 
275  po::variables_map vm;
276 
277  try {
278  po::store(po::command_line_parser(argc, argv)
279  .options(desc)
280  .positional(positionalOptions)
281  .run(),
282  vm);
283  if (vm.count("help")) {
284  std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
285  "<password> [{--host} "
286  "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
287  "string>][--line <line "
288  "delimiter>][--batch <batch size>][{-t|--transform} transformation "
289  "[--quoted <true|false>] "
290  "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
291  "secs>][--print_error][--print_transform]\n\n";
292  std::cout << desc << std::endl;
293  return 0;
294  }
295  if (vm.count("print_error")) {
296  print_error_data = true;
297  }
298  if (vm.count("print_transform")) {
299  print_transformation = true;
300  }
301 
302  po::notify(vm);
303  } catch (boost::program_options::error& e) {
304  std::cerr << "Usage Error: " << e.what() << std::endl;
305  return 1;
306  }
307 
308  logger::init(log_options);
309 
310  if (http) {
311  conn_type = ThriftConnectionType::HTTP;
312  } else if (https) {
313  conn_type = ThriftConnectionType::HTTPS;
314  } else if (!ca_cert_name.empty()) {
316  } else {
317  conn_type = ThriftConnectionType::BINARY;
318  }
319 
320  char delim = delim_str[0];
321  if (delim == '\\') {
322  if (delim_str.size() < 2 ||
323  (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
324  std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
325  return 1;
326  }
327  if (delim_str[1] == 't') {
328  delim = '\t';
329  } else if (delim_str[1] == 'n') {
330  delim = '\n';
331  } else {
332  std::string d(delim_str);
333  d[0] = '0';
334  delim = (char)std::stoi(d, nullptr, 16);
335  }
336  }
337  if (isprint(delim)) {
338  std::cout << "Field Delimiter: " << delim << std::endl;
339  } else if (delim == '\t') {
340  std::cout << "Field Delimiter: "
341  << "\\t" << std::endl;
342  } else if (delim == '\n') {
343  std::cout << "Field Delimiter: "
344  << "\\n"
345  << std::endl;
346  } else {
347  std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
348  }
349  char line_delim = line_delim_str[0];
350  if (line_delim == '\\') {
351  if (line_delim_str.size() < 2 ||
352  (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
353  line_delim_str[1] != 'n')) {
354  std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
355  return 1;
356  }
357  if (line_delim_str[1] == 't') {
358  line_delim = '\t';
359  } else if (line_delim_str[1] == 'n') {
360  line_delim = '\n';
361  } else {
362  std::string d(line_delim_str);
363  d[0] = '0';
364  line_delim = (char)std::stoi(d, nullptr, 16);
365  }
366  }
367  if (isprint(line_delim)) {
368  std::cout << "Line Delimiter: " << line_delim << std::endl;
369  } else if (line_delim == '\t') {
370  std::cout << "Line Delimiter: "
371  << "\\t" << std::endl;
372  } else if (line_delim == '\n') {
373  std::cout << "Line Delimiter: "
374  << "\\n"
375  << std::endl;
376  } else {
377  std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
378  }
379  std::cout << "Null String: " << nulls << std::endl;
380  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;
381 
382  if (quoted == "true") {
383  remove_quotes = true;
384  }
385 
386  for (auto& t : xforms) {
387  auto n = t.find_first_of(':');
388  if (n == std::string::npos) {
389  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
390  << std::endl;
391  return 1;
392  }
393  std::string col_name = t.substr(0, n);
394  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
395  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
396  << std::endl;
397  return 1;
398  }
399  auto n1 = n + 3;
400  auto n2 = t.find_first_of('/', n1);
401  if (n2 == std::string::npos) {
402  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
403  << std::endl;
404  return 1;
405  }
406  std::string regex_str = t.substr(n1, n2 - n1);
407  n1 = n2 + 1;
408  n2 = t.find_first_of('/', n1);
409  if (n2 == std::string::npos) {
410  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
411  << std::endl;
412  return 1;
413  }
414  std::string fmt_str = t.substr(n1, n2 - n1);
415  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
416  << std::endl;
417  transformations[col_name] =
418  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
419  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
420  std::unique_ptr<std::string>(new std::string(fmt_str)));
421  }
422 
423  import_export::CopyParams copy_params(
424  delim, nulls, line_delim, batch_size, retry_count, retry_wait);
425  RowToColumnLoader row_loader(
427  server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
428  user_name,
429  passwd,
430  db_name,
431  table_name);
432 
433  stream_insert(row_loader, transformations, copy_params, remove_quotes);
434  return 0;
435 }
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
ThriftConnectionType
Definition: ThriftClient.h:29
#define LOG(tag)
Definition: Logger.h:285
Constants for Builtin SQL Types supported by HEAVY.AI.
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
bool print_transformation
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
bool print_error_data
void init(LogOptions const &log_opts)
Definition: Logger.cpp:364
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
#define MAX_FIELD_LEN
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes)
boost::program_options::options_description const & get_options() const
size_t max_files_
Definition: Logger.h:218
static bool run
constexpr double n
Definition: Utm.h:38