OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowToColumnLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
28 #include "Logger/Logger.h"
29 
30 using namespace ::apache::thrift;
31 
32 SQLTypes get_sql_types(const TColumnType& ct) {
33  switch (ct.col_type.type) {
34  case TDatumType::BIGINT:
35  return SQLTypes::kBIGINT;
36  case TDatumType::BOOL:
37  return SQLTypes::kBOOLEAN;
38  case TDatumType::DATE:
39  return SQLTypes::kDATE;
40  case TDatumType::DECIMAL:
41  return SQLTypes::kDECIMAL;
42  case TDatumType::DOUBLE:
43  return SQLTypes::kDOUBLE;
44  case TDatumType::FLOAT:
45  return SQLTypes::kFLOAT;
46  case TDatumType::INT:
47  return SQLTypes::kINT;
48  case TDatumType::STR:
49  // Tdataum is lossy here so need to look at precision to see if it was defined
50  if (ct.col_type.precision == 0) {
51  return SQLTypes::kTEXT;
52  } else {
53  return SQLTypes::kVARCHAR;
54  }
55  case TDatumType::TIME:
56  return SQLTypes::kTIME;
57  case TDatumType::TIMESTAMP:
58  return SQLTypes::kTIMESTAMP;
59  case TDatumType::SMALLINT:
60  return SQLTypes::kSMALLINT;
61  case TDatumType::TINYINT:
62  return SQLTypes::kTINYINT;
63  case TDatumType::POINT:
64  return SQLTypes::kPOINT;
65  case TDatumType::LINESTRING:
66  return SQLTypes::kLINESTRING;
67  case TDatumType::POLYGON:
68  return SQLTypes::kPOLYGON;
69  case TDatumType::MULTIPOLYGON:
71  default:
72  LOG(FATAL) << "Unsupported TColumnType found, should not be possible";
73  return SQLTypes::kNULLT; // satisfy return-type warning
74  }
75 }
76 
78  if (ct.col_type.is_array) {
80  ct.col_type.precision,
81  ct.col_type.scale,
82  ct.col_type.nullable,
84  0,
85  get_sql_types(ct));
86  } else {
87  // normal column
88  // NOTE(se)
89  // for geo types, the values inserted for the other fields
90  // may not be valid, but only the type field is ever used
91  return SQLTypeInfo(get_sql_types(ct),
92  ct.col_type.precision,
93  ct.col_type.scale,
94  ct.col_type.nullable,
96  0,
98  }
99 }
100 
101 // this function allows us to treat array columns natively in the rest of the code
102 // by creating fact column description
104  return SQLTypeInfo(get_sql_types(ct),
105  ct.col_type.precision,
106  ct.col_type.scale,
107  ct.col_type.nullable,
109  0,
111 }
112 
114  std::vector<TStringValue> row,
115  const import_export::CopyParams& copy_params) {
116  std::ostringstream out;
117  bool first = true;
118  for (TStringValue ts : row) {
119  if (first) {
120  first = false;
121  } else {
122  out << copy_params.delimiter;
123  }
124  out << ts.str_val;
125  }
126  return out.str();
127 }
128 
129 // remove the entries from a row that has a failure during processing
130 // we must remove the entries that have been pushed onto the input_col so far
131 void remove_partial_row(size_t failed_column,
132  std::vector<SQLTypeInfo> column_type_info_vector,
133  std::vector<TColumn>& input_col_vec) {
134  for (size_t idx = 0; idx < failed_column; idx++) {
135  switch (column_type_info_vector[idx].get_type()) {
136  case SQLTypes::kARRAY:
137  input_col_vec[idx].nulls.pop_back();
138  input_col_vec[idx].data.arr_col.pop_back();
139  break;
140  case SQLTypes::kTEXT:
141  case SQLTypes::kCHAR:
142  case SQLTypes::kVARCHAR:
143  input_col_vec[idx].nulls.pop_back();
144  input_col_vec[idx].data.str_col.pop_back();
145  break;
146  case SQLTypes::kTINYINT:
147  case SQLTypes::kINT:
148  case SQLTypes::kBIGINT:
149  case SQLTypes::kSMALLINT:
150  case SQLTypes::kDATE:
151  case SQLTypes::kTIME:
153  case SQLTypes::kNUMERIC:
154  case SQLTypes::kDECIMAL:
155  case SQLTypes::kBOOLEAN:
156  input_col_vec[idx].nulls.pop_back();
157  input_col_vec[idx].data.int_col.pop_back();
158  break;
159  case SQLTypes::kFLOAT:
160  case SQLTypes::kDOUBLE:
161  input_col_vec[idx].nulls.pop_back();
162  input_col_vec[idx].data.real_col.pop_back();
163  break;
164  case SQLTypes::kPOINT:
166  case SQLTypes::kPOLYGON:
168  input_col_vec[idx].nulls.pop_back();
169  input_col_vec[idx].data.str_col.pop_back();
170  break;
171  default:
172  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
173  }
174  }
175 }
176 
177 void populate_TColumn(TStringValue ts,
178  SQLTypeInfo column_type_info,
179  TColumn& input_col,
180  const import_export::CopyParams& copy_params) {
181  // create datum and push data to column structure from row data
182  switch (column_type_info.get_type()) {
183  case SQLTypes::kARRAY:
184  LOG(FATAL) << "Trying to process ARRAY at item level something is wrong";
185  break;
186  case SQLTypes::kTEXT:
187  case SQLTypes::kCHAR:
188  case SQLTypes::kVARCHAR:
189  case SQLTypes::kPOINT:
191  case SQLTypes::kPOLYGON:
193  if (ts.is_null) {
194  input_col.nulls.push_back(true);
195  input_col.data.str_col.emplace_back("");
196 
197  } else {
198  input_col.nulls.push_back(false);
199  switch (column_type_info.get_type()) {
200  case SQLTypes::kCHAR:
201  case SQLTypes::kVARCHAR:
202  input_col.data.str_col.push_back(
203  ts.str_val.substr(0, column_type_info.get_precision()));
204  break;
205  case SQLTypes::kTEXT:
206  case SQLTypes::kPOINT:
208  case SQLTypes::kPOLYGON:
210  input_col.data.str_col.push_back(ts.str_val);
211  break;
212  default:
213  LOG(FATAL) << " trying to process a STRING transport type not handled "
214  << column_type_info.get_type();
215  }
216  }
217  break;
218  case SQLTypes::kINT:
219  case SQLTypes::kBIGINT:
220  case SQLTypes::kSMALLINT:
221  case SQLTypes::kTINYINT:
222  case SQLTypes::kDATE:
223  case SQLTypes::kTIME:
225  case SQLTypes::kNUMERIC:
226  case SQLTypes::kDECIMAL:
227  case SQLTypes::kBOOLEAN:
228  if (ts.is_null) {
229  input_col.nulls.push_back(true);
230  input_col.data.int_col.push_back(0);
231  } else {
232  input_col.nulls.push_back(false);
233  Datum d = StringToDatum(ts.str_val, column_type_info);
234  switch (column_type_info.get_type()) {
235  case SQLTypes::kINT:
236  case SQLTypes::kBOOLEAN:
237  input_col.data.int_col.push_back(d.intval);
238  break;
239  case SQLTypes::kBIGINT:
240  case SQLTypes::kNUMERIC:
241  case SQLTypes::kDECIMAL:
242  input_col.data.int_col.push_back(d.bigintval);
243  break;
244  case SQLTypes::kSMALLINT:
245  input_col.data.int_col.push_back(d.smallintval);
246  break;
247  case SQLTypes::kTINYINT:
248  input_col.data.int_col.push_back(d.tinyintval);
249  break;
250  case SQLTypes::kDATE:
251  case SQLTypes::kTIME:
253  input_col.data.int_col.push_back(d.bigintval);
254  break;
255  default:
256  LOG(FATAL) << " trying to process an INT transport type not handled "
257  << column_type_info.get_type();
258  }
259  }
260  break;
261  case SQLTypes::kFLOAT:
262  case SQLTypes::kDOUBLE:
263  if (ts.is_null) {
264  input_col.nulls.push_back(true);
265  input_col.data.real_col.push_back(0);
266 
267  } else {
268  input_col.nulls.push_back(false);
269  Datum d = StringToDatum(ts.str_val, column_type_info);
270  switch (column_type_info.get_type()) {
271  case SQLTypes::kFLOAT:
272  input_col.data.real_col.push_back(d.floatval);
273  break;
274  case SQLTypes::kDOUBLE:
275  input_col.data.real_col.push_back(d.doubleval);
276  break;
277  default:
278  LOG(FATAL) << " trying to process a REAL transport type not handled "
279  << column_type_info.get_type();
280  }
281  }
282  break;
283  default:
284  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
285  }
286 }
287 
289  return row_desc_;
290 };
291 
293  std::vector<TStringValue> row,
294  const import_export::CopyParams& copy_params) {
295  // create datum and push data to column structure from row data
296  uint curr_col = 0;
297  for (TStringValue ts : row) {
298  try {
299  switch (column_type_info_[curr_col].get_type()) {
300  case SQLTypes::kARRAY: {
301  std::vector<std::string> arr_ele;
303  ts.str_val, copy_params, arr_ele);
304  TColumn array_tcol;
305  for (std::string item : arr_ele) {
306  boost::algorithm::trim(item);
307  TStringValue tsa;
308  tsa.str_val = item;
309  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
310  // now put into TColumn
312  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
313  }
314  input_columns_[curr_col].nulls.push_back(false);
315  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
316 
317  break;
318  }
319  default:
321  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
322  }
323  } catch (const std::exception& e) {
325  // import_status.rows_rejected++;
326  LOG(ERROR) << "Input exception thrown: " << e.what()
327  << ". Row discarded, issue at column : " << (curr_col + 1)
328  << " data :" << print_row_with_delim(row, copy_params);
329  return false;
330  }
331  curr_col++;
332  }
333  return true;
334 }
335 
337  const std::string& user_name,
338  const std::string& passwd,
339  const std::string& db_name,
340  const std::string& table_name)
341  : user_name_(user_name)
342  , passwd_(passwd)
343  , db_name_(db_name)
344  , table_name_(table_name)
345  , conn_details_(conn_details) {
347 
348  TTableDetails table_details;
349  client_->get_table_details(table_details, session_, table_name_);
350 
351  row_desc_ = table_details.row_desc;
352 
353  // create vector with column details
354  for (TColumnType ct : row_desc_) {
356  }
357 
358  // create vector with array column details presented as real column for easier reuse
359  // of other code
360  for (TColumnType ct : row_desc_) {
362  }
363 
364  // create vector for storage of the actual column data
365  for (TColumnType column : row_desc_) {
366  TColumn t;
367  input_columns_.push_back(t);
368  }
369 }
371  closeConnection();
372 }
373 
375  client_.reset(new OmniSciClient(conn_details_.get_protocol()));
376 
377  try {
379  } catch (TOmniSciException& e) {
380  std::cerr << e.error_msg << std::endl;
381  } catch (TException& te) {
382  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
383  }
384 }
385 
387  try {
388  client_->disconnect(session_); // disconnect from omnisci_server
389  } catch (TOmniSciException& e) {
390  std::cerr << e.error_msg << std::endl;
391  } catch (TException& te) {
392  std::cerr << "Thrift error on close: " << te.what() << std::endl;
393  }
394 }
395 
397  size_t tries,
398  import_export::CopyParams copy_params) {
399  std::cout << " Waiting " << copy_params.retry_wait
400  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
401  << " times more " << std::endl;
402  sleep(copy_params.retry_wait);
403 
404  closeConnection();
406 }
407 
409  int& nskipped,
410  import_export::CopyParams copy_params) {
411  for (size_t tries = 0; tries < copy_params.retry_count;
412  tries++) { // allow for retries in case of insert failure
413  try {
414  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
415  // client->load_table(session, table_name, input_rows);
416  nrows += input_columns_[0].nulls.size();
417  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
418  << std::endl;
419  // we successfully loaded the data, lets move on
420  input_columns_.clear();
421  // create vector for storage of the actual column data
422  for (TColumnType column : row_desc_) {
423  TColumn t;
424  input_columns_.push_back(t);
425  }
426  return;
427  } catch (TOmniSciException& e) {
428  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
429  wait_disconnet_reconnnect_retry(tries, copy_params);
430  } catch (TException& te) {
431  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
432  wait_disconnet_reconnnect_retry(tries, copy_params);
433  }
434  }
435  std::cerr << "Retries exhausted program terminated" << std::endl;
436  exit(1);
437 }
int8_t tinyintval
Definition: sqltypes.h:206
Definition: sqltypes.h:51
TRowDescriptor get_row_descriptor()
SQLTypes
Definition: sqltypes.h:40
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:188
void wait_disconnet_reconnnect_retry(size_t tries, import_export::CopyParams copy_params)
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:330
TRowDescriptor row_desc_
int32_t intval
Definition: sqltypes.h:208
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
float floatval
Definition: sqltypes.h:210
mapd::shared_ptr< TProtocol > get_protocol()
int64_t bigintval
Definition: sqltypes.h:209
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
int16_t smallintval
Definition: sqltypes.h:207
Datum StringToDatum(std::string_view s, SQLTypeInfo &ti)
Definition: Datum.cpp:124
int get_precision() const
Definition: sqltypes.h:333
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:54
Definition: sqltypes.h:55
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:43
void createConnection(const ThriftClientConnection &con)
Definition: sqltypes.h:47
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.
double doubleval
Definition: sqltypes.h:211
mapd::shared_ptr< OmniSciClient > client_