OmniSciDB  5ade3759e0
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, Importer_NS::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnet_reconnnect_retry (size_t tries, Importer_NS::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
mapd::shared_ptr< MapDClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 62 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

◆ RowToColumnLoader()

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 302 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, and table_name_.

307  : user_name_(user_name)
308  , passwd_(passwd)
309  , db_name_(db_name)
310  , table_name_(table_name)
311  , conn_details_(conn_details) {
313 
314  TTableDetails table_details;
315  client_->get_table_details(table_details, session_, table_name_);
316 
317  row_desc_ = table_details.row_desc;
318 
319  // create vector with column details
320  for (TColumnType ct : row_desc_) {
322  }
323 
324  // create vector with array column details presented as real column for easier reuse
325  // of other code
326  for (TColumnType ct : row_desc_) {
328  }
329 
330  // create vector for storage of the actual column data
331  for (TColumnType column : row_desc_) {
332  TColumn t;
333  input_columns_.push_back(t);
334  }
335 }
mapd::shared_ptr< MapDClient > client_
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
+ Here is the call graph for this function:

◆ ~RowToColumnLoader()

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 336 of file RowToColumnLoader.cpp.

References closeConnection().

336  {
337  closeConnection();
338 }
+ Here is the call graph for this function:

Member Function Documentation

◆ closeConnection()

void RowToColumnLoader::closeConnection ( )
private

Definition at line 352 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnet_reconnnect_retry(), and ~RowToColumnLoader().

352  {
353  try {
354  client_->disconnect(session_); // disconnect from omnisci_server
355  } catch (TMapDException& e) {
356  std::cerr << e.error_msg << std::endl;
357  } catch (TException& te) {
358  std::cerr << "Thrift error on close: " << te.what() << std::endl;
359  }
360 }
mapd::shared_ptr< MapDClient > client_
+ Here is the caller graph for this function:

◆ convert_string_to_column()

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const Importer_NS::CopyParams &  copy_params 
)

Definition at line 259 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, Importer_NS::CopyParams::null_str, Importer_NS::ImporterUtils::parseStringArray(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

261  {
262  // create datum and push data to column structure from row data
263  uint curr_col = 0;
264  for (TStringValue ts : row) {
265  try {
266  switch (column_type_info_[curr_col].get_type()) {
267  case SQLTypes::kARRAY: {
268  std::vector<std::string> arr_ele;
269  Importer_NS::ImporterUtils::parseStringArray(ts.str_val, copy_params, arr_ele);
270  TColumn array_tcol;
271  for (std::string item : arr_ele) {
272  boost::algorithm::trim(item);
273  TStringValue tsa;
274  tsa.str_val = item;
275  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
276  // now put into TColumn
277  populate_TColumn(
278  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
279  }
280  input_columns_[curr_col].nulls.push_back(false);
281  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
282 
283  break;
284  }
285  default:
286  populate_TColumn(
287  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
288  }
289  } catch (const std::exception& e) {
291  // import_status.rows_rejected++;
292  LOG(ERROR) << "Input exception thrown: " << e.what()
293  << ". Row discarded, issue at column : " << (curr_col + 1)
294  << " data :" << print_row_with_delim(row, copy_params);
295  return false;
296  }
297  curr_col++;
298  }
299  return true;
300 }
std::string null_str
Definition: Importer.h:100
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:182
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const Importer_NS::CopyParams &copy_params)
std::string print_row_with_delim(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
static bool parseStringArray(const std::string &s, const CopyParams &copy_params, std::vector< std::string > &string_vec)
Definition: Importer.h:814
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ createConnection()

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 340 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnet_reconnnect_retry().

340  {
341  client_.reset(new MapDClient(conn_details_.get_protocol()));
342 
343  try {
345  } catch (TMapDException& e) {
346  std::cerr << e.error_msg << std::endl;
347  } catch (TException& te) {
348  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
349  }
350 }
mapd::shared_ptr< MapDClient > client_
std::string user_name_
mapd::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ do_load()

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
Importer_NS::CopyParams  copy_params 
)

Definition at line 374 of file RowToColumnLoader.cpp.

References client_, input_columns_, Importer_NS::CopyParams::retry_count, row_desc_, session_, table_name_, and wait_disconnet_reconnnect_retry().

Referenced by kafka_insert(), and stream_insert().

376  {
377  for (size_t tries = 0; tries < copy_params.retry_count;
378  tries++) { // allow for retries in case of insert failure
379  try {
380  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
381  // client->load_table(session, table_name, input_rows);
382  nrows += input_columns_[0].nulls.size();
383  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
384  << std::endl;
385  // we successfully loaded the data, lets move on
386  input_columns_.clear();
387  // create vector for storage of the actual column data
388  for (TColumnType column : row_desc_) {
389  TColumn t;
390  input_columns_.push_back(t);
391  }
392  return;
393  } catch (TMapDException& e) {
394  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
395  wait_disconnet_reconnnect_retry(tries, copy_params);
396  } catch (TException& te) {
397  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
398  wait_disconnet_reconnnect_retry(tries, copy_params);
399  }
400  }
401  std::cerr << "Retries exhausted program terminated" << std::endl;
402  exit(1);
403 }
mapd::shared_ptr< MapDClient > client_
std::string table_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
void wait_disconnet_reconnnect_retry(size_t tries, Importer_NS::CopyParams copy_params)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ get_row_descriptor()

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 255 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

255  {
256  return row_desc_;
257 };
TRowDescriptor row_desc_
+ Here is the caller graph for this function:

◆ print_row_with_delim()

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const Importer_NS::CopyParams &  copy_params 
)

Definition at line 99 of file RowToColumnLoader.cpp.

References Importer_NS::CopyParams::delimiter, and out.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

101  {
102  std::ostringstream out;
103  bool first = true;
104  for (TStringValue ts : row) {
105  if (first) {
106  first = false;
107  } else {
108  out << copy_params.delimiter;
109  }
110  out << ts.str_val;
111  }
112  return out.str();
113 }
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
+ Here is the caller graph for this function:

◆ wait_disconnet_reconnnect_retry()

void RowToColumnLoader::wait_disconnet_reconnnect_retry ( size_t  tries,
Importer_NS::CopyParams  copy_params 
)
private

Definition at line 362 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), Importer_NS::CopyParams::retry_count, and Importer_NS::CopyParams::retry_wait.

Referenced by do_load().

364  {
365  std::cout << " Waiting " << copy_params.retry_wait
366  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
367  << " times more " << std::endl;
368  sleep(copy_params.retry_wait);
369 
370  closeConnection();
371  createConnection(conn_details_);
372 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ array_column_type_info_

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 86 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

◆ client_

mapd::shared_ptr<MapDClient> RowToColumnLoader::client_
private

Definition at line 90 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

◆ column_type_info_

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 85 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

◆ conn_details_

ThriftClientConnection RowToColumnLoader::conn_details_
private

◆ db_name_

std::string RowToColumnLoader::db_name_
private

Definition at line 80 of file RowToColumnLoader.h.

Referenced by createConnection().

◆ input_columns_

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 84 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

◆ passwd_

std::string RowToColumnLoader::passwd_
private

Definition at line 79 of file RowToColumnLoader.h.

Referenced by createConnection().

◆ row_desc_

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 88 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

◆ session_

TSessionId RowToColumnLoader::session_
private

Definition at line 91 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

◆ table_name_

std::string RowToColumnLoader::table_name_
private

Definition at line 81 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

◆ user_name_

std::string RowToColumnLoader::user_name_
private

Definition at line 78 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: