OmniSciDB  04ee39c94c
KafkaImporter.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/regex.hpp>
#include <cctype>
#include <cstring>
#include <iostream>
#include <iterator>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "RowToColumnLoader.h"
#include "Shared/Logger.h"
#include "Shared/ThriftClient.h"
#include "Shared/sqltypes.h"

#include <chrono>
#include <thread>

#include <boost/program_options.hpp>

#include <librdkafka/rdkafkacpp.h>

#define MAX_FIELD_LEN 20000

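// Flags and counters shared between main(), the Kafka callbacks, and the
// consume loop in kafka_insert() below.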
bool print_error_data = false;
bool print_transformation = false;

static bool run = true;
static bool exit_eof = false;
static int eof_cnt = 0;
static int partition_cnt = 0;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;

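// Logs the partition list on every group rebalance and keeps partition_cnt in
// step with the consumer's current assignment.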
class RebalanceCb : public RdKafka::RebalanceCb {
 private:
  static void part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
    for (unsigned int i = 0; i < partitions.size(); i++) {
      LOG(INFO) << "\t" << partitions[i]->topic() << "[" << partitions[i]->partition()
                << "]";
    }
  }

 public:
  void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition*>& partitions) override {
    LOG(INFO) << "RebalanceCb: " << RdKafka::err2str(err) << ": ";

    part_list_print(partitions);

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      consumer->assign(partitions);
      partition_cnt = (int)partitions.size();
    } else {
      consumer->unassign();
      partition_cnt = 0;
    }
    eof_cnt = 0;
  }
};

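// Consume a single Kafka message: split the delimited payload into fields,
// apply any per-column regex transformations, and hand a complete row to
// row_loader. Returns true only if a row was successfully converted; malformed
// or non-data messages return false.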
bool msg_consume(RdKafka::Message* message,
                 RowToColumnLoader& row_loader,
                 Importer_NS::CopyParams copy_params,
                 const std::map<std::string,
                                std::pair<std::unique_ptr<boost::regex>,
                                          std::unique_ptr<std::string>>>& transformations,
                 const bool remove_quotes) {
  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      VLOG(1) << " Timed out";
      break;

    case RdKafka::ERR_NO_ERROR: { /* Real message */
      msg_cnt++;
      msg_bytes += message->len();
      VLOG(1) << "Read msg at offset " << message->offset();
      RdKafka::MessageTimestamp ts;
      ts = message->timestamp();
      if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
        std::string tsname = "?";
        if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
          tsname = "create time";
        } else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
          tsname = "log append time";
        }
        VLOG(1) << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
      }

      // Copy the payload into a newline-terminated string. (The original code
      // sprintf'd into a stack buffer of message->len() + 1 bytes, which the
      // '\0' written after the appended '\n' overran by one byte.)
      std::string buffer(static_cast<const char*>(message->payload()), message->len());
      buffer += '\n';
      VLOG(1) << "Full Message received is :'" << buffer << "'";

      char field[MAX_FIELD_LEN];
      size_t field_i = 0;

      bool backEscape = false;

      auto row_desc = row_loader.get_row_descriptor();

      // Per-column transformation lookup table. Variable-length arrays are a
      // compiler extension, so use std::vector rather than the original VLA.
      std::vector<const std::pair<std::unique_ptr<boost::regex>,
                                  std::unique_ptr<std::string>>*>
          xforms(row_desc.size());
      for (size_t i = 0; i < row_desc.size(); i++) {
        auto it = transformations.find(row_desc[i].col_name);
        if (it != transformations.end()) {
          xforms[i] = &(it->second);
        } else {
          xforms[i] = nullptr;
        }
      }

      std::vector<TStringValue>
          row;  // used to store each row as we move through the stream

      for (auto iit : buffer) {
        if (iit == copy_params.delimiter || iit == copy_params.line_delim) {
          bool end_of_field = (iit == copy_params.delimiter);
          bool end_of_row;
          if (end_of_field) {
            end_of_row = false;
          } else {
            end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
                         (row.size() == row_desc.size() - 1);
            if (!end_of_row) {
              size_t l = copy_params.null_str.size();
              if (field_i >= l &&
                  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
                end_of_row = true;
              }
            }
          }
          if (!end_of_field && !end_of_row) {
            // not enough columns yet and it is a string column
            // treat the line delimiter as part of the string
            field[field_i++] = iit;
          } else {
            field[field_i] = '\0';
            field_i = 0;
            TStringValue ts;
            ts.str_val = std::string(field);
            ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
            auto xform = row.size() < row_desc.size() ? xforms[row.size()] : nullptr;
            if (!ts.is_null && xform != nullptr) {
              if (print_transformation) {
                std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
              }
              ts.str_val =
                  boost::regex_replace(ts.str_val, *xform->first, *xform->second);
              if (ts.str_val.empty()) {
                ts.is_null = true;
              }
              if (print_transformation) {
                std::cout << ts.str_val << std::endl;
              }
            }

            row.push_back(ts);  // add column value to row
            if (end_of_row || (row.size() > row_desc.size())) {
              break;  // found row
            }
          }
        } else {
          if (iit == '\\') {
            backEscape = true;
          } else if (backEscape || !remove_quotes || iit != '\"') {
            field[field_i++] = iit;
            backEscape = false;
          }
          // else if unescaped double-quote, continue without adding the
          // character to the field string.
        }
        if (field_i >= MAX_FIELD_LEN) {
          field[MAX_FIELD_LEN - 1] = '\0';
          std::cerr << "String too long for buffer." << std::endl;
          if (print_error_data) {
            std::cerr << field << std::endl;
          }
          field_i = 0;
          break;
        }
      }
      if (row.size() == row_desc.size()) {
        // add the new data in the column format; a false return means the
        // record could not be parsed correctly and is counted as skipped
        return row_loader.convert_string_to_column(row, copy_params);
      }
      if (print_error_data) {
        std::cerr << "Incorrect number of columns for row: ";
        std::cerr << row_loader.print_row_with_delim(row, copy_params) << std::endl;
      }
      // malformed row: report it as skipped rather than falling through to the
      // partition-EOF case below
      return false;
    }

    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      if (exit_eof && ++eof_cnt == partition_cnt) {
        LOG(ERROR) << "%% EOF reached for all " << partition_cnt << " partition(s)";
        run = false;
      }
      break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      LOG(ERROR) << "Consume failed: " << message->errstr() << std::endl;
      run = false;
      break;

    default:
      /* Errors */
      LOG(ERROR) << "Consume failed: " << message->errstr();
      run = false;
  }
  return false;
}

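// Placeholder consume callback; it is never registered because use_ccb is
// hard-coded to 0 in kafka_insert() below.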
class ConsumeCb : public RdKafka::ConsumeCb {
 public:
  void consume_cb(RdKafka::Message& msg, void* opaque) override {
    // reinterpret_cast<KafkaMgr*>(opaque)->
    // msg_consume(&msg, opaque);
  }
};

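// Routes librdkafka events to the logger: broker errors stop the consumer
// loop, while stats, logs, and throttle notices are recorded at lower
// severities.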
class EventCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event& event) override {
    switch (event.type()) {
      case RdKafka::Event::EVENT_ERROR:
        LOG(ERROR) << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str();
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
          LOG(ERROR) << "All brokers are down, we may need special handling here";
          run = false;
        }
        break;

      case RdKafka::Event::EVENT_STATS:
        VLOG(2) << "\"STATS\": " << event.str();
        break;

      case RdKafka::Event::EVENT_LOG:
        LOG(INFO) << "LOG-" << event.severity() << "-" << event.fac().c_str() << ":"
                  << event.str().c_str();
        break;

      case RdKafka::Event::EVENT_THROTTLE:
        LOG(INFO) << "THROTTLED: " << event.throttle_time() << "ms by "
                  << event.broker_name() << " id " << (int)event.broker_id();
        break;

      default:
        LOG(INFO) << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err())
                  << "): " << event.str();
        break;
    }
  }
};

// Reads from a Kafka topic (expects delimited string input).
void kafka_insert(RowToColumnLoader& row_loader,
                  const std::map<std::string,
                                 std::pair<std::unique_ptr<boost::regex>,
                                           std::unique_ptr<std::string>>>& transformations,
                  const Importer_NS::CopyParams& copy_params,
                  const bool remove_quotes,
                  std::string group_id,
                  std::string topic,
                  std::string brokers) {
  std::string errstr;
  std::string topic_str;
  std::string mode;
  std::string debug;
  std::vector<std::string> topics;
  bool do_conf_dump = false;
  int use_ccb = 0;

  RebalanceCb ex_rebalance_cb;

  /*
   * Create configuration objects
   */
  RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  conf->set("rebalance_cb", &ex_rebalance_cb, errstr);

  if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << "could not set group.id " << errstr;
  }

  if (conf->set("compression.codec", "none", errstr) != /* can also be gzip or snappy */
      RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (conf->set("statistics.interval.ms", "1000", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }
  if (conf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (tconf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (tconf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

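  // Because enable.auto.commit is false in both configs, offsets are committed
  // explicitly with commitSync() after each successful batch load (see the
  // consume loop below).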
  do_conf_dump = true;

  topics.push_back(topic);

  LOG(INFO) << "Version " << RdKafka::version_str().c_str();
  LOG(INFO) << RdKafka::version();
  LOG(INFO) << RdKafka::get_debug_contexts().c_str();

  conf->set("metadata.broker.list", brokers, errstr);

  // debug = "none";
  if (!debug.empty()) {
    if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
      LOG(FATAL) << errstr;
    }
  }

  ConsumeCb consume_cb;
  use_ccb = 0;
  if (use_ccb) {
    if (conf->set("consume_cb", &consume_cb, errstr) != RdKafka::Conf::CONF_OK) {
      LOG(FATAL) << errstr;
    }
    // need to set the opaque pointer here for the callbacks
    // rd_kafka_conf_set_opaque(conf, this);
  }

  EventCb ex_event_cb;
  if (conf->set("event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (conf->set("default_topic_conf", tconf, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (do_conf_dump) {
    int pass;

    for (pass = 0; pass < 2; pass++) {
      std::list<std::string>* dump;
      if (pass == 0) {
        dump = conf->dump();
        LOG(INFO) << "# Global config";
        LOG(INFO) << "===============";
      } else {
        dump = tconf->dump();
        LOG(INFO) << "# Topic config";
        LOG(INFO) << "===============";
      }

      // dump() yields an alternating list of configuration names and values
      for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
        std::string name = *it;
        it++;
        LOG(INFO) << name << " = " << *it;
        it++;
      }
      LOG(INFO) << "Dump config finished";
    }
  }
  LOG(INFO) << "FULL Dump config finished";

  delete tconf;

  /*
   * Create consumer using accumulated global configuration.
   */
  RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer) {
    // The consumer is dereferenced unconditionally below, so creation failure
    // must be fatal rather than merely logged.
    LOG(FATAL) << "Failed to create consumer: " << errstr;
  }

  delete conf;

  LOG(INFO) << " Created consumer " << consumer->name();

  /*
   * Subscribe to topics
   */
  RdKafka::ErrorCode err = consumer->subscribe(topics);
  if (err) {
    LOG(FATAL) << "Failed to subscribe to " << topics.size()
               << " topics: " << RdKafka::err2str(err);
  }

  /*
   * Consume messages
   */
  size_t recv_rows = 0;
  int skipped = 0;
  int rows_loaded = 0;
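  // Rows accumulate until copy_params.batch_size is reached; do_load() then
  // pushes the batch to the server and commitSync() marks those messages as
  // consumed.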
  while (run) {
    RdKafka::Message* msg = consumer->consume(10000);
    if (msg->err() == RdKafka::ERR_NO_ERROR) {
      if (!use_ccb) {
        bool added =
            msg_consume(msg, row_loader, copy_params, transformations, remove_quotes);
        if (added) {
          recv_rows++;
          if (recv_rows == copy_params.batch_size) {
            recv_rows = 0;
            row_loader.do_load(rows_loaded, skipped, copy_params);
            // commit now so the stored offsets cover the messages we just
            // loaded
            consumer->commitSync();
          }
        } else {
          // LOG(ERROR) << " message was skipped ";
          skipped++;
        }
      }
    }
    delete msg;
  }

  /*
   * Stop consumer
   */
  consumer->close();
  delete consumer;

  LOG(INFO) << "Consumed " << msg_cnt << " messages (" << msg_bytes << " bytes)";
  LOG(FATAL) << "Consumer shut down, probably due to an error; please review the logs";
}

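// Bundles a loader with its copy parameters; not referenced elsewhere in this
// file.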
struct stuff {
  RowToColumnLoader row_loader;
  Importer_NS::CopyParams copy_params;

  stuff(RowToColumnLoader& rl, Importer_NS::CopyParams& cp)
      : row_loader(rl), copy_params(cp) {}
};

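// Parses the command-line options, resolves delimiters and transformations,
// connects a RowToColumnLoader to the target table, and hands control to
// kafka_insert(), which runs until a fatal error shuts the process down.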
int main(int argc, char** argv) {
  std::string server_host("localhost");  // default to localhost
  int port = 6274;                       // default port number
  bool http = false;
  bool https = false;
  bool skip_host_verify = false;
  std::string ca_cert_name{""};
  std::string table_name;
  std::string db_name;
  std::string user_name;
  std::string passwd;
  std::string group_id;
  std::string topic;
  std::string brokers;
  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
  size_t batch_size = 10000;
  size_t retry_count = 10;
  size_t retry_wait = 5;
  bool remove_quotes = false;
  std::vector<std::string> xforms;
  std::map<std::string,
           std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
      transformations;
  ThriftConnectionType conn_type;

  namespace po = boost::program_options;

  po::options_description desc("Options");
  desc.add_options()("help,h", "Print help messages");
  desc.add_options()(
      "table", po::value<std::string>(&table_name)->required(), "Table Name");
  desc.add_options()(
      "database", po::value<std::string>(&db_name)->required(), "Database Name");
  desc.add_options()(
      "user,u", po::value<std::string>(&user_name)->required(), "User Name");
  desc.add_options()(
      "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
  desc.add_options()("host",
                     po::value<std::string>(&server_host)->default_value(server_host),
                     "OmniSci Server Hostname");
  desc.add_options()(
      "port", po::value<int>(&port)->default_value(port), "OmniSci Server Port Number");
  desc.add_options()("http",
                     po::bool_switch(&http)->default_value(http)->implicit_value(true),
                     "Use HTTP transport");
  desc.add_options()("https",
                     po::bool_switch(&https)->default_value(https)->implicit_value(true),
                     "Use HTTPS transport");
  desc.add_options()("skip-verify",
                     po::bool_switch(&skip_host_verify)
                         ->default_value(skip_host_verify)
                         ->implicit_value(true),
                     "Don't verify SSL certificate validity");
  desc.add_options()(
      "ca-cert",
      po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
      "Path to trusted server certificate. Initiates an encrypted connection");
  desc.add_options()("delim",
                     po::value<std::string>(&delim_str)->default_value(delim_str),
                     "Field delimiter");
  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
  desc.add_options()(
      "quoted",
      po::value<std::string>(&quoted),
      "Whether the source contains quoted fields (true/false, default false)");
  desc.add_options()("batch",
                     po::value<size_t>(&batch_size)->default_value(batch_size),
                     "Insert batch size");
  desc.add_options()("retry_count",
                     po::value<size_t>(&retry_count)->default_value(retry_count),
                     "Number of times to retry an insert");
  desc.add_options()("retry_wait",
                     po::value<size_t>(&retry_wait)->default_value(retry_wait),
                     "Wait in seconds between retries");
  desc.add_options()("transform,t",
                     po::value<std::vector<std::string>>(&xforms)->multitoken(),
                     "Column Transformations");
  desc.add_options()("print_error", "Print Error Rows");
  desc.add_options()("print_transform", "Print Transformations");
  desc.add_options()("topic",
                     po::value<std::string>(&topic)->required(),
                     "Kafka topic to consume from");
  desc.add_options()("group-id",
                     po::value<std::string>(&group_id)->required(),
                     "Group id this consumer is part of");
  desc.add_options()("brokers",
                     po::value<std::string>(&brokers)->required(),
                     "List of Kafka brokers for the topic");

  po::positional_options_description positionalOptions;
  positionalOptions.add("table", 1);
  positionalOptions.add("database", 1);

  logger::LogOptions log_options(argv[0]);
  log_options.max_files_ = 0;  // stderr only
  desc.add(log_options.get_options());

  po::variables_map vm;

  try {
    po::store(po::command_line_parser(argc, argv)
                  .options(desc)
                  .positional(positionalOptions)
                  .run(),
              vm);
    if (vm.count("help")) {
      std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
                   "<password> [{--host} "
                   "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
                   "string>][--line <line "
                   "delimiter>][--batch <batch size>][{-t|--transform} transformation "
                   "[--quoted <true|false>] "
                   "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
                   "secs>][--print_error][--print_transform]\n\n";
      std::cout << desc << std::endl;
      return 0;
    }
    if (vm.count("print_error")) {
      print_error_data = true;
    }
    if (vm.count("print_transform")) {
      print_transformation = true;
    }

    po::notify(vm);
  } catch (boost::program_options::error& e) {
    std::cerr << "Usage Error: " << e.what() << std::endl;
    return 1;
  }

  logger::init(log_options);

  if (http) {
    conn_type = ThriftConnectionType::HTTP;
  } else if (https) {
    conn_type = ThriftConnectionType::HTTPS;
  } else if (!ca_cert_name.empty()) {
    conn_type = ThriftConnectionType::BINARY_SSL;
  } else {
    conn_type = ThriftConnectionType::BINARY;
  }

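  // The delimiter options accept a single character or the escapes \t, \n,
  // and \xHH; e.g. --delim '\t' (the two-character string) selects a tab.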
  char delim = delim_str[0];
  if (delim == '\\') {
    if (delim_str.size() < 2 ||
        (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
      std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
      return 1;
    }
    if (delim_str[1] == 't') {
      delim = '\t';
    } else if (delim_str[1] == 'n') {
      delim = '\n';
    } else {
      std::string d(delim_str);
      d[0] = '0';
      delim = (char)std::stoi(d, nullptr, 16);
    }
  }
  if (isprint(delim)) {
    std::cout << "Field Delimiter: " << delim << std::endl;
  } else if (delim == '\t') {
    std::cout << "Field Delimiter: "
              << "\\t" << std::endl;
  } else if (delim == '\n') {
    std::cout << "Field Delimiter: "
              << "\\n" << std::endl;
  } else {
    std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
  }
  char line_delim = line_delim_str[0];
  if (line_delim == '\\') {
    if (line_delim_str.size() < 2 ||
        (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
         line_delim_str[1] != 'n')) {
      std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
      return 1;
    }
    if (line_delim_str[1] == 't') {
      line_delim = '\t';
    } else if (line_delim_str[1] == 'n') {
      line_delim = '\n';
    } else {
      std::string d(line_delim_str);
      d[0] = '0';
      line_delim = (char)std::stoi(d, nullptr, 16);
    }
  }
  if (isprint(line_delim)) {
    std::cout << "Line Delimiter: " << line_delim << std::endl;
  } else if (line_delim == '\t') {
    std::cout << "Line Delimiter: "
              << "\\t" << std::endl;
  } else if (line_delim == '\n') {
    std::cout << "Line Delimiter: "
              << "\\n" << std::endl;
  } else {
    std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
  }
  std::cout << "Null String: " << nulls << std::endl;
  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;

  if (quoted == "true") {
    remove_quotes = true;
  }

685 
686  for (auto& t : xforms) {
687  auto n = t.find_first_of(':');
688  if (n == std::string::npos) {
689  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
690  << std::endl;
691  return 1;
692  }
693  std::string col_name = t.substr(0, n);
694  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
695  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
696  << std::endl;
697  return 1;
698  }
699  auto n1 = n + 3;
700  auto n2 = t.find_first_of('/', n1);
701  if (n2 == std::string::npos) {
702  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
703  << std::endl;
704  return 1;
705  }
706  std::string regex_str = t.substr(n1, n2 - n1);
707  n1 = n2 + 1;
708  n2 = t.find_first_of('/', n1);
709  if (n2 == std::string::npos) {
710  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
711  << std::endl;
712  return 1;
713  }
714  std::string fmt_str = t.substr(n1, n2 - n1);
715  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
716  << std::endl;
717  transformations[col_name] =
718  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
719  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
720  std::unique_ptr<std::string>(new std::string(fmt_str)));
721  }
722 
  Importer_NS::CopyParams copy_params(
      delim, nulls, line_delim, batch_size, retry_count, retry_wait);
  RowToColumnLoader row_loader(
      ThriftClientConnection(
          server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
      user_name,
      passwd,
      db_name,
      table_name);

  kafka_insert(
      row_loader, transformations, copy_params, remove_quotes, group_id, topic, brokers);
  return 0;
}