25 #include <boost/algorithm/string.hpp>
26 #include <boost/algorithm/string/trim.hpp>
41 #include <boost/program_options.hpp>
43 #define MAX_FIELD_LEN 20000
52 const std::map<std::string,
53 std::pair<std::unique_ptr<boost::regex>,
54 std::unique_ptr<std::string>>>& transformations,
56 const bool remove_quotes) {
57 std::ios_base::sync_with_stdio(
false);
58 std::istream_iterator<char> eos;
59 std::cin >> std::noskipws;
60 std::istream_iterator<char> iit(std::cin);
67 bool backEscape =
false;
72 const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
73 xforms(row_desc.size());
74 for (
size_t i = 0; i < row_desc.size(); i++) {
75 auto it = transformations.find(row_desc[i].col_name);
76 if (it != transformations.end()) {
77 xforms[i] = &(it->second);
83 std::vector<TStringValue> row;
90 bool end_of_field = (*iit == copy_params.
delimiter);
95 end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
96 (row.size() == row_desc.size() - 1);
98 size_t l = copy_params.
null_str.size();
100 strncmp(field + field_i - l, copy_params.
null_str.c_str(), l) == 0) {
105 if (!end_of_field && !end_of_row) {
108 field[field_i++] = *iit;
110 field[field_i] =
'\0';
113 ts.str_val = std::string(field);
114 ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.
null_str);
115 auto xform = row.size() < row_desc.size() ? xforms[row.size()] :
nullptr;
116 if (!ts.is_null && xform !=
nullptr) {
118 std::cout <<
"\ntransforming\n" << ts.str_val <<
"\nto\n";
120 ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
121 if (ts.str_val.empty()) {
125 std::cout << ts.str_val << std::endl;
130 if (end_of_row || (row.size() > row_desc.size())) {
137 }
else if (backEscape || !remove_quotes || *iit !=
'\"') {
138 field[field_i++] = *iit;
146 std::cerr <<
"String too long for buffer." << std::endl;
148 std::cerr << field << std::endl;
155 if (row.size() == row_desc.size()) {
159 if (!record_loaded) {
164 if (read_rows % copy_params.
batch_size == 0) {
165 row_loader.
do_load(nrows, nskipped, copy_params);
170 std::cerr <<
"Incorrect number of columns for row: ";
173 if (row.size() > row_desc.size()) {
184 if (read_rows % copy_params.
batch_size != 0) {
185 LOG(
INFO) <<
" read_rows " << read_rows;
186 row_loader.
do_load(nrows, nskipped, copy_params);
190 int main(
int argc,
char** argv) {
191 std::string server_host(
"localhost");
195 bool skip_host_verify =
false;
196 std::string ca_cert_name{
""};
197 std::string table_name;
199 std::string user_name;
201 std::string delim_str(
","), nulls(
"\\N"), line_delim_str(
"\n"), quoted(
"false");
202 size_t batch_size = 10000;
203 size_t retry_count = 10;
204 size_t retry_wait = 5;
205 bool remove_quotes =
false;
206 std::vector<std::string> xforms;
207 std::map<std::string,
208 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
212 namespace po = boost::program_options;
214 po::options_description desc(
"Options");
215 desc.add_options()(
"help,h",
"Print help messages ");
217 "table", po::value<std::string>(&table_name)->
required(),
"Table Name");
219 "database", po::value<std::string>(&db_name)->
required(),
"Database Name");
221 "user,u", po::value<std::string>(&user_name)->
required(),
"User Name");
223 "passwd,p", po::value<std::string>(&passwd)->
required(),
"User Password");
224 desc.add_options()(
"host",
225 po::value<std::string>(&server_host)->default_value(server_host),
226 "HeavyDB Server Hostname");
228 "port", po::value<int>(&port)->default_value(port),
"HeavyDB Server Port Number");
229 desc.add_options()(
"http",
230 po::bool_switch(&http)->default_value(http)->implicit_value(
true),
231 "Use HTTP transport");
232 desc.add_options()(
"https",
233 po::bool_switch(&https)->default_value(https)->implicit_value(
true),
234 "Use HTTPS transport");
235 desc.add_options()(
"skip-verify",
236 po::bool_switch(&skip_host_verify)
237 ->default_value(skip_host_verify)
238 ->implicit_value(
true),
239 "Don't verify SSL certificate validity");
242 po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
243 "Path to trusted server certificate. Initiates an encrypted connection");
244 desc.add_options()(
"delim",
245 po::value<std::string>(&delim_str)->default_value(delim_str),
247 desc.add_options()(
"null", po::value<std::string>(&nulls),
"NULL string");
248 desc.add_options()(
"line", po::value<std::string>(&line_delim_str),
"Line delimiter");
251 po::value<std::string>(&quoted),
252 "Whether the source contains quoted fields (true/false, default false)");
253 desc.add_options()(
"batch",
254 po::value<size_t>(&batch_size)->default_value(batch_size),
255 "Insert batch size");
256 desc.add_options()(
"retry_count",
257 po::value<size_t>(&retry_count)->default_value(retry_count),
258 "Number of time to retry an insert");
259 desc.add_options()(
"retry_wait",
260 po::value<size_t>(&retry_wait)->default_value(retry_wait),
261 "wait in secs between retries");
262 desc.add_options()(
"transform,t",
263 po::value<std::vector<std::string>>(&xforms)->multitoken(),
264 "Column Transformations");
265 desc.add_options()(
"print_error",
"Print Error Rows");
266 desc.add_options()(
"print_transform",
"Print Transformations");
268 po::positional_options_description positionalOptions;
269 positionalOptions.add(
"table", 1);
270 positionalOptions.add(
"database", 1);
276 po::variables_map vm;
279 po::store(po::command_line_parser(argc, argv)
281 .positional(positionalOptions)
284 if (vm.count(
"help")) {
285 std::cout <<
"Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
286 "<password> [{--host} "
287 "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
288 "string>][--line <line "
289 "delimiter>][--batch <batch size>][{-t|--transform} transformation "
290 "[--quoted <true|false>] "
291 "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
292 "secs>][--print_error][--print_transform]\n\n";
293 std::cout << desc << std::endl;
296 if (vm.count(
"print_error")) {
299 if (vm.count(
"print_transform")) {
304 }
catch (boost::program_options::error& e) {
305 std::cerr <<
"Usage Error: " << e.what() << std::endl;
315 }
else if (!ca_cert_name.empty()) {
321 char delim = delim_str[0];
323 if (delim_str.size() < 2 ||
324 (delim_str[1] !=
'x' && delim_str[1] !=
't' && delim_str[1] !=
'n')) {
325 std::cerr <<
"Incorrect delimiter string: " << delim_str << std::endl;
328 if (delim_str[1] ==
't') {
330 }
else if (delim_str[1] ==
'n') {
333 std::string d(delim_str);
335 delim = (char)std::stoi(d,
nullptr, 16);
338 if (isprint(delim)) {
339 std::cout <<
"Field Delimiter: " << delim << std::endl;
340 }
else if (delim ==
'\t') {
341 std::cout <<
"Field Delimiter: "
342 <<
"\\t" << std::endl;
343 }
else if (delim ==
'\n') {
344 std::cout <<
"Field Delimiter: "
348 std::cout <<
"Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
350 char line_delim = line_delim_str[0];
351 if (line_delim ==
'\\') {
352 if (line_delim_str.size() < 2 ||
353 (line_delim_str[1] !=
'x' && line_delim_str[1] !=
't' &&
354 line_delim_str[1] !=
'n')) {
355 std::cerr <<
"Incorrect delimiter string: " << line_delim_str << std::endl;
358 if (line_delim_str[1] ==
't') {
360 }
else if (line_delim_str[1] ==
'n') {
363 std::string d(line_delim_str);
365 line_delim = (char)std::stoi(d,
nullptr, 16);
368 if (isprint(line_delim)) {
369 std::cout <<
"Line Delimiter: " << line_delim << std::endl;
370 }
else if (line_delim ==
'\t') {
371 std::cout <<
"Line Delimiter: "
372 <<
"\\t" << std::endl;
373 }
else if (line_delim ==
'\n') {
374 std::cout <<
"Line Delimiter: "
378 std::cout <<
"Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
380 std::cout <<
"Null String: " << nulls << std::endl;
381 std::cout <<
"Insert Batch Size: " << std::dec << batch_size << std::endl;
383 if (quoted ==
"true") {
384 remove_quotes =
true;
387 for (
auto& t : xforms) {
388 auto n = t.find_first_of(
':');
389 if (
n == std::string::npos) {
390 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
394 std::string col_name = t.substr(0,
n);
395 if (t.size() <
n + 3 || t[
n + 1] !=
's' || t[
n + 2] !=
'/') {
396 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
401 auto n2 = t.find_first_of(
'/', n1);
402 if (n2 == std::string::npos) {
403 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
407 std::string regex_str = t.substr(n1, n2 - n1);
409 n2 = t.find_first_of(
'/', n1);
410 if (n2 == std::string::npos) {
411 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
415 std::string fmt_str = t.substr(n1, n2 - n1);
416 std::cout <<
"transform " << col_name <<
": s/" << regex_str <<
"/" << fmt_str <<
"/"
418 transformations[col_name] =
419 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
420 std::unique_ptr<boost::regex>(
new boost::regex(regex_str)),
421 std::unique_ptr<std::string>(
new std::string(fmt_str)));
425 delim, nulls, line_delim, batch_size, retry_count, retry_wait);
428 server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
434 stream_insert(row_loader, transformations, copy_params, remove_quotes);
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
Constants for Builtin SQL Types supported by OmniSci.
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
bool print_transformation
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
void init(LogOptions const &log_opts)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes)
boost::program_options::options_description const & get_options() const