OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, import_export::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnect_reconnect_retry (size_t tries, import_export::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
std::shared_ptr< HeavyClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 54 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 337 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, and table_name_.

342  : user_name_(user_name)
343  , passwd_(passwd)
344  , db_name_(db_name)
345  , table_name_(table_name)
346  , conn_details_(conn_details) {
348 
349  TTableDetails table_details;
350  client_->get_table_details(table_details, session_, table_name_);
351 
352  row_desc_ = table_details.row_desc;
353 
354  // create vector with column details
355  for (TColumnType ct : row_desc_) {
357  }
358 
359  // create vector with array column details presented as real column for easier resue
360  // of othe code
361  for (TColumnType ct : row_desc_) {
363  }
364 
365  // create vector for storage of the actual column data
366  for (TColumnType column : row_desc_) {
367  TColumn t;
368  input_columns_.push_back(t);
369  }
370 }
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
std::shared_ptr< HeavyClient > client_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 371 of file RowToColumnLoader.cpp.

References closeConnection().

371  {
372  closeConnection();
373 }

+ Here is the call graph for this function:

Member Function Documentation

void RowToColumnLoader::closeConnection ( )
private

Definition at line 387 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnect_reconnect_retry(), and ~RowToColumnLoader().

387  {
388  try {
389  client_->disconnect(session_); // disconnect from heavydb
390  } catch (TDBException& e) {
391  std::cerr << e.error_msg << std::endl;
392  } catch (TException& te) {
393  std::cerr << "Thrift error on close: " << te.what() << std::endl;
394  }
395 }
std::shared_ptr< HeavyClient > client_

+ Here is the caller graph for this function:

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 293 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, import_export::CopyParams::null_str, import_export::delimited_parser::parse_string_array(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

295  {
296  // create datum and push data to column structure from row data
297  uint64_t curr_col = 0;
298  for (TStringValue ts : row) {
299  try {
300  switch (column_type_info_[curr_col].get_type()) {
301  case SQLTypes::kARRAY: {
302  std::vector<std::string> arr_ele;
304  ts.str_val, copy_params, arr_ele);
305  TColumn array_tcol;
306  for (std::string item : arr_ele) {
307  boost::algorithm::trim(item);
308  TStringValue tsa;
309  tsa.str_val = item;
310  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
311  // now put into TColumn
313  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
314  }
315  input_columns_[curr_col].nulls.push_back(false);
316  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
317 
318  break;
319  }
320  default:
322  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
323  }
324  } catch (const std::exception& e) {
326  // import_status.rows_rejected++;
327  LOG(ERROR) << "Input exception thrown: " << e.what()
328  << ". Row discarded, issue at column : " << (curr_col + 1)
329  << " data :" << print_row_with_delim(row, copy_params);
330  return false;
331  }
332  curr_col++;
333  }
334  return true;
335 }
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:216
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 375 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnect_reconnect_retry().

375  {
376  client_.reset(new HeavyClient(conn_details_.get_protocol()));
377 
378  try {
380  } catch (TDBException& e) {
381  std::cerr << e.error_msg << std::endl;
382  } catch (TException& te) {
383  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
384  }
385 }
std::string user_name_
std::shared_ptr< HeavyClient > client_
std::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
import_export::CopyParams  copy_params 
)

Definition at line 409 of file RowToColumnLoader.cpp.

References client_, input_columns_, import_export::CopyParams::retry_count, row_desc_, session_, table_name_, and wait_disconnect_reconnect_retry().

Referenced by kafka_insert(), and stream_insert().

411  {
412  for (size_t tries = 0; tries < copy_params.retry_count;
413  tries++) { // allow for retries in case of insert failure
414  try {
415  client_->load_table_binary_columnar(session_, table_name_, input_columns_, {});
416  // client->load_table(session, table_name, input_rows);
417  nrows += input_columns_[0].nulls.size();
418  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
419  << std::endl;
420  // we successfully loaded the data, lets move on
421  input_columns_.clear();
422  // create vector for storage of the actual column data
423  for (TColumnType column : row_desc_) {
424  TColumn t;
425  input_columns_.push_back(t);
426  }
427  return;
428  } catch (TDBException& e) {
429  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
430  exit(2);
431  } catch (TException& te) {
432  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
433  wait_disconnect_reconnect_retry(tries, copy_params);
434  }
435  }
436  std::cerr << "Retries exhausted program terminated" << std::endl;
437  exit(1);
438 }
std::string table_name_
std::shared_ptr< HeavyClient > client_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 289 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

289  {
290  return row_desc_;
291 };
TRowDescriptor row_desc_

+ Here is the caller graph for this function:

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 114 of file RowToColumnLoader.cpp.

References import_export::CopyParams::delimiter.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

116  {
117  std::ostringstream out;
118  bool first = true;
119  for (TStringValue ts : row) {
120  if (first) {
121  first = false;
122  } else {
123  out << copy_params.delimiter;
124  }
125  out << ts.str_val;
126  }
127  return out.str();
128 }

+ Here is the caller graph for this function:

void RowToColumnLoader::wait_disconnect_reconnect_retry ( size_t  tries,
import_export::CopyParams  copy_params 
)
private

Definition at line 397 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), import_export::CopyParams::retry_count, and import_export::CopyParams::retry_wait.

Referenced by do_load().

399  {
400  std::cout << " Waiting " << copy_params.retry_wait
401  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
402  << " times more " << std::endl;
403  std::this_thread::sleep_for(std::chrono::seconds(copy_params.retry_wait));
404 
405  closeConnection();
407 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 78 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

std::shared_ptr<HeavyClient> RowToColumnLoader::client_
private

Definition at line 82 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 77 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

ThriftClientConnection RowToColumnLoader::conn_details_
private
std::string RowToColumnLoader::db_name_
private

Definition at line 72 of file RowToColumnLoader.h.

Referenced by createConnection().

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 76 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::passwd_
private

Definition at line 71 of file RowToColumnLoader.h.

Referenced by createConnection().

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 80 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

TSessionId RowToColumnLoader::session_
private

Definition at line 83 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::table_name_
private

Definition at line 73 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::user_name_
private

Definition at line 70 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: