OmniSciDB  04ee39c94c
StreamInsert.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <cctype>
#include <chrono>
#include <cstring>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/program_options.hpp>
#include <boost/regex.hpp>
36 
37 // include files for Thrift and MapD Thrift Services
38 #include <thrift/protocol/TBinaryProtocol.h>
39 #include <thrift/transport/TBufferTransports.h>
40 #include <thrift/transport/TSocket.h>
41 #include "gen-cpp/MapD.h"
42 
43 using namespace ::apache::thrift;
44 using namespace ::apache::thrift::protocol;
45 using namespace ::apache::thrift::transport;
46 
47 #ifdef HAVE_THRIFT_STD_SHAREDPTR
48 #include <memory>
49 namespace mapd {
50 using std::make_shared;
51 using std::shared_ptr;
52 } // namespace mapd
53 #else
54 #include <boost/make_shared.hpp>
55 namespace mapd {
56 using boost::make_shared;
57 using boost::shared_ptr;
58 } // namespace mapd
59 #endif // HAVE_THRIFT_STD_SHAREDPTR
60 
// Options controlling how the input stream is split into rows/fields and how
// batches are sent to the server.
struct CopyParams {
  char delimiter;        // separates fields within a row
  std::string null_str;  // field text that denotes NULL
  char line_delim;       // separates rows
  size_t batch_size;     // rows accumulated before each load_table call
  size_t retry_count;    // load attempts made for a batch before giving up
  size_t retry_wait;     // seconds to wait between attempts

  CopyParams(char d, const std::string& n, char l, size_t b, size_t retries, size_t wait)
      : delimiter{d}
      , null_str{n}
      , line_delim{l}
      , batch_size{b}
      , retry_count{retries}
      , retry_wait{wait} {}
};
76 
// Server endpoint plus login credentials.  Kept around so the connection can
// be torn down and re-established when an insert has to be retried.
struct ConnectionDetails {
  std::string server_host;
  int port;
  std::string db_name;
  std::string user_name;
  std::string passwd;

  // The string parameters are sink arguments: they are taken by value and
  // moved into the members, so callers passing temporaries pay no copy.
  ConnectionDetails(std::string in_server_host,
                    int in_port,
                    std::string in_db_name,
                    std::string in_user_name,
                    std::string in_passwd)
      : server_host(std::move(in_server_host))
      , port(in_port)
      , db_name(std::move(in_db_name))
      , user_name(std::move(in_user_name))
      , passwd(std::move(in_passwd)) {}
};
94 
95 bool print_error_data = false;
96 bool print_transformation = false;
97 
98 mapd::shared_ptr<MapDClient> client;
99 TSessionId session;
100 mapd::shared_ptr<apache::thrift::transport::TTransport> mytransport;
101 
102 namespace {
103 // anonymous namespace for private functions
104 
// Capacity of the per-field parse buffer in stream_insert(); a field that
// reaches this many characters is reported as too long.  A typed constant
// (rather than a macro) so comparisons with size_t indices are not mixed-sign.
constexpr size_t MAX_FIELD_LEN = 20000;
106 
108  mapd::shared_ptr<TTransport> socket(new TSocket(con.server_host, con.port));
109  mytransport.reset(new TBufferedTransport(socket));
110  mapd::shared_ptr<TProtocol> protocol(new TBinaryProtocol(mytransport));
111  client.reset(new MapDClient(protocol));
112  try {
113  mytransport->open(); // open transport
114  client->connect(
115  session, con.user_name, con.passwd, con.db_name); // connect to omnisci_server
116  } catch (TMapDException& e) {
117  std::cerr << e.error_msg << std::endl;
118  } catch (TException& te) {
119  std::cerr << "Thrift error: " << te.what() << std::endl;
120  }
121 }
122 
124  try {
125  client->disconnect(session); // disconnect from omnisci_server
126  mytransport->close(); // close transport
127  } catch (TMapDException& e) {
128  std::cerr << e.error_msg << std::endl;
129  } catch (TException& te) {
130  std::cerr << "Thrift error: " << te.what() << std::endl;
131  }
132 }
133 
135  CopyParams copy_params,
136  ConnectionDetails conn_details) {
137  std::cout << " Waiting " << copy_params.retry_wait
138  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
139  << " times more " << std::endl;
140  sleep(copy_params.retry_wait);
141 
142  closeConnection();
143  createConnection(conn_details);
144 }
145 
146 void do_load(int& nrows,
147  int& nskipped,
148  std::vector<TStringRow> input_rows,
149  const std::string& table_name,
150  CopyParams copy_params,
151  ConnectionDetails conn_details) {
152  for (size_t tries = 0; tries < copy_params.retry_count;
153  tries++) { // allow for retries in case of insert failure
154  try {
155  client->load_table(session, table_name, input_rows);
156  nrows += input_rows.size();
157  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
158  << std::endl;
159  // we successfully loaded the data, lets move on
160  return;
161  } catch (TMapDException& e) {
162  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
163  wait_disconnet_reconnnect_retry(tries, copy_params, conn_details);
164  } catch (TException& te) {
165  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
166  wait_disconnet_reconnnect_retry(tries, copy_params, conn_details);
167  }
168  }
169  std::cerr << "Retries exhausted program terminated" << std::endl;
170  exit(1);
171 }
172 
173 // reads copy_params.delimiter delimited rows from std::cin and load them to
174 // table_name in batches of size copy_params.batch_size until EOF
176  const std::string& table_name,
177  const TRowDescriptor& row_desc,
178  const std::map<std::string,
179  std::pair<std::unique_ptr<boost::regex>,
180  std::unique_ptr<std::string>>>& transformations,
181  const CopyParams& copy_params,
182  const ConnectionDetails conn_details,
183  const bool remove_quotes) {
184  std::vector<TStringRow> input_rows;
185  TStringRow row;
186 
187  std::ios_base::sync_with_stdio(false);
188  std::istream_iterator<char> eos;
189  std::cin >> std::noskipws;
190  std::istream_iterator<char> iit(std::cin);
191 
192  char field[MAX_FIELD_LEN];
193  size_t field_i = 0;
194 
195  int nrows = 0;
196  int nskipped = 0;
197  bool backEscape = false;
198 
199  const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*
200  xforms[row_desc.size()];
201  for (size_t i = 0; i < row_desc.size(); i++) {
202  auto it = transformations.find(row_desc[i].col_name);
203  if (it != transformations.end()) {
204  xforms[i] = &(it->second);
205  } else {
206  xforms[i] = nullptr;
207  }
208  }
209 
210  while (iit != eos) {
211  row.cols.clear();
212  // construct a row
213  while (iit != eos) {
214  if (*iit == copy_params.delimiter || *iit == copy_params.line_delim) {
215  bool end_of_field = (*iit == copy_params.delimiter);
216  bool end_of_row;
217  if (end_of_field) {
218  end_of_row = false;
219  } else {
220  end_of_row = (row_desc[row.cols.size()].col_type.type != TDatumType::STR) ||
221  (row.cols.size() == row_desc.size() - 1);
222  if (!end_of_row) {
223  size_t l = copy_params.null_str.size();
224  if (field_i >= l &&
225  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
226  end_of_row = true;
227  // std::cout << "new line after null.\n";
228  }
229  }
230  }
231  if (!end_of_field && !end_of_row) {
232  // not enough columns yet and it is a string column
233  // treat the line delimiter as part of the string
234  field[field_i++] = *iit;
235  } else {
236  field[field_i] = '\0';
237  field_i = 0;
238  TStringValue ts;
239  ts.str_val = std::string(field);
240  ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
241  auto xform =
242  row.cols.size() < row_desc.size() ? xforms[row.cols.size()] : nullptr;
243  if (!ts.is_null && xform != nullptr) {
244  if (print_transformation) {
245  std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
246  }
247  ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
248  if (ts.str_val.empty()) {
249  ts.is_null = true;
250  }
251  if (print_transformation) {
252  std::cout << ts.str_val << std::endl;
253  }
254  }
255  row.cols.push_back(ts); // add column value to row
256  if (end_of_row || (row.cols.size() > row_desc.size())) {
257  break; // found row
258  }
259  }
260  } else {
261  if (*iit == '\\') {
262  backEscape = true;
263  } else if (backEscape || !remove_quotes || *iit != '\"') {
264  field[field_i++] = *iit;
265  backEscape = false;
266  }
267  // else if unescaped double-quote, continue without adding the
268  // charactger to the field string.
269  }
270  if (field_i >= MAX_FIELD_LEN) {
271  field[MAX_FIELD_LEN - 1] = '\0';
272  std::cerr << "String too long for buffer." << std::endl;
273  if (print_error_data) {
274  std::cerr << field << std::endl;
275  }
276  field_i = 0;
277  break;
278  }
279  ++iit;
280  }
281  if (row.cols.size() == row_desc.size()) {
282  input_rows.push_back(row);
283  if (input_rows.size() >= copy_params.batch_size) {
284  do_load(nrows, nskipped, input_rows, table_name, copy_params, conn_details);
285  input_rows.clear();
286  }
287  } else {
288  ++nskipped;
289  if (print_error_data) {
290  std::cerr << "Incorrect number of columns for row at: ";
291  bool not_first = false;
292  for (const auto& p : row.cols) {
293  if (not_first) {
294  std::cerr << copy_params.delimiter;
295  } else {
296  not_first = true;
297  }
298  std::cerr << &p;
299  }
300  std::cerr << std::endl;
301  }
302  if (row.cols.size() > row_desc.size()) {
303  // skip to the next line delimiter
304  while (*iit != copy_params.line_delim) {
305  ++iit;
306  }
307  }
308  }
309  ++iit;
310  }
311  // load remaining rowset if any
312  if (input_rows.size() > 0) {
313  do_load(nrows, nskipped, input_rows, table_name, copy_params, conn_details);
314  }
315 }
316 } // namespace
317 
318 int main(int argc, char** argv) {
319  std::string server_host("localhost"); // default to localhost
320  int port = 6274; // default port number
321  std::string table_name;
322  std::string db_name;
323  std::string user_name;
324  std::string passwd;
325  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
326  size_t batch_size = 10000;
327  size_t retry_count = 10;
328  size_t retry_wait = 5;
329  bool remove_quotes = false;
330  std::vector<std::string> xforms;
331  std::map<std::string,
332  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
333  transformations;
334 
335  namespace po = boost::program_options;
336 
337  po::options_description desc("Options");
338  desc.add_options()("help,h", "Print help messages ");
339  desc.add_options()(
340  "table", po::value<std::string>(&table_name)->required(), "Table Name");
341  desc.add_options()(
342  "database", po::value<std::string>(&db_name)->required(), "Database Name");
343  desc.add_options()(
344  "user,u", po::value<std::string>(&user_name)->required(), "User Name");
345  desc.add_options()(
346  "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
347  desc.add_options()("host",
348  po::value<std::string>(&server_host)->default_value(server_host),
349  "OmniSci Server Hostname");
350  desc.add_options()(
351  "port", po::value<int>(&port)->default_value(port), "OmniSci Server Port Number");
352  desc.add_options()("delim",
353  po::value<std::string>(&delim_str)->default_value(delim_str),
354  "Field delimiter");
355  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
356  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
357  desc.add_options()(
358  "quoted",
359  po::value<std::string>(&quoted),
360  "Whether the source contains quoted fields (true/false, default false)");
361  desc.add_options()("batch",
362  po::value<size_t>(&batch_size)->default_value(batch_size),
363  "Insert batch size");
364  desc.add_options()("retry_count",
365  po::value<size_t>(&retry_count)->default_value(retry_count),
366  "Number of time to retry an insert");
367  desc.add_options()("retry_wait",
368  po::value<size_t>(&retry_wait)->default_value(retry_wait),
369  "wait in secs between retries");
370  desc.add_options()("transform,t",
371  po::value<std::vector<std::string>>(&xforms)->multitoken(),
372  "Column Transformations");
373  desc.add_options()("print_error", "Print Error Rows");
374  desc.add_options()("print_transform", "Print Transformations");
375 
376  po::positional_options_description positionalOptions;
377  positionalOptions.add("table", 1);
378  positionalOptions.add("database", 1);
379 
380  po::variables_map vm;
381 
382  try {
383  po::store(po::command_line_parser(argc, argv)
384  .options(desc)
385  .positional(positionalOptions)
386  .run(),
387  vm);
388  if (vm.count("help")) {
389  std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
390  "<password> [{--host} "
391  "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
392  "string>][--line <line "
393  "delimiter>][--batch <batch size>][{-t|--transform} transformation "
394  "[--quoted <true|false>] "
395  "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
396  "secs>][--print_error][--print_transform]\n\n";
397  std::cout << desc << std::endl;
398  return 0;
399  }
400  if (vm.count("print_error")) {
401  print_error_data = true;
402  }
403  if (vm.count("print_transform")) {
404  print_transformation = true;
405  }
406 
407  po::notify(vm);
408  } catch (boost::program_options::error& e) {
409  std::cerr << "Usage Error: " << e.what() << std::endl;
410  return 1;
411  }
412 
413  char delim = delim_str[0];
414  if (delim == '\\') {
415  if (delim_str.size() < 2 ||
416  (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
417  std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
418  return 1;
419  }
420  if (delim_str[1] == 't') {
421  delim = '\t';
422  } else if (delim_str[1] == 'n') {
423  delim = '\n';
424  } else {
425  std::string d(delim_str);
426  d[0] = '0';
427  delim = (char)std::stoi(d, nullptr, 16);
428  }
429  }
430  if (isprint(delim)) {
431  std::cout << "Field Delimiter: " << delim << std::endl;
432  } else if (delim == '\t') {
433  std::cout << "Field Delimiter: "
434  << "\\t" << std::endl;
435  } else if (delim == '\n') {
436  std::cout << "Field Delimiter: "
437  << "\\n"
438  << std::endl;
439  } else {
440  std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
441  }
442  char line_delim = line_delim_str[0];
443  if (line_delim == '\\') {
444  if (line_delim_str.size() < 2 ||
445  (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
446  line_delim_str[1] != 'n')) {
447  std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
448  return 1;
449  }
450  if (line_delim_str[1] == 't') {
451  line_delim = '\t';
452  } else if (line_delim_str[1] == 'n') {
453  line_delim = '\n';
454  } else {
455  std::string d(line_delim_str);
456  d[0] = '0';
457  line_delim = (char)std::stoi(d, nullptr, 16);
458  }
459  }
460  if (isprint(line_delim)) {
461  std::cout << "Line Delimiter: " << line_delim << std::endl;
462  } else if (line_delim == '\t') {
463  std::cout << "Line Delimiter: "
464  << "\\t" << std::endl;
465  } else if (line_delim == '\n') {
466  std::cout << "Line Delimiter: "
467  << "\\n"
468  << std::endl;
469  } else {
470  std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
471  }
472  std::cout << "Null String: " << nulls << std::endl;
473  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;
474 
475  if (quoted == "true") {
476  remove_quotes = true;
477  }
478 
479  for (auto& t : xforms) {
480  auto n = t.find_first_of(':');
481  if (n == std::string::npos) {
482  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
483  << std::endl;
484  return 1;
485  }
486  std::string col_name = t.substr(0, n);
487  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
488  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
489  << std::endl;
490  return 1;
491  }
492  auto n1 = n + 3;
493  auto n2 = t.find_first_of('/', n1);
494  if (n2 == std::string::npos) {
495  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
496  << std::endl;
497  return 1;
498  }
499  std::string regex_str = t.substr(n1, n2 - n1);
500  n1 = n2 + 1;
501  n2 = t.find_first_of('/', n1);
502  if (n2 == std::string::npos) {
503  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
504  << std::endl;
505  return 1;
506  }
507  std::string fmt_str = t.substr(n1, n2 - n1);
508  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
509  << std::endl;
510  transformations[col_name] =
511  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
512  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
513  std::unique_ptr<std::string>(new std::string(fmt_str)));
514  }
515 
516  CopyParams copy_params(delim, nulls, line_delim, batch_size, retry_count, retry_wait);
517 
518  // for attaching debugger std::this_thread::sleep_for (std::chrono::seconds(20));
519  ConnectionDetails conn_details(server_host, port, db_name, user_name, passwd);
520  createConnection(conn_details);
521 
522  TTableDetails table_details;
523  client->get_table_details(table_details, session, table_name);
524  stream_insert(table_name,
525  table_details.row_desc,
526  transformations,
527  copy_params,
528  conn_details,
529  remove_quotes);
530 
531  closeConnection();
532 
533  return 0;
534 }
void do_load(int &nrows, int &nskipped, std::vector< TStringRow > input_rows, const std::string &table_name, CopyParams copy_params, ConnectionDetails conn_details)
bool print_transformation
void d(const SQLTypes expected_type, const std::string &str)
Definition: ImportTest.cpp:268
size_t retry_count
size_t batch_size
bool print_error_data
std::string user_name
Definition: DataGen.cpp:60
std::string null_str
#define MAX_FIELD_LEN
std::string db_name
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::string server_host
ConnectionDetails(std::string in_server_host, int in_port, std::string in_db_name, std::string in_user_name, std::string in_passwd)
mapd::shared_ptr< MapDClient > client
size_t retry_wait
CopyParams(char d, const std::string &n, char l, size_t b, size_t retries, size_t wait)
std::string passwd
mapd::shared_ptr< apache::thrift::transport::TTransport > mytransport
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const Importer_NS::CopyParams &copy_params, const bool remove_quotes)
int main(int argc, char **argv)
static bool run
void wait_disconnet_reconnnect_retry(size_t tries, CopyParams copy_params, ConnectionDetails conn_details)
void createConnection(ConnectionDetails con)
TSessionId session