OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StreamImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
26 #include <boost/algorithm/string.hpp>
27 #include <boost/algorithm/string/trim.hpp>
28 #include <boost/regex.hpp>
29 #include <cstring>
30 #include <iostream>
31 #include <iterator>
32 #include <string>
33 
34 #include "RowToColumnLoader.h"
35 #include "Shared/Logger.h"
36 #include "Shared/ThriftClient.h"
37 #include "Shared/sqltypes.h"
38 
39 #include <chrono>
40 #include <thread>
41 
42 #include <boost/program_options.hpp>
43 
// Size, in chars, of the fixed buffer stream_insert() uses to accumulate a single
// field; a field that reaches this length is reported as too long.
44 #define MAX_FIELD_LEN 20000
45 
// Set by main() when --print_error is given; when true, stream_insert() echoes
// over-long fields and rows with the wrong column count to std::cerr.
46 bool print_error_data = false;
// Set by main() when --print_transform is given; when true, stream_insert() prints
// each value before and after its regex transformation to std::cout.
47 bool print_transformation = false;
48 
49 // reads copy_params.delimiter delimited rows from std::cin and load them to
50 // table_name in batches of size copy_params.batch_size until EOF
52  RowToColumnLoader& row_loader,
53  const std::map<std::string,
54  std::pair<std::unique_ptr<boost::regex>,
55  std::unique_ptr<std::string>>>& transformations,
56  const Importer_NS::CopyParams& copy_params,
57  const bool remove_quotes) {
58  std::ios_base::sync_with_stdio(false);
59  std::istream_iterator<char> eos;
60  std::cin >> std::noskipws;
61  std::istream_iterator<char> iit(std::cin);
62 
63  char field[MAX_FIELD_LEN];
64  size_t field_i = 0;
65 
66  int nrows = 0;
67  int nskipped = 0;
68  bool backEscape = false;
69 
70  auto row_desc = row_loader.get_row_descriptor();
71 
72  const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*
73  xforms[row_desc.size()];
74  for (size_t i = 0; i < row_desc.size(); i++) {
75  auto it = transformations.find(row_desc[i].col_name);
76  if (it != transformations.end()) {
77  xforms[i] = &(it->second);
78  } else {
79  xforms[i] = nullptr;
80  }
81  }
82 
83  std::vector<TStringValue> row; // used to store each row as we move through the stream
84 
85  int read_rows = 0;
86  while (iit != eos) {
87  // construct a row
88  while (iit != eos) {
89  if (*iit == copy_params.delimiter || *iit == copy_params.line_delim) {
90  bool end_of_field = (*iit == copy_params.delimiter);
91  bool end_of_row;
92  if (end_of_field) {
93  end_of_row = false;
94  } else {
95  end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
96  (row.size() == row_desc.size() - 1);
97  if (!end_of_row) {
98  size_t l = copy_params.null_str.size();
99  if (field_i >= l &&
100  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
101  end_of_row = true;
102  }
103  }
104  }
105  if (!end_of_field && !end_of_row) {
106  // not enough columns yet and it is a string column
107  // treat the line delimiter as part of the string
108  field[field_i++] = *iit;
109  } else {
110  field[field_i] = '\0';
111  field_i = 0;
112  TStringValue ts;
113  ts.str_val = std::string(field);
114  ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
115  auto xform = row.size() < row_desc.size() ? xforms[row.size()] : nullptr;
116  if (!ts.is_null && xform != nullptr) {
117  if (print_transformation) {
118  std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
119  }
120  ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
121  if (ts.str_val.empty()) {
122  ts.is_null = true;
123  }
124  if (print_transformation) {
125  std::cout << ts.str_val << std::endl;
126  }
127  }
128 
129  row.push_back(ts); // add column value to row
130  if (end_of_row || (row.size() > row_desc.size())) {
131  break; // found row
132  }
133  }
134  } else {
135  if (*iit == '\\') {
136  backEscape = true;
137  } else if (backEscape || !remove_quotes || *iit != '\"') {
138  field[field_i++] = *iit;
139  backEscape = false;
140  }
141  // else if unescaped double-quote, continue without adding the
142  // character to the field string.
143  }
144  if (field_i >= MAX_FIELD_LEN) {
145  field[MAX_FIELD_LEN - 1] = '\0';
146  std::cerr << "String too long for buffer." << std::endl;
147  if (print_error_data) {
148  std::cerr << field << std::endl;
149  }
150  field_i = 0;
151  break;
152  }
153  ++iit;
154  }
155  if (row.size() == row_desc.size()) {
156  // add the new data in the column format
157  bool record_loaded = row_loader.convert_string_to_column(row, copy_params);
158  read_rows++;
159  if (!record_loaded) {
160  // record could not be parsed correctly conside rit skipped
161  ++nskipped;
162  }
163  row.clear();
164  if (read_rows % copy_params.batch_size == 0) {
165  row_loader.do_load(nrows, nskipped, copy_params);
166  }
167  } else {
168  ++nskipped;
169  if (print_error_data) {
170  std::cerr << "Incorrect number of columns for row: ";
171  std::cerr << row_loader.print_row_with_delim(row, copy_params) << std::endl;
172  }
173  if (row.size() > row_desc.size()) {
174  // skip to the next line delimiter
175  while (*iit != copy_params.line_delim) {
176  ++iit;
177  }
178  }
179  row.clear();
180  }
181  ++iit;
182  }
183  // load remaining rows if any
184  if (read_rows % copy_params.batch_size != 0) {
185  LOG(INFO) << " read_rows " << read_rows;
186  row_loader.do_load(nrows, nskipped, copy_params);
187  }
188 }
189 
// Entry point of the stream importer.
//
// Parses the command line (table and database may be given positionally), sets up
// logging, chooses the Thrift connection type, decodes the field and line
// delimiters, parses the column transformations and finally streams std::cin into
// the target table via stream_insert().
//
// Returns 0 after a successful run or after printing --help, and 1 on a usage
// error, a malformed delimiter escape or a malformed transformation.
190 int main(int argc, char** argv) {
191  std::string server_host("localhost"); // default to localhost
192  int port = 6274; // default port number
193  bool http = false;
194  bool https = false;
195  bool skip_host_verify = false;
196  std::string ca_cert_name{""};
197  std::string table_name;
198  std::string db_name;
199  std::string user_name;
200  std::string passwd;
201  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
202  size_t batch_size = 10000;
203  size_t retry_count = 10;
204  size_t retry_wait = 5;
205  bool remove_quotes = false;
  // Raw --transform arguments; parsed into `transformations` further below.
206  std::vector<std::string> xforms;
  // Column name -> (compiled regex, replacement format string).
207  std::map<std::string,
208  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
209  transformations;
210  ThriftConnectionType conn_type;
211 
212  namespace po = boost::program_options;
213 
214  po::options_description desc("Options");
215  desc.add_options()("help,h", "Print help messages ");
216  desc.add_options()(
217  "table", po::value<std::string>(&table_name)->required(), "Table Name");
218  desc.add_options()(
219  "database", po::value<std::string>(&db_name)->required(), "Database Name");
220  desc.add_options()(
221  "user,u", po::value<std::string>(&user_name)->required(), "User Name");
222  desc.add_options()(
223  "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
224  desc.add_options()("host",
225  po::value<std::string>(&server_host)->default_value(server_host),
226  "OmniSci Server Hostname");
227  desc.add_options()(
228  "port", po::value<int>(&port)->default_value(port), "OmniSci Server Port Number");
229  desc.add_options()("http",
230  po::bool_switch(&http)->default_value(http)->implicit_value(true),
231  "Use HTTP transport");
232  desc.add_options()("https",
233  po::bool_switch(&https)->default_value(https)->implicit_value(true),
234  "Use HTTPS transport");
235  desc.add_options()("skip-verify",
236  po::bool_switch(&skip_host_verify)
237  ->default_value(skip_host_verify)
238  ->implicit_value(true),
239  "Don't verify SSL certificate validity");
240  desc.add_options()(
241  "ca-cert",
242  po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
243  "Path to trusted server certificate. Initiates an encrypted connection");
244  desc.add_options()("delim",
245  po::value<std::string>(&delim_str)->default_value(delim_str),
246  "Field delimiter");
  // --null, --line and --quoted declare no default_value; when they are absent the
  // bound variables keep the initial values assigned at the top of main().
247  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
248  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
249  desc.add_options()(
250  "quoted",
251  po::value<std::string>(&quoted),
252  "Whether the source contains quoted fields (true/false, default false)");
253  desc.add_options()("batch",
254  po::value<size_t>(&batch_size)->default_value(batch_size),
255  "Insert batch size");
256  desc.add_options()("retry_count",
257  po::value<size_t>(&retry_count)->default_value(retry_count),
258  "Number of time to retry an insert");
259  desc.add_options()("retry_wait",
260  po::value<size_t>(&retry_wait)->default_value(retry_wait),
261  "wait in secs between retries");
262  desc.add_options()("transform,t",
263  po::value<std::vector<std::string>>(&xforms)->multitoken(),
264  "Column Transformations");
265  desc.add_options()("print_error", "Print Error Rows");
266  desc.add_options()("print_transform", "Print Transformations");
267 
  // The first positional argument is the table name, the second the database name.
268  po::positional_options_description positionalOptions;
269  positionalOptions.add("table", 1);
270  positionalOptions.add("database", 1);
271 
  // The logger contributes its own command-line options to the same description.
272  logger::LogOptions log_options(argv[0]);
273  log_options.max_files_ = 0; // stderr only
274  desc.add(log_options.get_options());
275 
276  po::variables_map vm;
277 
278  try {
279  po::store(po::command_line_parser(argc, argv)
280  .options(desc)
281  .positional(positionalOptions)
282  .run(),
283  vm);
  // --help is handled before po::notify() below, so help can be printed without
  // the required options being supplied.
284  if (vm.count("help")) {
285  std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
286  "<password> [{--host} "
287  "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
288  "string>][--line <line "
289  "delimiter>][--batch <batch size>][{-t|--transform} transformation "
290  "[--quoted <true|false>] "
291  "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
292  "secs>][--print_error][--print_transform]\n\n";
293  std::cout << desc << std::endl;
294  return 0;
295  }
296  if (vm.count("print_error")) {
297  print_error_data = true;
298  }
299  if (vm.count("print_transform")) {
300  print_transformation = true;
301  }
302 
303  po::notify(vm);
304  } catch (boost::program_options::error& e) {
305  std::cerr << "Usage Error: " << e.what() << std::endl;
306  return 1;
307  }
308 
309  logger::init(log_options);
310 
  // Pick the transport: --http wins over --https, which wins over a CA certificate;
  // plain binary is the fallback.
311  if (http) {
312  conn_type = ThriftConnectionType::HTTP;
313  } else if (https) {
314  conn_type = ThriftConnectionType::HTTPS;
315  } else if (!ca_cert_name.empty()) {
  // NOTE(review): the listing jumps from line 315 to 317, so the statement for this
  // branch is not visible here. As shown, conn_type is left unassigned when only
  // --ca-cert is given; confirm against the original file.
317  } else {
318  conn_type = ThriftConnectionType::BINARY;
319  }
320 
  // Decode the field delimiter: either a literal first character, or one of the
  // escapes \t, \n and \x<hex>. For the hex form the leading backslash is rewritten
  // to '0' so that std::stoi parses the text as "0x<hex>" in base 16.
  // NOTE(review): std::stoi throws on a malformed hex escape (e.g. "\x" alone) and
  // nothing here catches it.
321  char delim = delim_str[0];
322  if (delim == '\\') {
323  if (delim_str.size() < 2 ||
324  (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
325  std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
326  return 1;
327  }
328  if (delim_str[1] == 't') {
329  delim = '\t';
330  } else if (delim_str[1] == 'n') {
331  delim = '\n';
332  } else {
333  std::string d(delim_str);
334  d[0] = '0';
335  delim = (char)std::stoi(d, nullptr, 16);
336  }
337  }
  // Echo the delimiter in a readable form.
  // NOTE(review): isprint() receives a plain char; where char is signed, a delimiter
  // above 0x7f is passed as a negative value, which isprint() does not define.
338  if (isprint(delim)) {
339  std::cout << "Field Delimiter: " << delim << std::endl;
340  } else if (delim == '\t') {
341  std::cout << "Field Delimiter: "
342  << "\\t" << std::endl;
343  } else if (delim == '\n') {
344  std::cout << "Field Delimiter: "
345  << "\\n"
346  << std::endl;
347  } else {
348  std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
349  }
  // Decode and echo the line delimiter with the same rules as the field delimiter
  // above (the same review notes apply).
350  char line_delim = line_delim_str[0];
351  if (line_delim == '\\') {
352  if (line_delim_str.size() < 2 ||
353  (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
354  line_delim_str[1] != 'n')) {
355  std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
356  return 1;
357  }
358  if (line_delim_str[1] == 't') {
359  line_delim = '\t';
360  } else if (line_delim_str[1] == 'n') {
361  line_delim = '\n';
362  } else {
363  std::string d(line_delim_str);
364  d[0] = '0';
365  line_delim = (char)std::stoi(d, nullptr, 16);
366  }
367  }
368  if (isprint(line_delim)) {
369  std::cout << "Line Delimiter: " << line_delim << std::endl;
370  } else if (line_delim == '\t') {
371  std::cout << "Line Delimiter: "
372  << "\\t" << std::endl;
373  } else if (line_delim == '\n') {
374  std::cout << "Line Delimiter: "
375  << "\\n"
376  << std::endl;
377  } else {
378  std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
379  }
380  std::cout << "Null String: " << nulls << std::endl;
  // std::dec undoes the std::hex that one of the branches above may have left set on
  // std::cout.
381  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;
382 
  // Only the exact string "true" enables quote removal; any other value leaves it off.
383  if (quoted == "true") {
384  remove_quotes = true;
385  }
386 
  // Parse each transformation of the form <column name>:s/<regex pattern>/<fmt string>/
  // into a compiled regex plus replacement string, keyed by column name. A later
  // transformation for the same column replaces an earlier one.
387  for (auto& t : xforms) {
388  auto n = t.find_first_of(':');
389  if (n == std::string::npos) {
390  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
391  << std::endl;
392  return 1;
393  }
394  std::string col_name = t.substr(0, n);
  // The colon must be followed by the literal "s/".
395  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
396  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
397  << std::endl;
398  return 1;
399  }
  // n1..n2 delimits the regex pattern: from just after "s/" to the next '/'.
400  auto n1 = n + 3;
401  auto n2 = t.find_first_of('/', n1);
402  if (n2 == std::string::npos) {
403  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
404  << std::endl;
405  return 1;
406  }
407  std::string regex_str = t.substr(n1, n2 - n1);
  // n1..n2 now delimits the format string: up to the closing '/'.
408  n1 = n2 + 1;
409  n2 = t.find_first_of('/', n1);
410  if (n2 == std::string::npos) {
411  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
412  << std::endl;
413  return 1;
414  }
415  std::string fmt_str = t.substr(n1, n2 - n1);
416  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
417  << std::endl;
418  transformations[col_name] =
419  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
420  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
421  std::unique_ptr<std::string>(new std::string(fmt_str)));
422  }
423 
424  Importer_NS::CopyParams copy_params(
425  delim, nulls, line_delim, batch_size, retry_count, retry_wait);
  // NOTE(review): listing line 427 is missing here; it presumably opens the
  // connection object whose arguments follow on line 428. Also note that
  // ca_cert_name is passed twice; confirm both against the original file.
426  RowToColumnLoader row_loader(
428  server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
429  user_name,
430  passwd,
431  db_name,
432  table_name);
433 
434  stream_insert(row_loader, transformations, copy_params, remove_quotes);
435  return 0;
436 }
std::string null_str
Definition: CopyParams.h:47
TRowDescriptor get_row_descriptor()
ThriftConnectionType
Definition: ThriftClient.h:32
void do_load(int &nrows, int &nskipped, Importer_NS::CopyParams copy_params)
#define LOG(tag)
Definition: Logger.h:185
bool convert_string_to_column(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
std::string print_row_with_delim(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
Constants for Builtin SQL Types supported by MapD.
bool print_transformation
boost::program_options::options_description const & get_options() const
Definition: Logger.cpp:119
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool print_error_data
void init(LogOptions const &log_opts)
Definition: Logger.cpp:265
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
#define MAX_FIELD_LEN
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const Importer_NS::CopyParams &copy_params, const bool remove_quotes)
size_t max_files_
Definition: Logger.h:120
static bool run