OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, import_export::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnet_reconnnect_retry (size_t tries, import_export::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
mapd::shared_ptr< OmniSciClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 62 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 336 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, and table_name_.

341  : user_name_(user_name)
342  , passwd_(passwd)
343  , db_name_(db_name)
344  , table_name_(table_name)
345  , conn_details_(conn_details) {
347 
348  TTableDetails table_details;
349  client_->get_table_details(table_details, session_, table_name_);
350 
351  row_desc_ = table_details.row_desc;
352 
353  // create vector with column details
354  for (TColumnType ct : row_desc_) {
356  }
357 
358  // create vector with array column details presented as real column for easier resue
359  // of othe code
360  for (TColumnType ct : row_desc_) {
362  }
363 
364  // create vector for storage of the actual column data
365  for (TColumnType column : row_desc_) {
366  TColumn t;
367  input_columns_.push_back(t);
368  }
369 }
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
mapd::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 370 of file RowToColumnLoader.cpp.

References closeConnection().

370  {
371  closeConnection();
372 }

+ Here is the call graph for this function:

Member Function Documentation

void RowToColumnLoader::closeConnection ( )
private

Definition at line 386 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnet_reconnnect_retry(), and ~RowToColumnLoader().

386  {
387  try {
388  client_->disconnect(session_); // disconnect from omnisci_server
389  } catch (TOmniSciException& e) {
390  std::cerr << e.error_msg << std::endl;
391  } catch (TException& te) {
392  std::cerr << "Thrift error on close: " << te.what() << std::endl;
393  }
394 }
mapd::shared_ptr< OmniSciClient > client_

+ Here is the caller graph for this function:

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 292 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, import_export::CopyParams::null_str, import_export::delimited_parser::parse_string_array(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

294  {
295  // create datum and push data to column structure from row data
296  uint curr_col = 0;
297  for (TStringValue ts : row) {
298  try {
299  switch (column_type_info_[curr_col].get_type()) {
300  case SQLTypes::kARRAY: {
301  std::vector<std::string> arr_ele;
303  ts.str_val, copy_params, arr_ele);
304  TColumn array_tcol;
305  for (std::string item : arr_ele) {
306  boost::algorithm::trim(item);
307  TStringValue tsa;
308  tsa.str_val = item;
309  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
310  // now put into TColumn
312  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
313  }
314  input_columns_[curr_col].nulls.push_back(false);
315  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
316 
317  break;
318  }
319  default:
321  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
322  }
323  } catch (const std::exception& e) {
325  // import_status.rows_rejected++;
326  LOG(ERROR) << "Input exception thrown: " << e.what()
327  << ". Row discarded, issue at column : " << (curr_col + 1)
328  << " data :" << print_row_with_delim(row, copy_params);
329  return false;
330  }
331  curr_col++;
332  }
333  return true;
334 }
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:188
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 374 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnet_reconnnect_retry().

374  {
375  client_.reset(new OmniSciClient(conn_details_.get_protocol()));
376 
377  try {
379  } catch (TOmniSciException& e) {
380  std::cerr << e.error_msg << std::endl;
381  } catch (TException& te) {
382  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
383  }
384 }
std::string user_name_
mapd::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_
mapd::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
import_export::CopyParams  copy_params 
)

Definition at line 408 of file RowToColumnLoader.cpp.

References client_, input_columns_, import_export::CopyParams::retry_count, row_desc_, session_, table_name_, and wait_disconnet_reconnnect_retry().

Referenced by kafka_insert(), and stream_insert().

410  {
411  for (size_t tries = 0; tries < copy_params.retry_count;
412  tries++) { // allow for retries in case of insert failure
413  try {
414  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
415  // client->load_table(session, table_name, input_rows);
416  nrows += input_columns_[0].nulls.size();
417  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
418  << std::endl;
419  // we successfully loaded the data, lets move on
420  input_columns_.clear();
421  // create vector for storage of the actual column data
422  for (TColumnType column : row_desc_) {
423  TColumn t;
424  input_columns_.push_back(t);
425  }
426  return;
427  } catch (TOmniSciException& e) {
428  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
429  wait_disconnet_reconnnect_retry(tries, copy_params);
430  } catch (TException& te) {
431  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
432  wait_disconnet_reconnnect_retry(tries, copy_params);
433  }
434  }
435  std::cerr << "Retries exhausted program terminated" << std::endl;
436  exit(1);
437 }
void wait_disconnet_reconnnect_retry(size_t tries, import_export::CopyParams copy_params)
std::string table_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
mapd::shared_ptr< OmniSciClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 288 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

288  {
289  return row_desc_;
290 };
TRowDescriptor row_desc_

+ Here is the caller graph for this function:

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const import_export::CopyParams &  copy_params 
)

Definition at line 113 of file RowToColumnLoader.cpp.

References import_export::CopyParams::delimiter.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

115  {
116  std::ostringstream out;
117  bool first = true;
118  for (TStringValue ts : row) {
119  if (first) {
120  first = false;
121  } else {
122  out << copy_params.delimiter;
123  }
124  out << ts.str_val;
125  }
126  return out.str();
127 }

+ Here is the caller graph for this function:

void RowToColumnLoader::wait_disconnet_reconnnect_retry ( size_t  tries,
import_export::CopyParams  copy_params 
)
private

Definition at line 396 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), import_export::CopyParams::retry_count, and import_export::CopyParams::retry_wait.

Referenced by do_load().

398  {
399  std::cout << " Waiting " << copy_params.retry_wait
400  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
401  << " times more " << std::endl;
402  sleep(copy_params.retry_wait);
403 
404  closeConnection();
406 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 86 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

mapd::shared_ptr<OmniSciClient> RowToColumnLoader::client_
private

Definition at line 90 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 85 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

ThriftClientConnection RowToColumnLoader::conn_details_
private
std::string RowToColumnLoader::db_name_
private

Definition at line 80 of file RowToColumnLoader.h.

Referenced by createConnection().

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 84 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::passwd_
private

Definition at line 79 of file RowToColumnLoader.h.

Referenced by createConnection().

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 88 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

TSessionId RowToColumnLoader::session_
private

Definition at line 91 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::table_name_
private

Definition at line 81 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::user_name_
private

Definition at line 78 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: