25 #include <boost/algorithm/string.hpp>
26 #include <boost/algorithm/string/trim.hpp>
27 #include <boost/regex.hpp>
41 #include <boost/program_options.hpp>
43 #include <librdkafka/rdkafkacpp.h>
// Fixed size limit of 20000 for a single field.
// NOTE(review): presumably the capacity of the per-field parse buffer whose
// overflow is reported as "String too long for buffer." -- the use site is not
// visible in this excerpt; confirm before changing the value.
45 #define MAX_FIELD_LEN 20000
// File-scope flag, initialised to true.
// NOTE(review): presumably the keep-running switch for the consume loop -- the
// reads/writes of this flag are not visible in this excerpt; confirm.
50 static bool run =
true;
59 static void part_list_print(
const std::vector<RdKafka::TopicPartition*>& partitions) {
60 for (
unsigned int i = 0;
i < partitions.size();
i++) {
61 LOG(
INFO) <<
"\t" << partitions[
i]->topic() <<
"[" << partitions[
i]->partition()
68 RdKafka::ErrorCode err,
69 std::vector<RdKafka::TopicPartition*>& partitions)
override {
70 LOG(
INFO) <<
"RebalanceCb: " << RdKafka::err2str(err) <<
": ";
74 if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
75 consumer->assign(partitions);
88 const std::map<std::string,
89 std::pair<std::unique_ptr<boost::regex>,
90 std::unique_ptr<std::string>>>& transformations,
91 const bool remove_quotes) {
92 switch (message->err()) {
93 case RdKafka::ERR__TIMED_OUT:
94 VLOG(1) <<
" Timed out";
97 case RdKafka::ERR_NO_ERROR: {
100 VLOG(1) <<
"Read msg at offset " << message->offset();
101 RdKafka::MessageTimestamp ts;
102 ts = message->timestamp();
103 if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
104 std::string tsname =
"?";
105 if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
106 tsname =
"create time";
107 }
else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
108 tsname =
"log append time";
110 VLOG(1) <<
"Timestamp: " << tsname <<
" " << ts.timestamp << std::endl;
113 std::vector<char> buffer(message->len() + 1);
114 sprintf(buffer.data(),
116 static_cast<int>(message->len()),
117 static_cast<const char*>(message->payload()));
118 VLOG(1) <<
"Full Message received is :'" << buffer.data() <<
"'";
123 bool backEscape =
false;
128 const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
129 xforms(row_desc.size());
130 for (
size_t i = 0;
i < row_desc.size();
i++) {
131 auto it = transformations.find(row_desc[
i].col_name);
132 if (it != transformations.end()) {
133 xforms[
i] = &(it->second);
139 std::vector<TStringValue>
142 for (
auto iit : buffer) {
144 bool end_of_field = (iit == copy_params.
delimiter);
149 end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
150 (row.size() == row_desc.size() - 1);
152 size_t l = copy_params.
null_str.size();
154 strncmp(field + field_i - l, copy_params.
null_str.c_str(), l) == 0) {
159 if (!end_of_field && !end_of_row) {
162 field[field_i++] = iit;
164 field[field_i] =
'\0';
167 ts.str_val = std::string(field);
168 ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.
null_str);
169 auto xform = row.size() < row_desc.size() ? xforms[row.size()] :
nullptr;
170 if (!ts.is_null && xform !=
nullptr) {
172 std::cout <<
"\ntransforming\n" << ts.str_val <<
"\nto\n";
175 boost::regex_replace(ts.str_val, *xform->first, *xform->second);
176 if (ts.str_val.empty()) {
180 std::cout << ts.str_val << std::endl;
185 if (end_of_row || (row.size() > row_desc.size())) {
192 }
else if (backEscape || !remove_quotes || iit !=
'\"') {
193 field[field_i++] = iit;
201 std::cerr <<
"String too long for buffer." << std::endl;
203 std::cerr << field << std::endl;
209 if (row.size() == row_desc.size()) {
212 if (!record_loaded) {
220 std::cerr <<
"Incorrect number of columns for row: ";
227 case RdKafka::ERR__PARTITION_EOF:
235 case RdKafka::ERR__UNKNOWN_TOPIC:
236 case RdKafka::ERR__UNKNOWN_PARTITION:
237 LOG(
ERROR) <<
"Consume failed: " << message->errstr() << std::endl;
243 LOG(
ERROR) <<
"Consume failed: " << message->errstr();
251 void consume_cb(RdKafka::Message& msg,
void* opaque)
override {
260 switch (event.type()) {
261 case RdKafka::Event::EVENT_ERROR:
262 LOG(
ERROR) <<
"ERROR (" << RdKafka::err2str(event.err()) <<
"): " << event.str();
263 if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
264 LOG(
ERROR) <<
"All brokers are down, we may need special handling here";
269 case RdKafka::Event::EVENT_STATS:
270 VLOG(2) <<
"\"STATS\": " <<
event.str();
273 case RdKafka::Event::EVENT_LOG:
274 LOG(
INFO) <<
"LOG-" <<
event.severity() <<
"-" <<
event.fac().c_str() <<
":"
275 <<
event.str().c_str();
278 case RdKafka::Event::EVENT_THROTTLE:
279 LOG(
INFO) <<
"THROTTLED: " <<
event.throttle_time() <<
"ms by "
280 <<
event.broker_name() <<
" id " << (int)event.broker_id();
284 LOG(
INFO) <<
"EVENT " <<
event.type() <<
" (" << RdKafka::err2str(event.err())
285 <<
"): " << event.str();
294 const std::map<std::string,
295 std::pair<std::unique_ptr<boost::regex>,
296 std::unique_ptr<std::string>>>& transformations,
298 const bool remove_quotes,
299 std::string group_id,
301 std::string brokers) {
303 std::string topic_str;
306 std::vector<std::string> topics;
307 bool do_conf_dump =
false;
318 conf->set(
"rebalance_cb", &ex_rebalance_cb, errstr);
320 if (conf->set(
"group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
321 LOG(
FATAL) <<
"could not set group.id " << errstr;
324 if (conf->set(
"compression.codec",
"none", errstr) !=
325 RdKafka::Conf::CONF_OK) {
329 if (conf->set(
"statistics.interval.ms",
"1000", errstr) != RdKafka::Conf::CONF_OK) {
332 if (conf->set(
"enable.auto.commit",
"false", errstr) != RdKafka::Conf::CONF_OK) {
336 if (tconf->set(
"auto.offset.reset",
"earliest", errstr) != RdKafka::Conf::CONF_OK) {
340 if (tconf->set(
"enable.auto.commit",
"false", errstr) != RdKafka::Conf::CONF_OK) {
346 topics.push_back(topic);
348 LOG(
INFO) <<
"Version " << RdKafka::version_str().c_str();
350 LOG(
INFO) << RdKafka::get_debug_contexts().c_str();
352 conf->set(
"metadata.broker.list", brokers, errstr);
355 if (!debug.empty()) {
356 if (conf->set(
"debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
364 if (conf->set(
"consume_cb", &consume_cb, errstr) != RdKafka::Conf::CONF_OK) {
372 if (conf->set(
"event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
376 if (conf->set(
"default_topic_conf", tconf, errstr) != RdKafka::Conf::CONF_OK) {
383 for (pass = 0; pass < 2; pass++) {
384 std::list<std::string>* dump;
387 LOG(
INFO) <<
"# Global config";
388 LOG(
INFO) <<
"===============";
390 dump = tconf->dump();
392 LOG(
INFO) <<
"===============";
395 for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
396 std::string ts = *it;
398 LOG(
INFO) << ts <<
" = " << *it;
401 LOG(
INFO) <<
"Dump config finished";
404 LOG(
INFO) <<
"FULL Dump config finished";
413 LOG(
ERROR) <<
"Failed to create consumer: " << errstr;
418 LOG(
INFO) <<
" Created consumer " << consumer->name();
423 RdKafka::ErrorCode err = consumer->subscribe(topics);
425 LOG(
FATAL) <<
"Failed to subscribe to " << topics.size()
426 <<
" topics: " << RdKafka::err2str(err);
432 size_t recv_rows = 0;
436 RdKafka::Message* msg = consumer->consume(10000);
437 if (msg->err() == RdKafka::ERR_NO_ERROR) {
440 msg_consume(msg, row_loader, copy_params, transformations, remove_quotes);
445 row_loader.
do_load(rows_loaded, skipped, copy_params);
448 consumer->commitSync();
466 LOG(
FATAL) <<
"Consumer shut down, probably due to an error please review logs";
477 int main(
int argc,
char** argv) {
478 std::string server_host(
"localhost");
482 bool skip_host_verify =
false;
483 std::string ca_cert_name{
""};
484 std::string table_name;
486 std::string user_name;
488 std::string group_id;
491 std::string delim_str(
","), nulls(
"\\N"), line_delim_str(
"\n"), quoted(
"false");
492 size_t batch_size = 10000;
493 size_t retry_count = 10;
494 size_t retry_wait = 5;
495 bool remove_quotes =
false;
496 std::vector<std::string> xforms;
497 std::map<std::string,
498 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
502 namespace po = boost::program_options;
504 po::options_description desc(
"Options");
505 desc.add_options()(
"help,h",
"Print help messages ");
507 "table", po::value<std::string>(&table_name)->
required(),
"Table Name");
509 "database", po::value<std::string>(&db_name)->
required(),
"Database Name");
511 "user,u", po::value<std::string>(&user_name)->
required(),
"User Name");
513 "passwd,p", po::value<std::string>(&passwd)->
required(),
"User Password");
514 desc.add_options()(
"host",
515 po::value<std::string>(&server_host)->default_value(server_host),
516 "OmniSci Server Hostname");
518 "port", po::value<int>(&port)->default_value(port),
"OmniSci Server Port Number");
519 desc.add_options()(
"http",
520 po::bool_switch(&http)->default_value(http)->implicit_value(
true),
521 "Use HTTP transport");
522 desc.add_options()(
"https",
523 po::bool_switch(&https)->default_value(https)->implicit_value(
true),
524 "Use HTTPS transport");
525 desc.add_options()(
"skip-verify",
526 po::bool_switch(&skip_host_verify)
527 ->default_value(skip_host_verify)
528 ->implicit_value(
true),
529 "Don't verify SSL certificate validity");
532 po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
533 "Path to trusted server certificate. Initiates an encrypted connection");
534 desc.add_options()(
"delim",
535 po::value<std::string>(&delim_str)->default_value(delim_str),
537 desc.add_options()(
"null", po::value<std::string>(&nulls),
"NULL string");
538 desc.add_options()(
"line", po::value<std::string>(&line_delim_str),
"Line delimiter");
541 po::value<std::string>("ed),
542 "Whether the source contains quoted fields (true/false, default false)");
543 desc.add_options()(
"batch",
544 po::value<size_t>(&batch_size)->default_value(batch_size),
545 "Insert batch size");
546 desc.add_options()(
"retry_count",
547 po::value<size_t>(&retry_count)->default_value(retry_count),
548 "Number of time to retry an insert");
549 desc.add_options()(
"retry_wait",
550 po::value<size_t>(&retry_wait)->default_value(retry_wait),
551 "wait in secs between retries");
552 desc.add_options()(
"transform,t",
553 po::value<std::vector<std::string>>(&xforms)->multitoken(),
554 "Column Transformations");
555 desc.add_options()(
"print_error",
"Print Error Rows");
556 desc.add_options()(
"print_transform",
"Print Transformations");
557 desc.add_options()(
"topic",
558 po::value<std::string>(&topic)->
required(),
559 "Kafka topic to consume from ");
560 desc.add_options()(
"group-id",
561 po::value<std::string>(&group_id)->
required(),
562 "Group id this consumer is part of");
563 desc.add_options()(
"brokers",
564 po::value<std::string>(&brokers)->
required(),
565 "list of kafka brokers for topic");
567 po::positional_options_description positionalOptions;
568 positionalOptions.add(
"table", 1);
569 positionalOptions.add(
"database", 1);
575 po::variables_map vm;
578 po::store(po::command_line_parser(argc, argv)
580 .positional(positionalOptions)
583 if (vm.count(
"help")) {
584 std::cout <<
"Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
585 "<password> [{--host} "
586 "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
587 "string>][--line <line "
588 "delimiter>][--batch <batch size>][{-t|--transform} transformation "
589 "[--quoted <true|false>] "
590 "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
591 "secs>][--print_error][--print_transform]\n\n";
592 std::cout << desc << std::endl;
595 if (vm.count(
"print_error")) {
598 if (vm.count(
"print_transform")) {
603 }
catch (boost::program_options::error& e) {
604 std::cerr <<
"Usage Error: " << e.what() << std::endl;
614 }
else if (!ca_cert_name.empty()) {
620 char delim = delim_str[0];
622 if (delim_str.size() < 2 ||
623 (delim_str[1] !=
'x' && delim_str[1] !=
't' && delim_str[1] !=
'n')) {
624 std::cerr <<
"Incorrect delimiter string: " << delim_str << std::endl;
627 if (delim_str[1] ==
't') {
629 }
else if (delim_str[1] ==
'n') {
632 std::string
d(delim_str);
634 delim = (char)std::stoi(d,
nullptr, 16);
637 if (isprint(delim)) {
638 std::cout <<
"Field Delimiter: " << delim << std::endl;
639 }
else if (delim ==
'\t') {
640 std::cout <<
"Field Delimiter: "
641 <<
"\\t" << std::endl;
642 }
else if (delim ==
'\n') {
643 std::cout <<
"Field Delimiter: "
647 std::cout <<
"Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
649 char line_delim = line_delim_str[0];
650 if (line_delim ==
'\\') {
651 if (line_delim_str.size() < 2 ||
652 (line_delim_str[1] !=
'x' && line_delim_str[1] !=
't' &&
653 line_delim_str[1] !=
'n')) {
654 std::cerr <<
"Incorrect delimiter string: " << line_delim_str << std::endl;
657 if (line_delim_str[1] ==
't') {
659 }
else if (line_delim_str[1] ==
'n') {
662 std::string
d(line_delim_str);
664 line_delim = (char)std::stoi(d,
nullptr, 16);
667 if (isprint(line_delim)) {
668 std::cout <<
"Line Delimiter: " << line_delim << std::endl;
669 }
else if (line_delim ==
'\t') {
670 std::cout <<
"Line Delimiter: "
671 <<
"\\t" << std::endl;
672 }
else if (line_delim ==
'\n') {
673 std::cout <<
"Line Delimiter: "
677 std::cout <<
"Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
679 std::cout <<
"Null String: " << nulls << std::endl;
680 std::cout <<
"Insert Batch Size: " << std::dec << batch_size << std::endl;
682 if (quoted ==
"true") {
683 remove_quotes =
true;
686 for (
auto&
t : xforms) {
687 auto n =
t.find_first_of(
':');
688 if (n == std::string::npos) {
689 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
693 std::string col_name =
t.substr(0, n);
694 if (
t.size() < n + 3 ||
t[n + 1] !=
's' ||
t[n + 2] !=
'/') {
695 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
700 auto n2 =
t.find_first_of(
'/', n1);
701 if (n2 == std::string::npos) {
702 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
706 std::string regex_str =
t.substr(n1, n2 - n1);
708 n2 =
t.find_first_of(
'/', n1);
709 if (n2 == std::string::npos) {
710 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
714 std::string fmt_str =
t.substr(n1, n2 - n1);
715 std::cout <<
"transform " << col_name <<
": s/" << regex_str <<
"/" << fmt_str <<
"/"
717 transformations[col_name] =
718 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
719 std::unique_ptr<boost::regex>(
new boost::regex(regex_str)),
720 std::unique_ptr<std::string>(
new std::string(fmt_str)));
724 delim, nulls, line_delim, batch_size, retry_count, retry_wait);
727 server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
734 row_loader, transformations, copy_params, remove_quotes, group_id, topic, brokers);
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void consume_cb(RdKafka::Message &msg, void *opaque) override
Constants for Builtin SQL Types supported by OmniSci.
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
bool print_transformation
boost::program_options::options_description const & get_options() const
stuff(RowToColumnLoader &rl, import_export::CopyParams &cp)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
bool msg_consume(RdKafka::Message *message, RowToColumnLoader &row_loader, import_export::CopyParams copy_params, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const bool remove_quotes)
void init(LogOptions const &log_opts)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void kafka_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes, std::string group_id, std::string topic, std::string brokers)
RowToColumnLoader row_loader
void event_cb(RdKafka::Event &event) override
static void part_list_print(const std::vector< RdKafka::TopicPartition * > &partitions)
import_export::CopyParams copy_params
void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< RdKafka::TopicPartition * > &partitions) override