OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, import_export::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnect_reconnect_retry (size_t tries, import_export::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
std::shared_ptr< OmniSciClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 74 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 339 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, t, and table_name_.

344  : user_name_(user_name)
345  , passwd_(passwd)
346  , db_name_(db_name)
347  , table_name_(table_name)
348  , conn_details_(conn_details) {
350 
351  TTableDetails table_details;
352  client_->get_table_details(table_details, session_, table_name_);
353 
354  row_desc_ = table_details.row_desc;
355 
356  // create vector with column details
357  for (TColumnType ct : row_desc_) {
359  }
360 
361  // create vector with array column details presented as real column for easier reuse
362  // of other code
363  for (TColumnType ct : row_desc_) {
365  }
366 
367  // create vector for storage of the actual column data
368  for (TColumnType column : row_desc_) {
369  TColumn t;
370  input_columns_.push_back(t);
371  }
372 }
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
char * t
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
std::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 373 of file RowToColumnLoader.cpp.

References closeConnection().

373  {
374  closeConnection();
375 }

+ Here is the call graph for this function:

Member Function Documentation

void RowToColumnLoader::closeConnection ( )
private

Definition at line 389 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnect_reconnect_retry(), and ~RowToColumnLoader().

389  {
390  try {
391  client_->disconnect(session_); // disconnect from omnisci_server
392  } catch (TOmniSciException& e) {
393  std::cerr << e.error_msg << std::endl;
394  } catch (TException& te) {
395  std::cerr << "Thrift error on close: " << te.what() << std::endl;
396  }
397 }
std::shared_ptr< OmniSciClient > client_

+ Here is the caller graph for this function:

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 295 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, import_export::CopyParams::null_str, import_export::delimited_parser::parse_string_array(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

297  {
298  // create datum and push data to column structure from row data
299  uint64_t curr_col = 0;
300  for (TStringValue ts : row) {
301  try {
302  switch (column_type_info_[curr_col].get_type()) {
303  case SQLTypes::kARRAY: {
304  std::vector<std::string> arr_ele;
306  ts.str_val, copy_params, arr_ele);
307  TColumn array_tcol;
308  for (std::string item : arr_ele) {
309  boost::algorithm::trim(item);
310  TStringValue tsa;
311  tsa.str_val = item;
312  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
313  // now put into TColumn
315  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
316  }
317  input_columns_[curr_col].nulls.push_back(false);
318  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
319 
320  break;
321  }
322  default:
324  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
325  }
326  } catch (const std::exception& e) {
328  // import_status.rows_rejected++;
329  LOG(ERROR) << "Input exception thrown: " << e.what()
330  << ". Row discarded, issue at column : " << (curr_col + 1)
331  << " data :" << print_row_with_delim(row, copy_params);
332  return false;
333  }
334  curr_col++;
335  }
336  return true;
337 }
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:205
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 377 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnect_reconnect_retry().

377  {
378  client_.reset(new OmniSciClient(conn_details_.get_protocol()));
379 
380  try {
382  } catch (TOmniSciException& e) {
383  std::cerr << e.error_msg << std::endl;
384  } catch (TException& te) {
385  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
386  }
387 }
std::string user_name_
std::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_
std::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
import_export::CopyParams  copy_params 
)

Definition at line 411 of file RowToColumnLoader.cpp.

References client_, input_columns_, import_export::CopyParams::retry_count, row_desc_, session_, t, table_name_, and wait_disconnect_reconnect_retry().

Referenced by kafka_insert(), and stream_insert().

413  {
414  for (size_t tries = 0; tries < copy_params.retry_count;
415  tries++) { // allow for retries in case of insert failure
416  try {
417  client_->load_table_binary_columnar(session_, table_name_, input_columns_, {});
418  // client->load_table(session, table_name, input_rows);
419  nrows += input_columns_[0].nulls.size();
420  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
421  << std::endl;
422  // we successfully loaded the data, lets move on
423  input_columns_.clear();
424  // create vector for storage of the actual column data
425  for (TColumnType column : row_desc_) {
426  TColumn t;
427  input_columns_.push_back(t);
428  }
429  return;
430  } catch (TOmniSciException& e) {
431  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
432  exit(2);
433  } catch (TException& te) {
434  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
435  wait_disconnect_reconnect_retry(tries, copy_params);
436  }
437  }
438  std::cerr << "Retries exhausted program terminated" << std::endl;
439  exit(1);
440 }
std::string table_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)
char * t
std::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 291 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

291  {
292  return row_desc_;
293 };
TRowDescriptor row_desc_

+ Here is the caller graph for this function:

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 116 of file RowToColumnLoader.cpp.

References import_export::CopyParams::delimiter.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

118  {
119  std::ostringstream out;
120  bool first = true;
121  for (TStringValue ts : row) {
122  if (first) {
123  first = false;
124  } else {
125  out << copy_params.delimiter;
126  }
127  out << ts.str_val;
128  }
129  return out.str();
130 }

+ Here is the caller graph for this function:

void RowToColumnLoader::wait_disconnect_reconnect_retry ( size_t  tries,
import_export::CopyParams  copy_params 
)
private

Definition at line 399 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), import_export::CopyParams::retry_count, and import_export::CopyParams::retry_wait.

Referenced by do_load().

401  {
402  std::cout << " Waiting " << copy_params.retry_wait
403  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
404  << " times more " << std::endl;
405  std::this_thread::sleep_for(std::chrono::seconds(copy_params.retry_wait));
406 
407  closeConnection();
409 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 98 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

std::shared_ptr<OmniSciClient> RowToColumnLoader::client_
private

Definition at line 102 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 97 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

ThriftClientConnection RowToColumnLoader::conn_details_
private

Referenced by createConnection(), RowToColumnLoader(), and wait_disconnect_reconnect_retry().

std::string RowToColumnLoader::db_name_
private

Definition at line 92 of file RowToColumnLoader.h.

Referenced by createConnection().

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 96 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::passwd_
private

Definition at line 91 of file RowToColumnLoader.h.

Referenced by createConnection().

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 100 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

TSessionId RowToColumnLoader::session_
private

Definition at line 103 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::table_name_
private

Definition at line 93 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::user_name_
private

Definition at line 90 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: