KafkaImporter.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/regex.hpp>  // boost::regex is used for the column transformations
#include <cstring>
#include <iostream>
#include <iterator>
#include <memory>
#include <string>

#include "Logger/Logger.h"
#include "RowToColumnLoader.h"
#include "Shared/ThriftClient.h"
#include "Shared/sqltypes.h"

#include <chrono>
#include <thread>

#include <boost/program_options.hpp>

#include <librdkafka/rdkafkacpp.h>

#define MAX_FIELD_LEN 20000

bool print_error_data = false;
bool print_transformation = false;

static bool run = true;
static bool exit_eof = false;
static int eof_cnt = 0;
static int partition_cnt = 0;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;

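// Rebalance callback: logs each assignment change and keeps partition_cnt in
// sync with the current assignment so the per-partition EOF counting below
// stays accurate across rebalances.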
class RebalanceCb : public RdKafka::RebalanceCb {
 private:
  static void part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
    for (unsigned int i = 0; i < partitions.size(); i++) {
      LOG(INFO) << "\t" << partitions[i]->topic() << "[" << partitions[i]->partition()
                << "]";
    }
  }

 public:
  void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition*>& partitions) override {
    LOG(INFO) << "RebalanceCb: " << RdKafka::err2str(err) << ": ";

    part_list_print(partitions);

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      consumer->assign(partitions);
      partition_cnt = static_cast<int>(partitions.size());
    } else {
      consumer->unassign();
      partition_cnt = 0;
    }
    eof_cnt = 0;
  }
};

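// Consume a single Kafka message. For a real message, the delimited payload
// is parsed into a row of TStringValues (honoring backslash escapes, optional
// quote removal, and the configured NULL string), per-column regex
// transformations are applied, and the row is handed to the RowToColumnLoader.
// Returns true only if a complete row was converted successfully.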
bool msg_consume(RdKafka::Message* message,
                 RowToColumnLoader& row_loader,
                 import_export::CopyParams copy_params,
                 const std::map<std::string,
                                std::pair<std::unique_ptr<boost::regex>,
                                          std::unique_ptr<std::string>>>& transformations,
                 const bool remove_quotes) {
  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      VLOG(1) << " Timed out";
      break;

    case RdKafka::ERR_NO_ERROR: { /* Real message */
      msg_cnt++;
      msg_bytes += message->len();
      VLOG(1) << "Read msg at offset " << message->offset();
      RdKafka::MessageTimestamp ts;
      ts = message->timestamp();
      if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
        std::string tsname = "?";
        if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
          tsname = "create time";
        } else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
          tsname = "log append time";
        }
        VLOG(1) << "Timestamp: " << tsname << " " << ts.timestamp;
      }

      std::vector<char> buffer(message->len() + 2);
      snprintf(buffer.data(),
               buffer.size(),
               "%.*s\n",
               static_cast<int>(message->len()),
               static_cast<const char*>(message->payload()));
      VLOG(1) << "Full message received is: '" << buffer.data() << "'";

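      // Scan the payload one character at a time, splitting on the field and
      // line delimiters while honoring backslash escapes and (optionally)
      // stripping unescaped double quotes. A line delimiter inside a string
      // column is treated as data until enough columns have been collected.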
      char field[MAX_FIELD_LEN];
      size_t field_i = 0;

      bool backEscape = false;

      auto row_desc = row_loader.get_row_descriptor();

      std::vector<
          const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
          xforms(row_desc.size());
      for (size_t i = 0; i < row_desc.size(); i++) {
        auto it = transformations.find(row_desc[i].col_name);
        if (it != transformations.end()) {
          xforms[i] = &(it->second);
        } else {
          xforms[i] = nullptr;
        }
      }

      std::vector<TStringValue>
          row;  // used to store each row as we move through the stream

      for (auto iit : buffer) {
        if (iit == copy_params.delimiter || iit == copy_params.line_delim) {
          bool end_of_field = (iit == copy_params.delimiter);
          bool end_of_row;
          if (end_of_field) {
            end_of_row = false;
          } else {
            end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
                         (row.size() == row_desc.size() - 1);
            if (!end_of_row) {
              size_t l = copy_params.null_str.size();
              if (field_i >= l &&
                  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
                end_of_row = true;
              }
            }
          }
          if (!end_of_field && !end_of_row) {
            // Not enough columns yet and this is a string column:
            // treat the line delimiter as part of the string.
            field[field_i++] = iit;
          } else {
            field[field_i] = '\0';
            field_i = 0;
            TStringValue ts;
            ts.str_val = std::string(field);
            ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
            auto xform = row.size() < row_desc.size() ? xforms[row.size()] : nullptr;
            if (!ts.is_null && xform != nullptr) {
              if (print_transformation) {
                std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
              }
              ts.str_val =
                  boost::regex_replace(ts.str_val, *xform->first, *xform->second);
              if (ts.str_val.empty()) {
                ts.is_null = true;
              }
              if (print_transformation) {
                std::cout << ts.str_val << std::endl;
              }
            }

            row.push_back(ts);  // add column value to row
            if (end_of_row || (row.size() > row_desc.size())) {
              break;  // found row
            }
          }
        } else {
          if (iit == '\\') {
            backEscape = true;
          } else if (backEscape || !remove_quotes || iit != '\"') {
            field[field_i++] = iit;
            backEscape = false;
          }
          // else: unescaped double quote, continue without adding the
          // character to the field string.
        }
        if (field_i >= MAX_FIELD_LEN) {
          field[MAX_FIELD_LEN - 1] = '\0';
          std::cerr << "String too long for buffer." << std::endl;
          if (print_error_data) {
            std::cerr << field << std::endl;
          }
          field_i = 0;
          break;
        }
      }
      if (row.size() == row_desc.size()) {
        // Add the new data in the column format; a record that cannot be
        // parsed is considered skipped.
        return row_loader.convert_string_to_column(row, copy_params);
      }
      if (print_error_data) {
        std::cerr << "Incorrect number of columns for row: ";
        std::cerr << row_loader.print_row_with_delim(row, copy_params) << std::endl;
      }
      return false;
    }

    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      if (exit_eof && ++eof_cnt == partition_cnt) {
        LOG(ERROR) << "EOF reached for all " << partition_cnt << " partition(s)";
        run = false;
      }
      break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      LOG(ERROR) << "Consume failed: " << message->errstr();
      run = false;
      break;

    default:
      /* Errors */
      LOG(ERROR) << "Consume failed: " << message->errstr();
      run = false;
  }
  return false;
}

class ConsumeCb : public RdKafka::ConsumeCb {
 public:
  void consume_cb(RdKafka::Message& msg, void* opaque) override {
    // reinterpret_cast<KafkaMgr*>(opaque)->
    // msg_consume(&msg, opaque);
  }
};

class EventCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event& event) override {
    switch (event.type()) {
      case RdKafka::Event::EVENT_ERROR:
        LOG(ERROR) << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str();
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
          LOG(ERROR) << "All brokers are down, we may need special handling here";
          run = false;
        }
        break;

      case RdKafka::Event::EVENT_STATS:
        VLOG(2) << "\"STATS\": " << event.str();
        break;

      case RdKafka::Event::EVENT_LOG:
        LOG(INFO) << "LOG-" << event.severity() << "-" << event.fac().c_str() << ":"
                  << event.str().c_str();
        break;

      case RdKafka::Event::EVENT_THROTTLE:
        LOG(INFO) << "THROTTLED: " << event.throttle_time() << "ms by "
                  << event.broker_name() << " id " << static_cast<int>(event.broker_id());
        break;

      default:
        LOG(INFO) << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err())
                  << "): " << event.str();
        break;
    }
  }
};

// Reads from a Kafka topic (expects delimited string input) and streams the
// parsed rows into the target table in batches.
void kafka_insert(
    RowToColumnLoader& row_loader,
    const std::map<std::string,
                   std::pair<std::unique_ptr<boost::regex>,
                             std::unique_ptr<std::string>>>& transformations,
    const import_export::CopyParams& copy_params,
    const bool remove_quotes,
    std::string group_id,
    std::string topic,
    std::string brokers) {
  std::string errstr;
  std::string topic_str;
  std::string mode;
  std::string debug;
  std::vector<std::string> topics;
  bool do_conf_dump = false;
  int use_ccb = 0;

  RebalanceCb ex_rebalance_cb;

  /*
   * Create configuration objects
   */
  RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  conf->set("rebalance_cb", &ex_rebalance_cb, errstr);

  if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << "could not set group.id " << errstr;
  }

  if (conf->set("compression.codec", "none", errstr) != /* can also be gzip or snappy */
      RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (conf->set("statistics.interval.ms", "1000", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }
  if (conf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (tconf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (tconf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  do_conf_dump = true;

  topics.push_back(topic);

  LOG(INFO) << "Version " << RdKafka::version_str().c_str();
  LOG(INFO) << RdKafka::version();
  LOG(INFO) << RdKafka::get_debug_contexts().c_str();

  conf->set("metadata.broker.list", brokers, errstr);

  // debug = "none";
  if (!debug.empty()) {
    if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
      LOG(FATAL) << errstr;
    }
  }

  ConsumeCb consume_cb;
  use_ccb = 0;
  if (use_ccb) {
    if (conf->set("consume_cb", &consume_cb, errstr) != RdKafka::Conf::CONF_OK) {
      LOG(FATAL) << errstr;
    }
    // need to set the opaque pointer here for the callbacks
    // rd_kafka_conf_set_opaque(conf, this);
  }

  EventCb ex_event_cb;
  if (conf->set("event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (conf->set("default_topic_conf", tconf, errstr) != RdKafka::Conf::CONF_OK) {
    LOG(FATAL) << errstr;
  }

  if (do_conf_dump) {
    for (int pass = 0; pass < 2; pass++) {
      std::list<std::string>* dump;
      if (pass == 0) {
        dump = conf->dump();
        LOG(INFO) << "# Global config";
        LOG(INFO) << "===============";
      } else {
        dump = tconf->dump();
        LOG(INFO) << "# Topic config";
        LOG(INFO) << "===============";
      }

      // dump() returns a flat list of alternating key/value strings.
      for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
        std::string key = *it;
        it++;
        LOG(INFO) << key << " = " << *it;
        it++;
      }
      LOG(INFO) << "Dump config finished";
    }
  }
  LOG(INFO) << "FULL Dump config finished";

  delete tconf;

  /*
   * Create consumer using accumulated global configuration.
   */
  RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer) {
    LOG(FATAL) << "Failed to create consumer: " << errstr;
  }

  delete conf;

  LOG(INFO) << "Created consumer " << consumer->name();

  /*
   * Subscribe to topics
   */
  RdKafka::ErrorCode err = consumer->subscribe(topics);
  if (err) {
    LOG(FATAL) << "Failed to subscribe to " << topics.size()
               << " topics: " << RdKafka::err2str(err);
  }

  /*
   * Consume messages
   */
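  // Rows are buffered until copy_params.batch_size is reached; offsets are
  // committed via commitSync() only after a batch has been handed to
  // do_load(), so with enable.auto.commit=false an uncommitted batch is
  // re-consumed after a restart (at-least-once delivery).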
  size_t recv_rows = 0;
  int skipped = 0;
  int rows_loaded = 0;
  while (run) {
    RdKafka::Message* msg = consumer->consume(10000);
    if (msg->err() == RdKafka::ERR_NO_ERROR) {
      if (!use_ccb) {
        bool added =
            msg_consume(msg, row_loader, copy_params, transformations, remove_quotes);
        if (added) {
          recv_rows++;
          if (recv_rows == copy_params.batch_size) {
            recv_rows = 0;
            row_loader.do_load(rows_loaded, skipped, copy_params);
            // Commit the offsets now so the messages just loaded are covered.
            consumer->commitSync();
          }
        } else {
          // message was skipped
          skipped++;
        }
      }
    }
    delete msg;
  }

  /*
   * Stop consumer
   */
  consumer->close();
  delete consumer;

  LOG(INFO) << "Consumed " << msg_cnt << " messages (" << msg_bytes << " bytes)";
  LOG(FATAL) << "Consumer shut down, probably due to an error; please review the logs";
}

struct stuff {
  RowToColumnLoader row_loader;
  import_export::CopyParams copy_params;

  stuff(RowToColumnLoader& rl, import_export::CopyParams& cp)
      : row_loader(rl), copy_params(cp) {}
};

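// Example invocation (binary name, credentials, and broker address are
// illustrative; the flags are the options defined below):
//
//   kafka_importer mytable mydb -u admin -p mypassword \
//       --host localhost --port 6274 \
//       --brokers localhost:9092 --topic mytopic --group-id mygroup \
//       --batch 10000 --quoted true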
int main(int argc, char** argv) {
  std::string server_host("localhost");  // default to localhost
  int port = 6274;                       // default port number
  bool http = false;
  bool https = false;
  bool skip_host_verify = false;
  std::string ca_cert_name{""};
  std::string table_name;
  std::string db_name;
  std::string user_name;
  std::string passwd;
  std::string group_id;
  std::string topic;
  std::string brokers;
  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
  size_t batch_size = 10000;
  size_t retry_count = 10;
  size_t retry_wait = 5;
  bool remove_quotes = false;
  std::vector<std::string> xforms;
  std::map<std::string,
           std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
      transformations;
  ThriftConnectionType conn_type;

  namespace po = boost::program_options;

  po::options_description desc("Options");
  desc.add_options()("help,h", "Print help messages");
  desc.add_options()(
      "table", po::value<std::string>(&table_name)->required(), "Table Name");
  desc.add_options()(
      "database", po::value<std::string>(&db_name)->required(), "Database Name");
  desc.add_options()(
      "user,u", po::value<std::string>(&user_name)->required(), "User Name");
  desc.add_options()(
      "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
  desc.add_options()("host",
                     po::value<std::string>(&server_host)->default_value(server_host),
                     "HeavyDB Server Hostname");
  desc.add_options()(
      "port", po::value<int>(&port)->default_value(port), "HeavyDB Server Port Number");
  desc.add_options()("http",
                     po::bool_switch(&http)->default_value(http)->implicit_value(true),
                     "Use HTTP transport");
  desc.add_options()("https",
                     po::bool_switch(&https)->default_value(https)->implicit_value(true),
                     "Use HTTPS transport");
  desc.add_options()("skip-verify",
                     po::bool_switch(&skip_host_verify)
                         ->default_value(skip_host_verify)
                         ->implicit_value(true),
                     "Don't verify SSL certificate validity");
  desc.add_options()(
      "ca-cert",
      po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
      "Path to trusted server certificate. Initiates an encrypted connection");
  desc.add_options()("delim",
                     po::value<std::string>(&delim_str)->default_value(delim_str),
                     "Field delimiter");
  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
  desc.add_options()(
      "quoted",
      po::value<std::string>(&quoted),
      "Whether the source contains quoted fields (true/false, default false)");
  desc.add_options()("batch",
                     po::value<size_t>(&batch_size)->default_value(batch_size),
                     "Insert batch size");
  desc.add_options()("retry_count",
                     po::value<size_t>(&retry_count)->default_value(retry_count),
                     "Number of times to retry an insert");
  desc.add_options()("retry_wait",
                     po::value<size_t>(&retry_wait)->default_value(retry_wait),
                     "Wait in seconds between retries");
  desc.add_options()("transform,t",
                     po::value<std::vector<std::string>>(&xforms)->multitoken(),
                     "Column Transformations");
  desc.add_options()("print_error", "Print Error Rows");
  desc.add_options()("print_transform", "Print Transformations");
  desc.add_options()("topic",
                     po::value<std::string>(&topic)->required(),
                     "Kafka topic to consume from");
  desc.add_options()("group-id",
                     po::value<std::string>(&group_id)->required(),
                     "Group id this consumer is part of");
  desc.add_options()("brokers",
                     po::value<std::string>(&brokers)->required(),
                     "List of Kafka brokers for the topic");

  po::positional_options_description positionalOptions;
  positionalOptions.add("table", 1);
  positionalOptions.add("database", 1);

  logger::LogOptions log_options(argv[0]);
  log_options.max_files_ = 0;  // stderr only
  desc.add(log_options.get_options());

  po::variables_map vm;

  try {
    po::store(po::command_line_parser(argc, argv)
                  .options(desc)
                  .positional(positionalOptions)
                  .run(),
              vm);
    if (vm.count("help")) {
      std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
                   "<password> [{--host} "
                   "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
                   "string>][--line <line "
                   "delimiter>][--batch <batch size>][{-t|--transform} transformation "
                   "[--quoted <true|false>] "
                   "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
                   "secs>][--print_error][--print_transform]\n\n";
      std::cout << desc << std::endl;
      return 0;
    }
    if (vm.count("print_error")) {
      print_error_data = true;
    }
    if (vm.count("print_transform")) {
      print_transformation = true;
    }

    po::notify(vm);
  } catch (boost::program_options::error& e) {
    std::cerr << "Usage Error: " << e.what() << std::endl;
    return 1;
  }

  logger::init(log_options);

  if (http) {
    conn_type = ThriftConnectionType::HTTP;
  } else if (https) {
    conn_type = ThriftConnectionType::HTTPS;
  } else if (!ca_cert_name.empty()) {
    // A supplied CA certificate implies an encrypted binary connection.
    conn_type = ThriftConnectionType::BINARY_SSL;
  } else {
    conn_type = ThriftConnectionType::BINARY;
  }

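  // Decode an escaped delimiter string: "\t" and "\n" map to tab and newline,
  // while "\xNN" is parsed as hex by rewriting the leading backslash to '0',
  // turning the string into "0xNN", which std::stoi(..., 16) accepts.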
  char delim = delim_str[0];
  if (delim == '\\') {
    if (delim_str.size() < 2 ||
        (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
      std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
      return 1;
    }
    if (delim_str[1] == 't') {
      delim = '\t';
    } else if (delim_str[1] == 'n') {
      delim = '\n';
    } else {
      std::string d(delim_str);
      d[0] = '0';
      delim = (char)std::stoi(d, nullptr, 16);
    }
  }
  if (isprint(delim)) {
    std::cout << "Field Delimiter: " << delim << std::endl;
  } else if (delim == '\t') {
    std::cout << "Field Delimiter: \\t" << std::endl;
  } else if (delim == '\n') {
    std::cout << "Field Delimiter: \\n" << std::endl;
  } else {
    std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
  }
  char line_delim = line_delim_str[0];
  if (line_delim == '\\') {
    if (line_delim_str.size() < 2 ||
        (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
         line_delim_str[1] != 'n')) {
      std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
      return 1;
    }
    if (line_delim_str[1] == 't') {
      line_delim = '\t';
    } else if (line_delim_str[1] == 'n') {
      line_delim = '\n';
    } else {
      std::string d(line_delim_str);
      d[0] = '0';
      line_delim = (char)std::stoi(d, nullptr, 16);
    }
  }
  if (isprint(line_delim)) {
    std::cout << "Line Delimiter: " << line_delim << std::endl;
  } else if (line_delim == '\t') {
    std::cout << "Line Delimiter: \\t" << std::endl;
  } else if (line_delim == '\n') {
    std::cout << "Line Delimiter: \\n" << std::endl;
  } else {
    std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
  }
  std::cout << "Null String: " << nulls << std::endl;
  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;

  if (quoted == "true") {
    remove_quotes = true;
  }

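  // Each --transform argument has the form <column>:s/<regex>/<fmt>/.
  // For example (with a hypothetical column "name"):
  //   --transform 'name:s/^mr\. //'   strips a leading "mr. " from "name".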
  for (auto& t : xforms) {
    auto n = t.find_first_of(':');
    if (n == std::string::npos) {
      std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
                << std::endl;
      return 1;
    }
    std::string col_name = t.substr(0, n);
    if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
      std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
                << std::endl;
      return 1;
    }
    auto n1 = n + 3;
    auto n2 = t.find_first_of('/', n1);
    if (n2 == std::string::npos) {
      std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
                << std::endl;
      return 1;
    }
    std::string regex_str = t.substr(n1, n2 - n1);
    n1 = n2 + 1;
    n2 = t.find_first_of('/', n1);
    if (n2 == std::string::npos) {
      std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
                << std::endl;
      return 1;
    }
    std::string fmt_str = t.substr(n1, n2 - n1);
    std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
              << std::endl;
    transformations[col_name] = std::make_pair(std::make_unique<boost::regex>(regex_str),
                                               std::make_unique<std::string>(fmt_str));
  }

  import_export::CopyParams copy_params(
      delim, nulls, line_delim, batch_size, retry_count, retry_wait);
  RowToColumnLoader row_loader(
      ThriftClientConnection(
          server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
      user_name,
      passwd,
      db_name,
      table_name);

  kafka_insert(
      row_loader, transformations, copy_params, remove_quotes, group_id, topic, brokers);
  return 0;
}