OmniSciDB  343343d194
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StreamInsert.cpp File Reference

Sample MapD Client code for inserting a stream of rows with optional transformations from stdin to a MapD table. More...

#include <boost/regex.hpp>
#include <cstring>
#include <iostream>
#include <iterator>
#include <string>
#include <chrono>
#include <thread>
#include <boost/program_options.hpp>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include "gen-cpp/MapD.h"
#include <boost/make_shared.hpp>
+ Include dependency graph for StreamInsert.cpp:

Go to the source code of this file.

Classes

struct  CopyParams
 
struct  ConnectionDetails
 

Namespaces

 mapd
 
 anonymous_namespace{StreamInsert.cpp}
 

Macros

#define MAX_FIELD_LEN   20000
 

Functions

void anonymous_namespace{StreamInsert.cpp}::createConnection (ConnectionDetails con)
 
void anonymous_namespace{StreamInsert.cpp}::closeConnection ()
 
void anonymous_namespace{StreamInsert.cpp}::wait_disconnet_reconnnect_retry (size_t tries, CopyParams copy_params, ConnectionDetails conn_details)
 
void anonymous_namespace{StreamInsert.cpp}::do_load (int &nrows, int &nskipped, std::vector< TStringRow > input_rows, const std::string &table_name, CopyParams copy_params, ConnectionDetails conn_details)
 
void anonymous_namespace{StreamInsert.cpp}::stream_insert (const std::string &table_name, const TRowDescriptor &row_desc, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const CopyParams &copy_params, const ConnectionDetails conn_details, const bool remove_quotes)
 
int main (int argc, char **argv)
 

Variables

bool print_error_data = false
 
bool print_transformation = false
 
mapd::shared_ptr< MapDClient > client
 
TSessionId session
 
mapd::shared_ptr< apache::thrift::transport::TTransport > mytransport
 

Detailed Description

Sample MapD Client code for inserting a stream of rows with optional transformations from stdin to a MapD table.

Sample MapD Client code for inserting a stream of rows from stdin to a MapD table.

Author
Wei Hong (wei@mapd.com). Copyright (c) 2014 MapD Technologies, Inc. All rights reserved.

Definition in file StreamInsert.cpp.

Macro Definition Documentation

#define MAX_FIELD_LEN   20000

Function Documentation

int main ( int  argc,
char **  argv 
)

Definition at line 318 of file StreamInsert.cpp.

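References client, anonymous_namespace{StreamInsert.cpp}::closeConnection(), anonymous_namespace{StreamInsert.cpp}::createConnection(), anonymous_namespace{ImportTest.cpp}::d(), print_error_data, print_transformation, run-benchmark-import::required, run, session, and stream_insert(). Note: the entries anonymous_namespace{ImportTest.cpp}::d(), run-benchmark-import::required, and run are spurious matches produced by Doxygen's name-based linking. They collide with a local identifier in main() and with the Boost.Program_options member calls required() and run(), and are not actual dependencies of main().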

318  {
319  std::string server_host("localhost"); // default to localhost
320  int port = 6274; // default port number
321  std::string table_name;
322  std::string db_name;
323  std::string user_name;
324  std::string passwd;
325  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
326  size_t batch_size = 10000;
327  size_t retry_count = 10;
328  size_t retry_wait = 5;
329  bool remove_quotes = false;
330  std::vector<std::string> xforms;
331  std::map<std::string,
332  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
333  transformations;
334 
335  namespace po = boost::program_options;
336 
337  po::options_description desc("Options");
338  desc.add_options()("help,h", "Print help messages ");
339  desc.add_options()(
340  "table", po::value<std::string>(&table_name)->required(), "Table Name");
341  desc.add_options()(
342  "database", po::value<std::string>(&db_name)->required(), "Database Name");
343  desc.add_options()(
344  "user,u", po::value<std::string>(&user_name)->required(), "User Name");
345  desc.add_options()(
346  "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
347  desc.add_options()("host",
348  po::value<std::string>(&server_host)->default_value(server_host),
349  "OmniSci Server Hostname");
350  desc.add_options()(
351  "port", po::value<int>(&port)->default_value(port), "OmniSci Server Port Number");
352  desc.add_options()("delim",
353  po::value<std::string>(&delim_str)->default_value(delim_str),
354  "Field delimiter");
355  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
356  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
357  desc.add_options()(
358  "quoted",
359  po::value<std::string>(&quoted),
360  "Whether the source contains quoted fields (true/false, default false)");
361  desc.add_options()("batch",
362  po::value<size_t>(&batch_size)->default_value(batch_size),
363  "Insert batch size");
364  desc.add_options()("retry_count",
365  po::value<size_t>(&retry_count)->default_value(retry_count),
366  "Number of time to retry an insert");
367  desc.add_options()("retry_wait",
368  po::value<size_t>(&retry_wait)->default_value(retry_wait),
369  "wait in secs between retries");
370  desc.add_options()("transform,t",
371  po::value<std::vector<std::string>>(&xforms)->multitoken(),
372  "Column Transformations");
373  desc.add_options()("print_error", "Print Error Rows");
374  desc.add_options()("print_transform", "Print Transformations");
375 
376  po::positional_options_description positionalOptions;
377  positionalOptions.add("table", 1);
378  positionalOptions.add("database", 1);
379 
380  po::variables_map vm;
381 
382  try {
383  po::store(po::command_line_parser(argc, argv)
384  .options(desc)
385  .positional(positionalOptions)
386  .run(),
387  vm);
388  if (vm.count("help")) {
389  std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
390  "<password> [{--host} "
391  "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
392  "string>][--line <line "
393  "delimiter>][--batch <batch size>][{-t|--transform} transformation "
394  "[--quoted <true|false>] "
395  "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
396  "secs>][--print_error][--print_transform]\n\n";
397  std::cout << desc << std::endl;
398  return 0;
399  }
400  if (vm.count("print_error")) {
401  print_error_data = true;
402  }
403  if (vm.count("print_transform")) {
404  print_transformation = true;
405  }
406 
407  po::notify(vm);
408  } catch (boost::program_options::error& e) {
409  std::cerr << "Usage Error: " << e.what() << std::endl;
410  return 1;
411  }
412 
413  char delim = delim_str[0];
414  if (delim == '\\') {
415  if (delim_str.size() < 2 ||
416  (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
417  std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
418  return 1;
419  }
420  if (delim_str[1] == 't') {
421  delim = '\t';
422  } else if (delim_str[1] == 'n') {
423  delim = '\n';
424  } else {
425  std::string d(delim_str);
426  d[0] = '0';
427  delim = (char)std::stoi(d, nullptr, 16);
428  }
429  }
430  if (isprint(delim)) {
431  std::cout << "Field Delimiter: " << delim << std::endl;
432  } else if (delim == '\t') {
433  std::cout << "Field Delimiter: "
434  << "\\t" << std::endl;
435  } else if (delim == '\n') {
436  std::cout << "Field Delimiter: "
437  << "\\n"
438  << std::endl;
439  } else {
440  std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
441  }
442  char line_delim = line_delim_str[0];
443  if (line_delim == '\\') {
444  if (line_delim_str.size() < 2 ||
445  (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
446  line_delim_str[1] != 'n')) {
447  std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
448  return 1;
449  }
450  if (line_delim_str[1] == 't') {
451  line_delim = '\t';
452  } else if (line_delim_str[1] == 'n') {
453  line_delim = '\n';
454  } else {
455  std::string d(line_delim_str);
456  d[0] = '0';
457  line_delim = (char)std::stoi(d, nullptr, 16);
458  }
459  }
460  if (isprint(line_delim)) {
461  std::cout << "Line Delimiter: " << line_delim << std::endl;
462  } else if (line_delim == '\t') {
463  std::cout << "Line Delimiter: "
464  << "\\t" << std::endl;
465  } else if (line_delim == '\n') {
466  std::cout << "Line Delimiter: "
467  << "\\n"
468  << std::endl;
469  } else {
470  std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
471  }
472  std::cout << "Null String: " << nulls << std::endl;
473  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;
474 
475  if (quoted == "true") {
476  remove_quotes = true;
477  }
478 
479  for (auto& t : xforms) {
480  auto n = t.find_first_of(':');
481  if (n == std::string::npos) {
482  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
483  << std::endl;
484  return 1;
485  }
486  std::string col_name = t.substr(0, n);
487  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
488  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
489  << std::endl;
490  return 1;
491  }
492  auto n1 = n + 3;
493  auto n2 = t.find_first_of('/', n1);
494  if (n2 == std::string::npos) {
495  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
496  << std::endl;
497  return 1;
498  }
499  std::string regex_str = t.substr(n1, n2 - n1);
500  n1 = n2 + 1;
501  n2 = t.find_first_of('/', n1);
502  if (n2 == std::string::npos) {
503  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
504  << std::endl;
505  return 1;
506  }
507  std::string fmt_str = t.substr(n1, n2 - n1);
508  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
509  << std::endl;
510  transformations[col_name] =
511  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
512  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
513  std::unique_ptr<std::string>(new std::string(fmt_str)));
514  }
515 
516  CopyParams copy_params(delim, nulls, line_delim, batch_size, retry_count, retry_wait);
517 
518  // for attaching debugger std::this_thread::sleep_for (std::chrono::seconds(20));
519  ConnectionDetails conn_details(server_host, port, db_name, user_name, passwd);
520  createConnection(conn_details);
521 
522  TTableDetails table_details;
523  client->get_table_details(table_details, session, table_name);
524  stream_insert(table_name,
525  table_details.row_desc,
526  transformations,
527  copy_params,
528  conn_details,
529  remove_quotes);
530 
531  closeConnection();
532 
533  return 0;
534 }
void d(const SQLTypes expected_type, const std::string &str)
Definition: ImportTest.cpp:289
bool print_transformation
bool print_error_data
mapd::shared_ptr< MapDClient > client
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const Importer_NS::CopyParams &copy_params, const bool remove_quotes)
static bool run
void createConnection(ConnectionDetails con)
TSessionId session

+ Here is the call graph for this function:

Variable Documentation

mapd::shared_ptr<apache::thrift::transport::TTransport> mytransport
bool print_error_data = false

Definition at line 95 of file StreamInsert.cpp.

bool print_transformation = false

Definition at line 96 of file StreamInsert.cpp.

TSessionId session

Definition at line 99 of file StreamInsert.cpp.

Referenced by assert_grants(), check_grant_access(), anonymous_namespace{ExecuteTest.cpp}::choose_shard_count(), anonymous_namespace{StreamInsert.cpp}::closeConnection(), com.mapd.parser.server.test.TestMapDServer::closeMapDConnection(), com.mapd.logrunner.LogRunner::closeSession(), create_dashboards(), anonymous_namespace{StreamInsert.cpp}::createConnection(), com.mapd.parser.server.test.TestMapDServer::createMapDConnection(), anonymous_namespace{StreamInsert.cpp}::do_load(), com.mapd.testthrift.ThriftTester::doWork(), com.mapd.logrunner.LogRunner::doWork(), DashboardObject::drop_dashboards(), drop_dashboards(), Parser::InsertIntoTableAsSelectStmt::execute(), Parser::CreateTableAsSelectStmt::execute(), Parser::ExportQueryStmt::execute(), Parser::CreateViewStmt::execute(), com.mapd.parser.server.test.TestMapDServer::executeQuery(), anonymous_namespace{DBObjectPrivilegesTest.cpp}::get_qr_for_user(), com.mapd.tests.MapdTestClient::getClient(), com.omnisci.jdbc.OmniSciDatabaseMetaData::getColumns(), Calcite::getCompletionHints(), com.mapd.parser.server.CalciteServerHandler::getCompletionHints(), Parser::getResultSet(), com.omnisci.jdbc.OmniSciDatabaseMetaData::getSchemas(), com.mapd.logrunner.LogRunner::getSession(), com.omnisci.jdbc.OmniSciDatabaseMetaData::getTablePrivileges(), com.omnisci.jdbc.OmniSciDatabaseMetaData::getTables(), MapDExample.MapDExample::Main(), MapDExample::main(), main(), Lock_Namespace::parse_to_ra(), Parser::InsertIntoTableAsSelectStmt::populateData(), com.mapd.parser.server.CalciteServerHandler::process(), Parser::InsertIntoTableAsSelectStmt::LocalConnector::query(), DashboardObject::setup_dashboards(), com.omnisci.jdbc.OmniSciDatabaseMetaData::tablePermProcess(), TEST(), TEST_F(), testViewPermissions(), and com.mapd.logrunner.LogRunner::theRest().