OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowToColumnLoader Class Reference

#include <RowToColumnLoader.h>

+ Collaboration diagram for RowToColumnLoader:

Public Member Functions

 RowToColumnLoader (const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
 
 ~RowToColumnLoader ()
 
void do_load (int &nrows, int &nskipped, Importer_NS::CopyParams copy_params)
 
bool convert_string_to_column (std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
 
TRowDescriptor get_row_descriptor ()
 
std::string print_row_with_delim (std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
 

Private Member Functions

void createConnection (const ThriftClientConnection &con)
 
void closeConnection ()
 
void wait_disconnet_reconnnect_retry (size_t tries, Importer_NS::CopyParams copy_params)
 

Private Attributes

std::string user_name_
 
std::string passwd_
 
std::string db_name_
 
std::string table_name_
 
ThriftClientConnection conn_details_
 
std::vector< TColumn > input_columns_
 
std::vector< SQLTypeInfo > column_type_info_
 
std::vector< SQLTypeInfo > array_column_type_info_
 
TRowDescriptor row_desc_
 
mapd::shared_ptr< MapDClient > client_
 
TSessionId session_
 

Detailed Description

Definition at line 62 of file RowToColumnLoader.h.

Constructor & Destructor Documentation

RowToColumnLoader::RowToColumnLoader ( const ThriftClientConnection &  conn_details,
const std::string &  user_name,
const std::string &  passwd,
const std::string &  db_name,
const std::string &  table_name 
)

Definition at line 329 of file RowToColumnLoader.cpp.

References array_column_type_info_, client_, column_type_info_, conn_details_, create_array_sql_type_info_from_col_type(), create_sql_type_info_from_col_type(), createConnection(), input_columns_, row_desc_, session_, and table_name_.

334  : user_name_(user_name)
335  , passwd_(passwd)
336  , db_name_(db_name)
337  , table_name_(table_name)
338  , conn_details_(conn_details) {
340 
341  TTableDetails table_details;
342  client_->get_table_details(table_details, session_, table_name_);
343 
344  row_desc_ = table_details.row_desc;
345 
346  // create vector with column details
347  for (TColumnType ct : row_desc_) {
349  }
350 
351  // create vector with array column details presented as real column for easier resue
352  // of othe code
353  for (TColumnType ct : row_desc_) {
355  }
356 
357  // create vector for storage of the actual column data
358  for (TColumnType column : row_desc_) {
359  TColumn t;
360  input_columns_.push_back(t);
361  }
362 }
mapd::shared_ptr< MapDClient > client_
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

RowToColumnLoader::~RowToColumnLoader ( )

Definition at line 363 of file RowToColumnLoader.cpp.

References closeConnection().

363  {
364  closeConnection();
365 }

+ Here is the call graph for this function:

Member Function Documentation

void RowToColumnLoader::closeConnection ( )
private

Definition at line 379 of file RowToColumnLoader.cpp.

References client_, and session_.

Referenced by wait_disconnet_reconnnect_retry(), and ~RowToColumnLoader().

379  {
380  try {
381  client_->disconnect(session_); // disconnect from omnisci_server
382  } catch (TMapDException& e) {
383  std::cerr << e.error_msg << std::endl;
384  } catch (TException& te) {
385  std::cerr << "Thrift error on close: " << te.what() << std::endl;
386  }
387 }
mapd::shared_ptr< MapDClient > client_

+ Here is the caller graph for this function:

bool RowToColumnLoader::convert_string_to_column ( std::vector< TStringValue >  row,
const Importer_NS::CopyParams &  copy_params 
)

Definition at line 285 of file RowToColumnLoader.cpp.

References array_column_type_info_, column_type_info_, logger::ERROR, input_columns_, kARRAY, LOG, Importer_NS::CopyParams::null_str, Importer_NS::DelimitedParserUtils::parseStringArray(), populate_TColumn(), print_row_with_delim(), and remove_partial_row().

Referenced by msg_consume(), and stream_insert().

287  {
288  // create datum and push data to column structure from row data
289  uint curr_col = 0;
290  for (TStringValue ts : row) {
291  try {
292  switch (column_type_info_[curr_col].get_type()) {
293  case SQLTypes::kARRAY: {
294  std::vector<std::string> arr_ele;
296  ts.str_val, copy_params, arr_ele);
297  TColumn array_tcol;
298  for (std::string item : arr_ele) {
299  boost::algorithm::trim(item);
300  TStringValue tsa;
301  tsa.str_val = item;
302  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
303  // now put into TColumn
305  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
306  }
307  input_columns_[curr_col].nulls.push_back(false);
308  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
309 
310  break;
311  }
312  default:
314  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
315  }
316  } catch (const std::exception& e) {
318  // import_status.rows_rejected++;
319  LOG(ERROR) << "Input exception thrown: " << e.what()
320  << ". Row discarded, issue at column : " << (curr_col + 1)
321  << " data :" << print_row_with_delim(row, copy_params);
322  return false;
323  }
324  curr_col++;
325  }
326  return true;
327 }
std::string null_str
Definition: CopyParams.h:47
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:185
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const Importer_NS::CopyParams &copy_params)
std::string print_row_with_delim(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
std::vector< SQLTypeInfo > column_type_info_
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
static void parseStringArray(const std::string &s, const Importer_NS::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::createConnection ( const ThriftClientConnection &  con)
private

Definition at line 367 of file RowToColumnLoader.cpp.

References client_, conn_details_, db_name_, ThriftClientConnection::get_protocol(), passwd_, session_, and user_name_.

Referenced by RowToColumnLoader(), and wait_disconnet_reconnnect_retry().

367  {
368  client_.reset(new MapDClient(conn_details_.get_protocol()));
369 
370  try {
372  } catch (TMapDException& e) {
373  std::cerr << e.error_msg << std::endl;
374  } catch (TException& te) {
375  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
376  }
377 }
mapd::shared_ptr< MapDClient > client_
std::string user_name_
mapd::shared_ptr< TProtocol > get_protocol()
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void RowToColumnLoader::do_load ( int &  nrows,
int &  nskipped,
Importer_NS::CopyParams  copy_params 
)

Definition at line 401 of file RowToColumnLoader.cpp.

References client_, input_columns_, Importer_NS::CopyParams::retry_count, row_desc_, session_, table_name_, and wait_disconnet_reconnnect_retry().

Referenced by kafka_insert(), and stream_insert().

403  {
404  for (size_t tries = 0; tries < copy_params.retry_count;
405  tries++) { // allow for retries in case of insert failure
406  try {
407  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
408  // client->load_table(session, table_name, input_rows);
409  nrows += input_columns_[0].nulls.size();
410  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
411  << std::endl;
412  // we successfully loaded the data, lets move on
413  input_columns_.clear();
414  // create vector for storage of the actual column data
415  for (TColumnType column : row_desc_) {
416  TColumn t;
417  input_columns_.push_back(t);
418  }
419  return;
420  } catch (TMapDException& e) {
421  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
422  wait_disconnet_reconnnect_retry(tries, copy_params);
423  } catch (TException& te) {
424  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
425  wait_disconnet_reconnnect_retry(tries, copy_params);
426  }
427  }
428  std::cerr << "Retries exhausted program terminated" << std::endl;
429  exit(1);
430 }
mapd::shared_ptr< MapDClient > client_
std::string table_name_
TRowDescriptor row_desc_
std::vector< TColumn > input_columns_
void wait_disconnet_reconnnect_retry(size_t tries, Importer_NS::CopyParams copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TRowDescriptor RowToColumnLoader::get_row_descriptor ( )

Definition at line 281 of file RowToColumnLoader.cpp.

References row_desc_.

Referenced by msg_consume(), and stream_insert().

281  {
282  return row_desc_;
283 };
TRowDescriptor row_desc_

+ Here is the caller graph for this function:

std::string RowToColumnLoader::print_row_with_delim ( std::vector< TStringValue >  row,
const Importer_NS::CopyParams &  copy_params 
)

Definition at line 111 of file RowToColumnLoader.cpp.

References Importer_NS::CopyParams::delimiter, and out.

Referenced by convert_string_to_column(), msg_consume(), and stream_insert().

113  {
114  std::ostringstream out;
115  bool first = true;
116  for (TStringValue ts : row) {
117  if (first) {
118  first = false;
119  } else {
120  out << copy_params.delimiter;
121  }
122  out << ts.str_val;
123  }
124  return out.str();
125 }
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out

+ Here is the caller graph for this function:

void RowToColumnLoader::wait_disconnet_reconnnect_retry ( size_t  tries,
Importer_NS::CopyParams  copy_params 
)
private

Definition at line 389 of file RowToColumnLoader.cpp.

References closeConnection(), conn_details_, createConnection(), Importer_NS::CopyParams::retry_count, and Importer_NS::CopyParams::retry_wait.

Referenced by do_load().

391  {
392  std::cout << " Waiting " << copy_params.retry_wait
393  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
394  << " times more " << std::endl;
395  sleep(copy_params.retry_wait);
396 
397  closeConnection();
399 }
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<SQLTypeInfo> RowToColumnLoader::array_column_type_info_
private

Definition at line 86 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

mapd::shared_ptr<MapDClient> RowToColumnLoader::client_
private

Definition at line 90 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::vector<SQLTypeInfo> RowToColumnLoader::column_type_info_
private

Definition at line 85 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), and RowToColumnLoader().

ThriftClientConnection RowToColumnLoader::conn_details_
private

Referenced by createConnection(), RowToColumnLoader(), and wait_disconnet_reconnnect_retry().

std::string RowToColumnLoader::db_name_
private

Definition at line 80 of file RowToColumnLoader.h.

Referenced by createConnection().

std::vector<TColumn> RowToColumnLoader::input_columns_
private

Definition at line 84 of file RowToColumnLoader.h.

Referenced by convert_string_to_column(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::passwd_
private

Definition at line 79 of file RowToColumnLoader.h.

Referenced by createConnection().

TRowDescriptor RowToColumnLoader::row_desc_
private

Definition at line 88 of file RowToColumnLoader.h.

Referenced by do_load(), get_row_descriptor(), and RowToColumnLoader().

TSessionId RowToColumnLoader::session_
private

Definition at line 91 of file RowToColumnLoader.h.

Referenced by closeConnection(), createConnection(), do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::table_name_
private

Definition at line 81 of file RowToColumnLoader.h.

Referenced by do_load(), and RowToColumnLoader().

std::string RowToColumnLoader::user_name_
private

Definition at line 78 of file RowToColumnLoader.h.

Referenced by createConnection().


The documentation for this class was generated from the following files: