OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
KafkaImporter.cpp File Reference

Based on StreamInsert code but using binary columnar format for inserting a stream of rows with optional transformations from stdin to a DB table. More...

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <cstring>
#include <iostream>
#include <iterator>
#include <string>
#include "Logger/Logger.h"
#include "RowToColumnLoader.h"
#include "Shared/ThriftClient.h"
#include "Shared/clean_boost_regex.hpp"
#include "Shared/sqltypes.h"
#include <chrono>
#include <thread>
#include <boost/program_options.hpp>
#include <librdkafka/rdkafkacpp.h>
+ Include dependency graph for KafkaImporter.cpp:

Go to the source code of this file.

Classes

class  RebalanceCb
 
class  ConsumeCb
 
class  EventCb
 
struct  stuff
 

Macros

#define MAX_FIELD_LEN   20000
 

Functions

bool msg_consume (RdKafka::Message *message, RowToColumnLoader &row_loader, import_export::CopyParams copy_params, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const bool remove_quotes)
 
void kafka_insert (RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes, std::string group_id, std::string topic, std::string brokers)
 
int main (int argc, char **argv)
 

Variables

bool print_error_data = false
 
bool print_transformation = false
 
static bool run = true
 
static bool exit_eof = false
 
static int eof_cnt = 0
 
static int partition_cnt = 0
 
static long msg_cnt = 0
 
static int64_t msg_bytes = 0
 

Detailed Description

Based on StreamInsert code but using binary columnar format for inserting a stream of rows with optional transformations from stdin to a DB table.

Definition in file KafkaImporter.cpp.

Macro Definition Documentation

#define MAX_FIELD_LEN   20000

Definition at line 44 of file KafkaImporter.cpp.

Referenced by msg_consume().

Function Documentation

void kafka_insert ( RowToColumnLoader row_loader,
const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &  transformations,
const import_export::CopyParams copy_params,
const bool  remove_quotes,
std::string  group_id,
std::string  topic,
std::string  brokers 
)

Definition at line 291 of file KafkaImporter.cpp.

References import_export::CopyParams::batch_size, File_Namespace::create(), RowToColumnLoader::do_load(), logger::ERROR, logger::FATAL, logger::INFO, LOG, msg_bytes, msg_cnt, msg_consume(), run_benchmark_import::rows_loaded, run, and setup::version.

Referenced by main().

300  {
301  std::string errstr;
302  std::string topic_str;
303  std::string mode;
304  std::string debug;
305  std::vector<std::string> topics;
306  bool do_conf_dump = false;
307  int use_ccb = 0;
308 
309  RebalanceCb ex_rebalance_cb;
310 
311  /*
312  * Create configuration objects
313  */
314  RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
315  RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
316 
317  conf->set("rebalance_cb", &ex_rebalance_cb, errstr);
318 
319  if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
320  LOG(FATAL) << "could not set group.id " << errstr;
321  }
322 
323  if (conf->set("compression.codec", "none", errstr) != /* can also be gzip or snappy */
324  RdKafka::Conf::CONF_OK) {
325  LOG(FATAL) << errstr;
326  }
327 
328  if (conf->set("statistics.interval.ms", "1000", errstr) != RdKafka::Conf::CONF_OK) {
329  LOG(FATAL) << errstr;
330  }
331  if (conf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
332  LOG(FATAL) << errstr;
333  }
334 
335  if (tconf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
336  LOG(FATAL) << errstr;
337  }
338 
339  if (tconf->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
340  LOG(FATAL) << errstr;
341  }
342 
343  do_conf_dump = true;
344 
345  topics.push_back(topic);
346 
347  LOG(INFO) << "Version " << RdKafka::version_str().c_str();
348  LOG(INFO) << RdKafka::version();
349  LOG(INFO) << RdKafka::get_debug_contexts().c_str();
350 
351  conf->set("metadata.broker.list", brokers, errstr);
352 
353  // debug = "none";
354  if (!debug.empty()) {
355  if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
356  LOG(FATAL) << errstr;
357  }
358  }
359 
360  ConsumeCb consume_cb;
361  use_ccb = 0;
362  if (use_ccb) {
363  if (conf->set("consume_cb", &consume_cb, errstr) != RdKafka::Conf::CONF_OK) {
364  LOG(FATAL) << errstr;
365  }
366  // need to set the opaque pointer here for the callbacks
367  // rd_kafka_conf_set_opaque(conf, this);
368  }
369 
370  EventCb ex_event_cb;
371  if (conf->set("event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
372  LOG(FATAL) << errstr;
373  }
374 
375  if (conf->set("default_topic_conf", tconf, errstr) != RdKafka::Conf::CONF_OK) {
376  LOG(FATAL) << errstr;
377  }
378 
379  if (do_conf_dump) {
380  int pass;
381 
382  for (pass = 0; pass < 2; pass++) {
383  std::list<std::string>* dump;
384  if (pass == 0) {
385  dump = conf->dump();
386  LOG(INFO) << "# Global config";
387  LOG(INFO) << "===============";
388  } else {
389  dump = tconf->dump();
390  LOG(INFO) << "# Topic config";
391  LOG(INFO) << "===============";
392  }
393 
394  for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
395  std::string ts = *it;
396  it++;
397  LOG(INFO) << ts << " = " << *it;
398  it++;
399  }
400  LOG(INFO) << "Dump config finished";
401  }
402  }
403  LOG(INFO) << "FULL Dump config finished";
404 
405  delete tconf;
406 
407  /*
408  * Create consumer using accumulated global configuration.
409  */
410  RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
411  if (!consumer) {
412  LOG(ERROR) << "Failed to create consumer: " << errstr;
413  }
414 
415  delete conf;
416 
417  LOG(INFO) << " Created consumer " << consumer->name();
418 
419  /*
420  * Subscribe to topics
421  */
422  RdKafka::ErrorCode err = consumer->subscribe(topics);
423  if (err) {
424  LOG(FATAL) << "Failed to subscribe to " << topics.size()
425  << " topics: " << RdKafka::err2str(err);
426  }
427 
428  /*
429  * Consume messages
430  */
431  size_t recv_rows = 0;
432  int skipped = 0;
433  int rows_loaded = 0;
434  while (run) {
435  RdKafka::Message* msg = consumer->consume(10000);
436  if (msg->err() == RdKafka::ERR_NO_ERROR) {
437  if (!use_ccb) {
438  bool added =
439  msg_consume(msg, row_loader, copy_params, transformations, remove_quotes);
440  if (added) {
441  recv_rows++;
442  if (recv_rows == copy_params.batch_size) {
443  recv_rows = 0;
444  row_loader.do_load(rows_loaded, skipped, copy_params);
445  // make sure we now commit that we are up to here to cover the mesages we just
446  // loaded
447  consumer->commitSync();
448  }
449  } else {
450  // LOG(ERROR) << " messsage was skipped ";
451  skipped++;
452  }
453  }
454  }
455  delete msg;
456  }
457 
458  /*
459  * Stop consumer
460  */
461  consumer->close();
462  delete consumer;
463 
464  LOG(INFO) << "Consumed " << msg_cnt << " messages (" << msg_bytes << " bytes)";
465  LOG(FATAL) << "Consumer shut down, probably due to an error please review logs";
466 };
#define LOG(tag)
Definition: Logger.h:285
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
static long msg_cnt
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:55
bool msg_consume(RdKafka::Message *message, RowToColumnLoader &row_loader, import_export::CopyParams copy_params, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const bool remove_quotes)
string version
Definition: setup.in.py:73
static bool run
static int64_t msg_bytes

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int main ( int  argc,
char **  argv 
)

Definition at line 476 of file KafkaImporter.cpp.

References BINARY, BINARY_SSL, logger::LogOptions::get_options(), HTTP, HTTPS, logger::init(), kafka_insert(), logger::LogOptions::max_files_, anonymous_namespace{Utm.h}::n, print_error_data, print_transformation, foreign_storage::anonymous_namespace{LogFileBufferParser.cpp}::remove_quotes(), run_benchmark_import::required, and run.

476  {
477  std::string server_host("localhost"); // default to localhost
478  int port = 6274; // default port number
479  bool http = false;
480  bool https = false;
481  bool skip_host_verify = false;
482  std::string ca_cert_name{""};
483  std::string table_name;
484  std::string db_name;
485  std::string user_name;
486  std::string passwd;
487  std::string group_id;
488  std::string topic;
489  std::string brokers;
490  std::string delim_str(","), nulls("\\N"), line_delim_str("\n"), quoted("false");
491  size_t batch_size = 10000;
492  size_t retry_count = 10;
493  size_t retry_wait = 5;
494  bool remove_quotes = false;
495  std::vector<std::string> xforms;
496  std::map<std::string,
497  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
498  transformations;
499  ThriftConnectionType conn_type;
500 
501  namespace po = boost::program_options;
502 
503  po::options_description desc("Options");
504  desc.add_options()("help,h", "Print help messages ");
505  desc.add_options()(
506  "table", po::value<std::string>(&table_name)->required(), "Table Name");
507  desc.add_options()(
508  "database", po::value<std::string>(&db_name)->required(), "Database Name");
509  desc.add_options()(
510  "user,u", po::value<std::string>(&user_name)->required(), "User Name");
511  desc.add_options()(
512  "passwd,p", po::value<std::string>(&passwd)->required(), "User Password");
513  desc.add_options()("host",
514  po::value<std::string>(&server_host)->default_value(server_host),
515  "HeavyDB Server Hostname");
516  desc.add_options()(
517  "port", po::value<int>(&port)->default_value(port), "HeavyDB Server Port Number");
518  desc.add_options()("http",
519  po::bool_switch(&http)->default_value(http)->implicit_value(true),
520  "Use HTTP transport");
521  desc.add_options()("https",
522  po::bool_switch(&https)->default_value(https)->implicit_value(true),
523  "Use HTTPS transport");
524  desc.add_options()("skip-verify",
525  po::bool_switch(&skip_host_verify)
526  ->default_value(skip_host_verify)
527  ->implicit_value(true),
528  "Don't verify SSL certificate validity");
529  desc.add_options()(
530  "ca-cert",
531  po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
532  "Path to trusted server certificate. Initiates an encrypted connection");
533  desc.add_options()("delim",
534  po::value<std::string>(&delim_str)->default_value(delim_str),
535  "Field delimiter");
536  desc.add_options()("null", po::value<std::string>(&nulls), "NULL string");
537  desc.add_options()("line", po::value<std::string>(&line_delim_str), "Line delimiter");
538  desc.add_options()(
539  "quoted",
540  po::value<std::string>(&quoted),
541  "Whether the source contains quoted fields (true/false, default false)");
542  desc.add_options()("batch",
543  po::value<size_t>(&batch_size)->default_value(batch_size),
544  "Insert batch size");
545  desc.add_options()("retry_count",
546  po::value<size_t>(&retry_count)->default_value(retry_count),
547  "Number of time to retry an insert");
548  desc.add_options()("retry_wait",
549  po::value<size_t>(&retry_wait)->default_value(retry_wait),
550  "wait in secs between retries");
551  desc.add_options()("transform,t",
552  po::value<std::vector<std::string>>(&xforms)->multitoken(),
553  "Column Transformations");
554  desc.add_options()("print_error", "Print Error Rows");
555  desc.add_options()("print_transform", "Print Transformations");
556  desc.add_options()("topic",
557  po::value<std::string>(&topic)->required(),
558  "Kafka topic to consume from ");
559  desc.add_options()("group-id",
560  po::value<std::string>(&group_id)->required(),
561  "Group id this consumer is part of");
562  desc.add_options()("brokers",
563  po::value<std::string>(&brokers)->required(),
564  "list of kafka brokers for topic");
565 
566  po::positional_options_description positionalOptions;
567  positionalOptions.add("table", 1);
568  positionalOptions.add("database", 1);
569 
570  logger::LogOptions log_options(argv[0]);
571  log_options.max_files_ = 0; // stderr only
572  desc.add(log_options.get_options());
573 
574  po::variables_map vm;
575 
576  try {
577  po::store(po::command_line_parser(argc, argv)
578  .options(desc)
579  .positional(positionalOptions)
580  .run(),
581  vm);
582  if (vm.count("help")) {
583  std::cout << "Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
584  "<password> [{--host} "
585  "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
586  "string>][--line <line "
587  "delimiter>][--batch <batch size>][{-t|--transform} transformation "
588  "[--quoted <true|false>] "
589  "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
590  "secs>][--print_error][--print_transform]\n\n";
591  std::cout << desc << std::endl;
592  return 0;
593  }
594  if (vm.count("print_error")) {
595  print_error_data = true;
596  }
597  if (vm.count("print_transform")) {
598  print_transformation = true;
599  }
600 
601  po::notify(vm);
602  } catch (boost::program_options::error& e) {
603  std::cerr << "Usage Error: " << e.what() << std::endl;
604  return 1;
605  }
606 
607  logger::init(log_options);
608 
609  if (http) {
610  conn_type = ThriftConnectionType::HTTP;
611  } else if (https) {
612  conn_type = ThriftConnectionType::HTTPS;
613  } else if (!ca_cert_name.empty()) {
615  } else {
616  conn_type = ThriftConnectionType::BINARY;
617  }
618 
619  char delim = delim_str[0];
620  if (delim == '\\') {
621  if (delim_str.size() < 2 ||
622  (delim_str[1] != 'x' && delim_str[1] != 't' && delim_str[1] != 'n')) {
623  std::cerr << "Incorrect delimiter string: " << delim_str << std::endl;
624  return 1;
625  }
626  if (delim_str[1] == 't') {
627  delim = '\t';
628  } else if (delim_str[1] == 'n') {
629  delim = '\n';
630  } else {
631  std::string d(delim_str);
632  d[0] = '0';
633  delim = (char)std::stoi(d, nullptr, 16);
634  }
635  }
636  if (isprint(delim)) {
637  std::cout << "Field Delimiter: " << delim << std::endl;
638  } else if (delim == '\t') {
639  std::cout << "Field Delimiter: "
640  << "\\t" << std::endl;
641  } else if (delim == '\n') {
642  std::cout << "Field Delimiter: "
643  << "\\n"
644  << std::endl;
645  } else {
646  std::cout << "Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
647  }
648  char line_delim = line_delim_str[0];
649  if (line_delim == '\\') {
650  if (line_delim_str.size() < 2 ||
651  (line_delim_str[1] != 'x' && line_delim_str[1] != 't' &&
652  line_delim_str[1] != 'n')) {
653  std::cerr << "Incorrect delimiter string: " << line_delim_str << std::endl;
654  return 1;
655  }
656  if (line_delim_str[1] == 't') {
657  line_delim = '\t';
658  } else if (line_delim_str[1] == 'n') {
659  line_delim = '\n';
660  } else {
661  std::string d(line_delim_str);
662  d[0] = '0';
663  line_delim = (char)std::stoi(d, nullptr, 16);
664  }
665  }
666  if (isprint(line_delim)) {
667  std::cout << "Line Delimiter: " << line_delim << std::endl;
668  } else if (line_delim == '\t') {
669  std::cout << "Line Delimiter: "
670  << "\\t" << std::endl;
671  } else if (line_delim == '\n') {
672  std::cout << "Line Delimiter: "
673  << "\\n"
674  << std::endl;
675  } else {
676  std::cout << "Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
677  }
678  std::cout << "Null String: " << nulls << std::endl;
679  std::cout << "Insert Batch Size: " << std::dec << batch_size << std::endl;
680 
681  if (quoted == "true") {
682  remove_quotes = true;
683  }
684 
685  for (auto& t : xforms) {
686  auto n = t.find_first_of(':');
687  if (n == std::string::npos) {
688  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
689  << std::endl;
690  return 1;
691  }
692  std::string col_name = t.substr(0, n);
693  if (t.size() < n + 3 || t[n + 1] != 's' || t[n + 2] != '/') {
694  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
695  << std::endl;
696  return 1;
697  }
698  auto n1 = n + 3;
699  auto n2 = t.find_first_of('/', n1);
700  if (n2 == std::string::npos) {
701  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
702  << std::endl;
703  return 1;
704  }
705  std::string regex_str = t.substr(n1, n2 - n1);
706  n1 = n2 + 1;
707  n2 = t.find_first_of('/', n1);
708  if (n2 == std::string::npos) {
709  std::cerr << "Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
710  << std::endl;
711  return 1;
712  }
713  std::string fmt_str = t.substr(n1, n2 - n1);
714  std::cout << "transform " << col_name << ": s/" << regex_str << "/" << fmt_str << "/"
715  << std::endl;
716  transformations[col_name] =
717  std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
718  std::unique_ptr<boost::regex>(new boost::regex(regex_str)),
719  std::unique_ptr<std::string>(new std::string(fmt_str)));
720  }
721 
722  import_export::CopyParams copy_params(
723  delim, nulls, line_delim, batch_size, retry_count, retry_wait);
724  RowToColumnLoader row_loader(
726  server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
727  user_name,
728  passwd,
729  db_name,
730  table_name);
731 
732  kafka_insert(
733  row_loader, transformations, copy_params, remove_quotes, group_id, topic, brokers);
734  return 0;
735 }
ThriftConnectionType
Definition: ThriftClient.h:29
bool print_transformation
bool print_error_data
void init(LogOptions const &log_opts)
Definition: Logger.cpp:364
void kafka_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes, std::string group_id, std::string topic, std::string brokers)
static bool run
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

bool msg_consume ( RdKafka::Message *  message,
RowToColumnLoader row_loader,
import_export::CopyParams  copy_params,
const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &  transformations,
const bool  remove_quotes 
)

Definition at line 84 of file KafkaImporter.cpp.

References RowToColumnLoader::convert_string_to_column(), import_export::CopyParams::delimiter, eof_cnt, logger::ERROR, exit_eof, field(), RowToColumnLoader::get_row_descriptor(), import_export::CopyParams::line_delim, LOG, MAX_FIELD_LEN, msg_bytes, msg_cnt, import_export::CopyParams::null_str, partition_cnt, print_error_data, RowToColumnLoader::print_row_with_delim(), print_transformation, run, and VLOG.

Referenced by kafka_insert().

90  {
91  switch (message->err()) {
92  case RdKafka::ERR__TIMED_OUT:
93  VLOG(1) << " Timed out";
94  break;
95 
96  case RdKafka::ERR_NO_ERROR: { /* Real message */
97  msg_cnt++;
98  msg_bytes += message->len();
99  VLOG(1) << "Read msg at offset " << message->offset();
100  RdKafka::MessageTimestamp ts;
101  ts = message->timestamp();
102  if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
103  std::string tsname = "?";
104  if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
105  tsname = "create time";
106  } else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
107  tsname = "log append time";
108  }
109  VLOG(1) << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
110  }
111 
112  std::vector<char> buffer(message->len() + 2);
113  sprintf(buffer.data(),
114  "%.*s\n",
115  static_cast<int>(message->len()),
116  static_cast<const char*>(message->payload()));
117  VLOG(1) << "Full Message received is :'" << buffer.data() << "'";
118 
119  char field[MAX_FIELD_LEN];
120  size_t field_i = 0;
121 
122  bool backEscape = false;
123 
124  auto row_desc = row_loader.get_row_descriptor();
125 
126  std::vector<
127  const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
128  xforms(row_desc.size());
129  for (size_t i = 0; i < row_desc.size(); i++) {
130  auto it = transformations.find(row_desc[i].col_name);
131  if (it != transformations.end()) {
132  xforms[i] = &(it->second);
133  } else {
134  xforms[i] = nullptr;
135  }
136  }
137 
138  std::vector<TStringValue>
139  row; // used to store each row as we move through the stream
140 
141  for (auto iit : buffer) {
142  if (iit == copy_params.delimiter || iit == copy_params.line_delim) {
143  bool end_of_field = (iit == copy_params.delimiter);
144  bool end_of_row;
145  if (end_of_field) {
146  end_of_row = false;
147  } else {
148  end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
149  (row.size() == row_desc.size() - 1);
150  if (!end_of_row) {
151  size_t l = copy_params.null_str.size();
152  if (field_i >= l &&
153  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
154  end_of_row = true;
155  }
156  }
157  }
158  if (!end_of_field && !end_of_row) {
159  // not enough columns yet and it is a string column
160  // treat the line delimiter as part of the string
161  field[field_i++] = iit;
162  } else {
163  field[field_i] = '\0';
164  field_i = 0;
165  TStringValue ts;
166  ts.str_val = std::string(field);
167  ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
168  auto xform = row.size() < row_desc.size() ? xforms[row.size()] : nullptr;
169  if (!ts.is_null && xform != nullptr) {
170  if (print_transformation) {
171  std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
172  }
173  ts.str_val =
174  boost::regex_replace(ts.str_val, *xform->first, *xform->second);
175  if (ts.str_val.empty()) {
176  ts.is_null = true;
177  }
178  if (print_transformation) {
179  std::cout << ts.str_val << std::endl;
180  }
181  }
182 
183  row.push_back(ts); // add column value to row
184  if (end_of_row || (row.size() > row_desc.size())) {
185  break; // found row
186  }
187  }
188  } else {
189  if (iit == '\\') {
190  backEscape = true;
191  } else if (backEscape || !remove_quotes || iit != '\"') {
192  field[field_i++] = iit;
193  backEscape = false;
194  }
195  // else if unescaped double-quote, continue without adding the
196  // character to the field string.
197  }
198  if (field_i >= MAX_FIELD_LEN) {
199  field[MAX_FIELD_LEN - 1] = '\0';
200  std::cerr << "String too long for buffer." << std::endl;
201  if (print_error_data) {
202  std::cerr << field << std::endl;
203  }
204  field_i = 0;
205  break;
206  }
207  }
208  if (row.size() == row_desc.size()) {
209  // add the new data in the column format
210  bool record_loaded = row_loader.convert_string_to_column(row, copy_params);
211  if (!record_loaded) {
212  // record could not be parsed correctly consider it skipped
213  return false;
214  } else {
215  return true;
216  }
217  } else {
218  if (print_error_data) {
219  std::cerr << "Incorrect number of columns for row: ";
220  std::cerr << row_loader.print_row_with_delim(row, copy_params) << std::endl;
221  return false;
222  }
223  }
224  }
225 
226  case RdKafka::ERR__PARTITION_EOF:
227  /* Last message */
228  if (exit_eof && ++eof_cnt == partition_cnt) {
229  LOG(ERROR) << "%% EOF reached for all " << partition_cnt << " partition(s)";
230  run = false;
231  }
232  break;
233 
234  case RdKafka::ERR__UNKNOWN_TOPIC:
235  case RdKafka::ERR__UNKNOWN_PARTITION:
236  LOG(ERROR) << "Consume failed: " << message->errstr() << std::endl;
237  run = false;
238  break;
239 
240  default:
241  /* Errors */
242  LOG(ERROR) << "Consume failed: " << message->errstr();
243  run = false;
244  }
245  return false;
246 };
static int eof_cnt
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
#define LOG(tag)
Definition: Logger.h:285
static long msg_cnt
static int partition_cnt
#define MAX_FIELD_LEN
bool print_transformation
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
bool print_error_data
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
static bool exit_eof
static bool run
static int64_t msg_bytes
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Variable Documentation

int eof_cnt = 0
static

Definition at line 51 of file KafkaImporter.cpp.

Referenced by msg_consume(), and RebalanceCb::rebalance_cb().

bool exit_eof = false
static

Definition at line 50 of file KafkaImporter.cpp.

Referenced by msg_consume().

int64_t msg_bytes = 0
static

Definition at line 54 of file KafkaImporter.cpp.

Referenced by kafka_insert(), and msg_consume().

long msg_cnt = 0
static

Definition at line 53 of file KafkaImporter.cpp.

Referenced by kafka_insert(), and msg_consume().

int partition_cnt = 0
static

Definition at line 52 of file KafkaImporter.cpp.

Referenced by msg_consume(), and RebalanceCb::rebalance_cb().

bool print_error_data = false

Definition at line 46 of file KafkaImporter.cpp.

Referenced by main(), msg_consume(), and stream_insert().

bool print_transformation = false

Definition at line 47 of file KafkaImporter.cpp.

Referenced by main(), msg_consume(), and stream_insert().