OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, import_export::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnect_reconnect_retry (size_t tries, import_export::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
std::shared_ptr< HeavyClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 54 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 347 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, and table_name_.

352  : user_name_(user_name)
353  , passwd_(passwd)
354  , db_name_(db_name)
355  , table_name_(table_name)
356  , conn_details_(conn_details) {
358 
359  TTableDetails table_details;
360  client_->get_table_details(table_details, session_, table_name_);
361 
362  row_desc_ = table_details.row_desc;
363 
364  // create vector with column details
365  for (TColumnType ct : row_desc_) {
367  }
368 
369  // create vector with array column details presented as real column for easier resue
370  // of othe code
371  for (TColumnType ct : row_desc_) {
373  }
374 
375  // create vector for storage of the actual column data
376  for (TColumnType column : row_desc_) {
377  TColumn t;
378  input_columns_.push_back(t);
379  }
380 }
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
std::shared_ptr< HeavyClient > client_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 381 of file RowToColumnLoader.cpp.

References closeConnection().

381  {
382  closeConnection();
383 }

+ Here is the call graph for this function:

Member Function Documentation

void RowToColumnLoader::closeConnection ( )
private

Definition at line 397 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnect_reconnect_retry(), and ~RowToColumnLoader().

397  {
398  try {
399  client_->disconnect(session_); // disconnect from heavydb
400  } catch (TDBException& e) {
401  std::cerr << e.error_msg << std::endl;
402  } catch (TException& te) {
403  std::cerr << "Thrift error on close: " << te.what() << std::endl;
404  }
405 }
std::shared_ptr< HeavyClient > client_

+ Here is the caller graph for this function:

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 303 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, import_export::CopyParams::null_str, import_export::delimited_parser::parse_string_array(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

305  {
306  // create datum and push data to column structure from row data
307  uint64_t curr_col = 0;
308  for (TStringValue ts : row) {
309  try {
310  switch (column_type_info_[curr_col].get_type()) {
311  case SQLTypes::kARRAY: {
312  std::vector<std::string> arr_ele;
314  ts.str_val, copy_params, arr_ele);
315  TColumn array_tcol;
316  for (std::string item : arr_ele) {
317  boost::algorithm::trim(item);
318  TStringValue tsa;
319  tsa.str_val = item;
320  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
321  // now put into TColumn
323  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
324  }
325  input_columns_[curr_col].nulls.push_back(false);
326  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
327 
328  break;
329  }
330  default:
332  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
333  }
334  } catch (const std::exception& e) {
336  // import_status.rows_rejected++;
337  LOG(ERROR) << "Input exception thrown: " << e.what()
338  << ". Row discarded, issue at column : " << (curr_col + 1)
339  << " data :" << print_row_with_delim(row, copy_params);
340  return false;
341  }
342  curr_col++;
343  }
344  return true;
345 }
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:285
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 385 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnect_reconnect_retry().

385  {
386  client_.reset(new HeavyClient(conn_details_.get_protocol()));
387 
388  try {
390  } catch (TDBException& e) {
391  std::cerr << e.error_msg << std::endl;
392  } catch (TException& te) {
393  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
394  }
395 }
std::string user_name_
std::shared_ptr< HeavyClient > client_
std::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
import_export::CopyParams  copy_params 
)

Definition at line 419 of file RowToColumnLoader.cpp.

References client_, input_columns_, import_export::CopyParams::retry_count, row_desc_, session_, table_name_, and wait_disconnect_reconnect_retry().

Referenced by kafka_insert(), and stream_insert().

421  {
422  for (size_t tries = 0; tries < copy_params.retry_count;
423  tries++) { // allow for retries in case of insert failure
424  try {
425  client_->load_table_binary_columnar(session_, table_name_, input_columns_, {});
426  // client->load_table(session, table_name, input_rows);
427  nrows += input_columns_[0].nulls.size();
428  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
429  << std::endl;
430  // we successfully loaded the data, lets move on
431  input_columns_.clear();
432  // create vector for storage of the actual column data
433  for (TColumnType column : row_desc_) {
434  TColumn t;
435  input_columns_.push_back(t);
436  }
437  return;
438  } catch (TDBException& e) {
439  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
440  exit(2);
441  } catch (TException& te) {
442  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
443  wait_disconnect_reconnect_retry(tries, copy_params);
444  }
445  }
446  std::cerr << "Retries exhausted program terminated" << std::endl;
447  exit(1);
448 }
std::string table_name_
std::shared_ptr< HeavyClient > client_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 299 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

299  {
300  return row_desc_;
301 };
TRowDescriptor row_desc_

+ Here is the caller graph for this function:

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 118 of file RowToColumnLoader.cpp.

References import_export::CopyParams::delimiter.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

120  {
121  std::ostringstream out;
122  bool first = true;
123  for (TStringValue ts : row) {
124  if (first) {
125  first = false;
126  } else {
127  out << copy_params.delimiter;
128  }
129  out << ts.str_val;
130  }
131  return out.str();
132 }

+ Here is the caller graph for this function:

void RowToColumnLoader::wait_disconnect_reconnect_retry ( size_t  tries,
import_export::CopyParams  copy_params 
)
private

Definition at line 407 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), import_export::CopyParams::retry_count, and import_export::CopyParams::retry_wait.

Referenced by do_load().

409  {
410  std::cout << " Waiting " << copy_params.retry_wait
411  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
412  << " times more " << std::endl;
413  std::this_thread::sleep_for(std::chrono::seconds(copy_params.retry_wait));
414 
415  closeConnection();
417 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 78 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

std::shared_ptr<HeavyClient> RowToColumnLoader::client_
private

Definition at line 82 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 77 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

ThriftClientConnection RowToColumnLoader::conn_details_
private

Referenced by createConnection(), RowToColumnLoader(), and wait_disconnect_reconnect_retry().

std::string RowToColumnLoader::db_name_
private

Definition at line 72 of file RowToColumnLoader.h.

Referenced by createConnection().

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 76 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::passwd_
private

Definition at line 71 of file RowToColumnLoader.h.

Referenced by createConnection().

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 80 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

TSessionId RowToColumnLoader::session_
private

Definition at line 83 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::table_name_
private

Definition at line 73 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::user_name_
private

Definition at line 70 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: