OmniSciDB  343343d194
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
anonymous_namespace{StreamInsert.cpp} Namespace Reference

Functions

void createConnection (ConnectionDetails con)
 
void closeConnection ()
 
void wait_disconnet_reconnnect_retry (size_t tries, CopyParams copy_params, ConnectionDetails conn_details)
 
void do_load (int &nrows, int &nskipped, std::vector< TStringRow > input_rows, const std::string &table_name, CopyParams copy_params, ConnectionDetails conn_details)
 
void stream_insert (const std::string &table_name, const TRowDescriptor &row_desc, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const CopyParams &copy_params, const ConnectionDetails conn_details, const bool remove_quotes)
 

Function Documentation

void anonymous_namespace{StreamInsert.cpp}::closeConnection ( )

Definition at line 123 of file StreamInsert.cpp.

References client, mytransport, and session.

Referenced by main(), and wait_disconnet_reconnnect_retry().

123  {
124  try {
125  client->disconnect(session); // disconnect from omnisci_server
126  mytransport->close(); // close transport
127  } catch (TMapDException& e) {
128  std::cerr << e.error_msg << std::endl;
129  } catch (TException& te) {
130  std::cerr << "Thrift error: " << te.what() << std::endl;
131  }
132 }
mapd::shared_ptr< MapDClient > client
mapd::shared_ptr< apache::thrift::transport::TTransport > mytransport
TSessionId session

+ Here is the caller graph for this function:

void anonymous_namespace{StreamInsert.cpp}::createConnection ( ConnectionDetails  con)

Definition at line 107 of file StreamInsert.cpp.

References client, ConnectionDetails::db_name, mytransport, ConnectionDetails::passwd, ConnectionDetails::port, ConnectionDetails::server_host, session, and ConnectionDetails::user_name.

Referenced by main(), and wait_disconnet_reconnnect_retry().

107  {
108  mapd::shared_ptr<TTransport> socket(new TSocket(con.server_host, con.port));
109  mytransport.reset(new TBufferedTransport(socket));
110  mapd::shared_ptr<TProtocol> protocol(new TBinaryProtocol(mytransport));
111  client.reset(new MapDClient(protocol));
112  try {
113  mytransport->open(); // open transport
114  client->connect(
115  session, con.user_name, con.passwd, con.db_name); // connect to omnisci_server
116  } catch (TMapDException& e) {
117  std::cerr << e.error_msg << std::endl;
118  } catch (TException& te) {
119  std::cerr << "Thrift error: " << te.what() << std::endl;
120  }
121 }
std::string user_name
std::string db_name
std::string server_host
mapd::shared_ptr< MapDClient > client
std::string passwd
mapd::shared_ptr< apache::thrift::transport::TTransport > mytransport
TSessionId session

+ Here is the caller graph for this function:

void anonymous_namespace{StreamInsert.cpp}::do_load ( int &  nrows,
int &  nskipped,
std::vector< TStringRow >  input_rows,
const std::string &  table_name,
CopyParams  copy_params,
ConnectionDetails  conn_details 
)

Definition at line 146 of file StreamInsert.cpp.

References client, CopyParams::retry_count, session, and wait_disconnet_reconnnect_retry().

Referenced by stream_insert().

151  {
152  for (size_t tries = 0; tries < copy_params.retry_count;
153  tries++) { // allow for retries in case of insert failure
154  try {
155  client->load_table(session, table_name, input_rows);
156  nrows += input_rows.size();
157  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
158  << std::endl;
159  // we successfully loaded the data, lets move on
160  return;
161  } catch (TMapDException& e) {
162  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
163  wait_disconnet_reconnnect_retry(tries, copy_params, conn_details);
164  } catch (TException& te) {
165  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
166  wait_disconnet_reconnnect_retry(tries, copy_params, conn_details);
167  }
168  }
169  std::cerr << "Retries exhausted program terminated" << std::endl;
170  exit(1);
171 }
size_t retry_count
mapd::shared_ptr< MapDClient > client
void wait_disconnet_reconnnect_retry(size_t tries, CopyParams copy_params, ConnectionDetails conn_details)
TSessionId session

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{StreamInsert.cpp}::stream_insert ( const std::string &  table_name,
const TRowDescriptor &  row_desc,
const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &  transformations,
const CopyParams &  copy_params,
const ConnectionDetails  conn_details,
const bool  remove_quotes 
)

Definition at line 175 of file StreamInsert.cpp.

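References CopyParams::batch_size, CopyParams::delimiter, do_load(), field(), CopyParams::line_delim, MAX_FIELD_LEN, CopyParams::null_str, print_error_data, and print_transformation. (Note: the field() entry is a spurious auto-link — inside stream_insert(), `field` is the local `char field[MAX_FIELD_LEN]` buffer, not the rapidjson accessor declared in JsonAccessors.h.)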

183  {
184  std::vector<TStringRow> input_rows;
185  TStringRow row;
186 
187  std::ios_base::sync_with_stdio(false);
188  std::istream_iterator<char> eos;
189  std::cin >> std::noskipws;
190  std::istream_iterator<char> iit(std::cin);
191 
192  char field[MAX_FIELD_LEN];
193  size_t field_i = 0;
194 
195  int nrows = 0;
196  int nskipped = 0;
197  bool backEscape = false;
198 
199  const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*
200  xforms[row_desc.size()];
201  for (size_t i = 0; i < row_desc.size(); i++) {
202  auto it = transformations.find(row_desc[i].col_name);
203  if (it != transformations.end()) {
204  xforms[i] = &(it->second);
205  } else {
206  xforms[i] = nullptr;
207  }
208  }
209 
210  while (iit != eos) {
211  row.cols.clear();
212  // construct a row
213  while (iit != eos) {
214  if (*iit == copy_params.delimiter || *iit == copy_params.line_delim) {
215  bool end_of_field = (*iit == copy_params.delimiter);
216  bool end_of_row;
217  if (end_of_field) {
218  end_of_row = false;
219  } else {
220  end_of_row = (row_desc[row.cols.size()].col_type.type != TDatumType::STR) ||
221  (row.cols.size() == row_desc.size() - 1);
222  if (!end_of_row) {
223  size_t l = copy_params.null_str.size();
224  if (field_i >= l &&
225  strncmp(field + field_i - l, copy_params.null_str.c_str(), l) == 0) {
226  end_of_row = true;
227  // std::cout << "new line after null.\n";
228  }
229  }
230  }
231  if (!end_of_field && !end_of_row) {
232  // not enough columns yet and it is a string column
233  // treat the line delimiter as part of the string
234  field[field_i++] = *iit;
235  } else {
236  field[field_i] = '\0';
237  field_i = 0;
238  TStringValue ts;
239  ts.str_val = std::string(field);
240  ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.null_str);
241  auto xform =
242  row.cols.size() < row_desc.size() ? xforms[row.cols.size()] : nullptr;
243  if (!ts.is_null && xform != nullptr) {
244  if (print_transformation) {
245  std::cout << "\ntransforming\n" << ts.str_val << "\nto\n";
246  }
247  ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
248  if (ts.str_val.empty()) {
249  ts.is_null = true;
250  }
251  if (print_transformation) {
252  std::cout << ts.str_val << std::endl;
253  }
254  }
255  row.cols.push_back(ts); // add column value to row
256  if (end_of_row || (row.cols.size() > row_desc.size())) {
257  break; // found row
258  }
259  }
260  } else {
261  if (*iit == '\\') {
262  backEscape = true;
263  } else if (backEscape || !remove_quotes || *iit != '\"') {
264  field[field_i++] = *iit;
265  backEscape = false;
266  }
267  // else if unescaped double-quote, continue without adding the
268  // charactger to the field string.
269  }
270  if (field_i >= MAX_FIELD_LEN) {
271  field[MAX_FIELD_LEN - 1] = '\0';
272  std::cerr << "String too long for buffer." << std::endl;
273  if (print_error_data) {
274  std::cerr << field << std::endl;
275  }
276  field_i = 0;
277  break;
278  }
279  ++iit;
280  }
281  if (row.cols.size() == row_desc.size()) {
282  input_rows.push_back(row);
283  if (input_rows.size() >= copy_params.batch_size) {
284  do_load(nrows, nskipped, input_rows, table_name, copy_params, conn_details);
285  input_rows.clear();
286  }
287  } else {
288  ++nskipped;
289  if (print_error_data) {
290  std::cerr << "Incorrect number of columns for row at: ";
291  bool not_first = false;
292  for (const auto& p : row.cols) {
293  if (not_first) {
294  std::cerr << copy_params.delimiter;
295  } else {
296  not_first = true;
297  }
298  std::cerr << &p;
299  }
300  std::cerr << std::endl;
301  }
302  if (row.cols.size() > row_desc.size()) {
303  // skip to the next line delimiter
304  while (*iit != copy_params.line_delim) {
305  ++iit;
306  }
307  }
308  }
309  ++iit;
310  }
311  // load remaining rowset if any
312  if (input_rows.size() > 0) {
313  do_load(nrows, nskipped, input_rows, table_name, copy_params, conn_details);
314  }
315 }
void do_load(int &nrows, int &nskipped, std::vector< TStringRow > input_rows, const std::string &table_name, CopyParams copy_params, ConnectionDetails conn_details)
size_t batch_size
std::string null_str
#define MAX_FIELD_LEN
bool print_transformation
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool print_error_data

+ Here is the call graph for this function:

void anonymous_namespace{StreamInsert.cpp}::wait_disconnet_reconnnect_retry ( size_t  tries,
CopyParams  copy_params,
ConnectionDetails  conn_details 
)

Definition at line 134 of file StreamInsert.cpp.

References closeConnection(), createConnection(), CopyParams::retry_count, and CopyParams::retry_wait.

Referenced by do_load().

136  {
137  std::cout << " Waiting " << copy_params.retry_wait
138  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
139  << " times more " << std::endl;
140  sleep(copy_params.retry_wait);
141 
142  closeConnection();
143  createConnection(conn_details);
144 }
size_t retry_count
size_t retry_wait
void createConnection(ConnectionDetails con)

+ Here is the call graph for this function:

+ Here is the caller graph for this function: