33 using namespace ::apache::thrift;
36 switch (ct.col_type.type) {
37 case TDatumType::BIGINT:
39 case TDatumType::BOOL:
41 case TDatumType::DATE:
43 case TDatumType::DECIMAL:
45 case TDatumType::DOUBLE:
47 case TDatumType::FLOAT:
53 if (ct.col_type.precision == 0) {
58 case TDatumType::TIME:
60 case TDatumType::TIMESTAMP:
62 case TDatumType::SMALLINT:
64 case TDatumType::TINYINT:
66 case TDatumType::POINT:
68 case TDatumType::LINESTRING:
70 case TDatumType::POLYGON:
72 case TDatumType::MULTIPOLYGON:
75 LOG(
FATAL) <<
"Unsupported TColumnType found, should not be possible";
81 if (ct.col_type.is_array) {
83 ct.col_type.precision,
95 ct.col_type.precision,
108 ct.col_type.precision,
110 ct.col_type.nullable,
117 std::vector<TStringValue> row,
119 std::ostringstream out;
121 for (TStringValue ts : row) {
135 std::vector<SQLTypeInfo> column_type_info_vector,
136 std::vector<TColumn>& input_col_vec) {
137 for (
size_t idx = 0; idx < failed_column; idx++) {
138 switch (column_type_info_vector[idx].get_type()) {
140 input_col_vec[idx].nulls.pop_back();
141 input_col_vec[idx].data.arr_col.pop_back();
146 input_col_vec[idx].nulls.pop_back();
147 input_col_vec[idx].data.str_col.pop_back();
159 input_col_vec[idx].nulls.pop_back();
160 input_col_vec[idx].data.int_col.pop_back();
164 input_col_vec[idx].nulls.pop_back();
165 input_col_vec[idx].data.real_col.pop_back();
171 input_col_vec[idx].nulls.pop_back();
172 input_col_vec[idx].data.str_col.pop_back();
175 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
185 switch (column_type_info.
get_type()) {
187 LOG(
FATAL) <<
"Trying to process ARRAY at item level something is wrong";
197 input_col.nulls.push_back(
true);
198 input_col.data.str_col.emplace_back(
"");
201 input_col.nulls.push_back(
false);
202 switch (column_type_info.
get_type()) {
205 input_col.data.str_col.push_back(
213 input_col.data.str_col.push_back(ts.str_val);
216 LOG(
FATAL) <<
" trying to process a STRING transport type not handled "
232 input_col.nulls.push_back(
true);
233 input_col.data.int_col.push_back(0);
235 input_col.nulls.push_back(
false);
237 switch (column_type_info.
get_type()) {
240 input_col.data.int_col.push_back(d.
intval);
245 input_col.data.int_col.push_back(d.
bigintval);
251 input_col.data.int_col.push_back(d.
tinyintval);
256 input_col.data.int_col.push_back(d.
bigintval);
259 LOG(
FATAL) <<
" trying to process an INT transport type not handled "
267 input_col.nulls.push_back(
true);
268 input_col.data.real_col.push_back(0);
271 input_col.nulls.push_back(
false);
273 switch (column_type_info.
get_type()) {
275 input_col.data.real_col.push_back(d.
floatval);
278 input_col.data.real_col.push_back(d.
doubleval);
281 LOG(
FATAL) <<
" trying to process a REAL transport type not handled "
287 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
296 std::vector<TStringValue> row,
299 uint64_t curr_col = 0;
300 for (TStringValue ts : row) {
304 std::vector<std::string> arr_ele;
306 ts.str_val, copy_params, arr_ele);
308 for (std::string item : arr_ele) {
309 boost::algorithm::trim(item);
312 tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.
null_str);
326 }
catch (
const std::exception& e) {
329 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
330 <<
". Row discarded, issue at column : " << (curr_col + 1)
340 const std::string& user_name,
341 const std::string& passwd,
342 const std::string& db_name,
343 const std::string& table_name)
344 : user_name_(user_name)
347 , table_name_(table_name)
348 , conn_details_(conn_details) {
351 TTableDetails table_details;
363 for (TColumnType ct : row_desc_) {
368 for (TColumnType column : row_desc_) {
382 }
catch (TDBException& e) {
383 std::cerr << e.error_msg << std::endl;
384 }
catch (TException& te) {
385 std::cerr <<
"Thrift error on connect: " << te.what() << std::endl;
392 }
catch (TDBException& e) {
393 std::cerr << e.error_msg << std::endl;
394 }
catch (TException& te) {
395 std::cerr <<
"Thrift error on close: " << te.what() << std::endl;
402 std::cout <<
" Waiting " << copy_params.
retry_wait
403 <<
" secs to retry Inserts , will try " << (copy_params.
retry_count - tries)
404 <<
" times more " << std::endl;
405 std::this_thread::sleep_for(std::chrono::seconds(copy_params.
retry_wait));
414 for (
size_t tries = 0; tries < copy_params.
retry_count;
420 std::cout << nrows <<
" Rows Inserted, " << nskipped <<
" rows skipped."
430 }
catch (TDBException& e) {
431 std::cerr <<
"Exception trying to insert data " << e.error_msg << std::endl;
433 }
catch (TException& te) {
434 std::cerr <<
"Exception trying to insert data " << te.what() << std::endl;
438 std::cerr <<
"Retries exhausted program terminated" << std::endl;
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
std::vector< SQLTypeInfo > column_type_info_
std::shared_ptr< HeavyClient > client_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams ©_params)
std::vector< SQLTypeInfo > array_column_type_info_
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
Datum StringToDatum(std::string_view s, SQLTypeInfo &ti)
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)
int get_precision() const
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
std::shared_ptr< TProtocol > get_protocol()
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
void parse_string_array(const std::string &s, const import_export::CopyParams ©_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.