30 using namespace ::apache::thrift;
33 switch (ct.col_type.type) {
34 case TDatumType::BIGINT:
36 case TDatumType::BOOL:
38 case TDatumType::DATE:
40 case TDatumType::DECIMAL:
42 case TDatumType::DOUBLE:
44 case TDatumType::FLOAT:
50 if (ct.col_type.precision == 0) {
55 case TDatumType::TIME:
57 case TDatumType::TIMESTAMP:
59 case TDatumType::SMALLINT:
61 case TDatumType::TINYINT:
63 case TDatumType::POINT:
65 case TDatumType::LINESTRING:
67 case TDatumType::POLYGON:
69 case TDatumType::MULTIPOLYGON:
72 LOG(
FATAL) <<
"Unsupported TColumnType found, should not be possible";
78 if (ct.col_type.is_array) {
80 ct.col_type.precision,
92 ct.col_type.precision,
105 ct.col_type.precision,
107 ct.col_type.nullable,
114 std::vector<TStringValue> row,
116 std::ostringstream out;
118 for (TStringValue ts : row) {
132 std::vector<SQLTypeInfo> column_type_info_vector,
133 std::vector<TColumn>& input_col_vec) {
134 for (
size_t idx = 0; idx < failed_column; idx++) {
135 switch (column_type_info_vector[idx].get_type()) {
137 input_col_vec[idx].nulls.pop_back();
138 input_col_vec[idx].data.arr_col.pop_back();
143 input_col_vec[idx].nulls.pop_back();
144 input_col_vec[idx].data.str_col.pop_back();
156 input_col_vec[idx].nulls.pop_back();
157 input_col_vec[idx].data.int_col.pop_back();
161 input_col_vec[idx].nulls.pop_back();
162 input_col_vec[idx].data.real_col.pop_back();
168 input_col_vec[idx].nulls.pop_back();
169 input_col_vec[idx].data.str_col.pop_back();
172 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
182 switch (column_type_info.
get_type()) {
184 LOG(
FATAL) <<
"Trying to process ARRAY at item level something is wrong";
194 input_col.nulls.push_back(
true);
195 input_col.data.str_col.emplace_back(
"");
198 input_col.nulls.push_back(
false);
199 switch (column_type_info.
get_type()) {
202 input_col.data.str_col.push_back(
210 input_col.data.str_col.push_back(ts.str_val);
213 LOG(
FATAL) <<
" trying to process a STRING transport type not handled "
229 input_col.nulls.push_back(
true);
230 input_col.data.int_col.push_back(0);
232 input_col.nulls.push_back(
false);
234 switch (column_type_info.
get_type()) {
237 input_col.data.int_col.push_back(d.
intval);
242 input_col.data.int_col.push_back(d.
bigintval);
248 input_col.data.int_col.push_back(d.
tinyintval);
253 input_col.data.int_col.push_back(d.
bigintval);
256 LOG(
FATAL) <<
" trying to process an INT transport type not handled "
264 input_col.nulls.push_back(
true);
265 input_col.data.real_col.push_back(0);
268 input_col.nulls.push_back(
false);
270 switch (column_type_info.
get_type()) {
272 input_col.data.real_col.push_back(d.
floatval);
275 input_col.data.real_col.push_back(d.
doubleval);
278 LOG(
FATAL) <<
" trying to process a REAL transport type not handled "
284 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
293 std::vector<TStringValue> row,
297 for (TStringValue ts : row) {
301 std::vector<std::string> arr_ele;
303 ts.str_val, copy_params, arr_ele);
305 for (std::string item : arr_ele) {
306 boost::algorithm::trim(item);
309 tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.
null_str);
323 }
catch (
const std::exception& e) {
326 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
327 <<
". Row discarded, issue at column : " << (curr_col + 1)
337 const std::string& user_name,
338 const std::string& passwd,
339 const std::string& db_name,
340 const std::string& table_name)
341 : user_name_(user_name)
344 , table_name_(table_name)
345 , conn_details_(conn_details) {
348 TTableDetails table_details;
360 for (TColumnType ct : row_desc_) {
365 for (TColumnType column : row_desc_) {
379 }
catch (TOmniSciException& e) {
380 std::cerr << e.error_msg << std::endl;
381 }
catch (TException& te) {
382 std::cerr <<
"Thrift error on connect: " << te.what() << std::endl;
389 }
catch (TOmniSciException& e) {
390 std::cerr << e.error_msg << std::endl;
391 }
catch (TException& te) {
392 std::cerr <<
"Thrift error on close: " << te.what() << std::endl;
399 std::cout <<
" Waiting " << copy_params.
retry_wait
400 <<
" secs to retry Inserts , will try " << (copy_params.
retry_count - tries)
401 <<
" times more " << std::endl;
411 for (
size_t tries = 0; tries < copy_params.
retry_count;
417 std::cout << nrows <<
" Rows Inserted, " << nskipped <<
" rows skipped."
427 }
catch (TOmniSciException& e) {
428 std::cerr <<
"Exception trying to insert data " << e.error_msg << std::endl;
430 }
catch (TException& te) {
431 std::cerr <<
"Exception trying to insert data " << te.what() << std::endl;
435 std::cerr <<
"Retries exhausted program terminated" << std::endl;
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
void wait_disconnet_reconnnect_retry(size_t tries, import_export::CopyParams copy_params)
std::vector< SQLTypeInfo > column_type_info_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams ©_params)
std::vector< SQLTypeInfo > array_column_type_info_
mapd::shared_ptr< TProtocol > get_protocol()
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
Datum StringToDatum(std::string_view s, SQLTypeInfo &ti)
int get_precision() const
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
void parse_string_array(const std::string &s, const import_export::CopyParams ©_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.
mapd::shared_ptr< OmniSciClient > client_