OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowToColumnLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
28 #include "Logger/Logger.h"
29 
30 #include <chrono>
31 #include <thread>
32 
33 using namespace ::apache::thrift;
34 
35 SQLTypes get_sql_types(const TColumnType& ct) {
36  switch (ct.col_type.type) {
37  case TDatumType::BIGINT:
38  return SQLTypes::kBIGINT;
39  case TDatumType::BOOL:
40  return SQLTypes::kBOOLEAN;
41  case TDatumType::DATE:
42  return SQLTypes::kDATE;
44  return SQLTypes::kDECIMAL;
45  case TDatumType::DOUBLE:
46  return SQLTypes::kDOUBLE;
47  case TDatumType::FLOAT:
48  return SQLTypes::kFLOAT;
49  case TDatumType::INT:
50  return SQLTypes::kINT;
51  case TDatumType::STR:
52  // Tdataum is lossy here so need to look at precision to see if it was defined
53  if (ct.col_type.precision == 0) {
54  return SQLTypes::kTEXT;
55  } else {
56  return SQLTypes::kVARCHAR;
57  }
58  case TDatumType::TIME:
59  return SQLTypes::kTIME;
61  return SQLTypes::kTIMESTAMP;
63  return SQLTypes::kSMALLINT;
65  return SQLTypes::kTINYINT;
66  case TDatumType::POINT:
67  return SQLTypes::kPOINT;
69  return SQLTypes::kLINESTRING;
71  return SQLTypes::kPOLYGON;
74  default:
75  LOG(FATAL) << "Unsupported TColumnType found, should not be possible";
76  return SQLTypes::kNULLT; // satisfy return-type warning
77  }
78 }
79 
81  if (ct.col_type.is_array) {
83  ct.col_type.precision,
84  ct.col_type.scale,
85  ct.col_type.nullable,
87  0,
88  get_sql_types(ct));
89  } else {
90  // normal column
91  // NOTE(se)
92  // for geo types, the values inserted for the other fields
93  // may not be valid, but only the type field is ever used
94  return SQLTypeInfo(get_sql_types(ct),
95  ct.col_type.precision,
96  ct.col_type.scale,
97  ct.col_type.nullable,
99  0,
101  }
102 }
103 
104 // this function allows us to treat array columns natively in the rest of the code
105 // by creating fact column description
107  return SQLTypeInfo(get_sql_types(ct),
108  ct.col_type.precision,
109  ct.col_type.scale,
110  ct.col_type.nullable,
112  0,
114 }
115 
117  std::vector<TStringValue> row,
118  const import_export::CopyParams& copy_params) {
119  std::ostringstream out;
120  bool first = true;
121  for (TStringValue ts : row) {
122  if (first) {
123  first = false;
124  } else {
125  out << copy_params.delimiter;
126  }
127  out << ts.str_val;
128  }
129  return out.str();
130 }
131 
132 // remove the entries from a row that has a failure during processing
133 // we must remove the entries that have been pushed onto the input_col so far
134 void remove_partial_row(size_t failed_column,
135  std::vector<SQLTypeInfo> column_type_info_vector,
136  std::vector<TColumn>& input_col_vec) {
137  for (size_t idx = 0; idx < failed_column; idx++) {
138  switch (column_type_info_vector[idx].get_type()) {
139  case SQLTypes::kARRAY:
140  input_col_vec[idx].nulls.pop_back();
141  input_col_vec[idx].data.arr_col.pop_back();
142  break;
143  case SQLTypes::kTEXT:
144  case SQLTypes::kCHAR:
145  case SQLTypes::kVARCHAR:
146  input_col_vec[idx].nulls.pop_back();
147  input_col_vec[idx].data.str_col.pop_back();
148  break;
149  case SQLTypes::kTINYINT:
150  case SQLTypes::kINT:
151  case SQLTypes::kBIGINT:
152  case SQLTypes::kSMALLINT:
153  case SQLTypes::kDATE:
154  case SQLTypes::kTIME:
156  case SQLTypes::kNUMERIC:
157  case SQLTypes::kDECIMAL:
158  case SQLTypes::kBOOLEAN:
159  input_col_vec[idx].nulls.pop_back();
160  input_col_vec[idx].data.int_col.pop_back();
161  break;
162  case SQLTypes::kFLOAT:
163  case SQLTypes::kDOUBLE:
164  input_col_vec[idx].nulls.pop_back();
165  input_col_vec[idx].data.real_col.pop_back();
166  break;
167  case SQLTypes::kPOINT:
169  case SQLTypes::kPOLYGON:
171  input_col_vec[idx].nulls.pop_back();
172  input_col_vec[idx].data.str_col.pop_back();
173  break;
174  default:
175  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
176  }
177  }
178 }
179 
180 void populate_TColumn(TStringValue ts,
181  SQLTypeInfo column_type_info,
182  TColumn& input_col,
183  const import_export::CopyParams& copy_params) {
184  // create datum and push data to column structure from row data
185  switch (column_type_info.get_type()) {
186  case SQLTypes::kARRAY:
187  LOG(FATAL) << "Trying to process ARRAY at item level something is wrong";
188  break;
189  case SQLTypes::kTEXT:
190  case SQLTypes::kCHAR:
191  case SQLTypes::kVARCHAR:
192  case SQLTypes::kPOINT:
194  case SQLTypes::kPOLYGON:
196  if (ts.is_null) {
197  input_col.nulls.push_back(true);
198  input_col.data.str_col.emplace_back("");
199 
200  } else {
201  input_col.nulls.push_back(false);
202  switch (column_type_info.get_type()) {
203  case SQLTypes::kCHAR:
204  case SQLTypes::kVARCHAR:
205  input_col.data.str_col.push_back(
206  ts.str_val.substr(0, column_type_info.get_precision()));
207  break;
208  case SQLTypes::kTEXT:
209  case SQLTypes::kPOINT:
211  case SQLTypes::kPOLYGON:
213  input_col.data.str_col.push_back(ts.str_val);
214  break;
215  default:
216  LOG(FATAL) << " trying to process a STRING transport type not handled "
217  << column_type_info.get_type();
218  }
219  }
220  break;
221  case SQLTypes::kINT:
222  case SQLTypes::kBIGINT:
223  case SQLTypes::kSMALLINT:
224  case SQLTypes::kTINYINT:
225  case SQLTypes::kDATE:
226  case SQLTypes::kTIME:
228  case SQLTypes::kNUMERIC:
229  case SQLTypes::kDECIMAL:
230  case SQLTypes::kBOOLEAN:
231  if (ts.is_null) {
232  input_col.nulls.push_back(true);
233  input_col.data.int_col.push_back(0);
234  } else {
235  input_col.nulls.push_back(false);
236  Datum d = StringToDatum(ts.str_val, column_type_info);
237  switch (column_type_info.get_type()) {
238  case SQLTypes::kINT:
239  case SQLTypes::kBOOLEAN:
240  input_col.data.int_col.push_back(d.intval);
241  break;
242  case SQLTypes::kBIGINT:
243  case SQLTypes::kNUMERIC:
244  case SQLTypes::kDECIMAL:
245  input_col.data.int_col.push_back(d.bigintval);
246  break;
247  case SQLTypes::kSMALLINT:
248  input_col.data.int_col.push_back(d.smallintval);
249  break;
250  case SQLTypes::kTINYINT:
251  input_col.data.int_col.push_back(d.tinyintval);
252  break;
253  case SQLTypes::kDATE:
254  case SQLTypes::kTIME:
256  input_col.data.int_col.push_back(d.bigintval);
257  break;
258  default:
259  LOG(FATAL) << " trying to process an INT transport type not handled "
260  << column_type_info.get_type();
261  }
262  }
263  break;
264  case SQLTypes::kFLOAT:
265  case SQLTypes::kDOUBLE:
266  if (ts.is_null) {
267  input_col.nulls.push_back(true);
268  input_col.data.real_col.push_back(0);
269 
270  } else {
271  input_col.nulls.push_back(false);
272  Datum d = StringToDatum(ts.str_val, column_type_info);
273  switch (column_type_info.get_type()) {
274  case SQLTypes::kFLOAT:
275  input_col.data.real_col.push_back(d.floatval);
276  break;
277  case SQLTypes::kDOUBLE:
278  input_col.data.real_col.push_back(d.doubleval);
279  break;
280  default:
281  LOG(FATAL) << " trying to process a REAL transport type not handled "
282  << column_type_info.get_type();
283  }
284  }
285  break;
286  default:
287  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
288  }
289 }
290 
292  return row_desc_;
293 };
294 
296  std::vector<TStringValue> row,
297  const import_export::CopyParams& copy_params) {
298  // create datum and push data to column structure from row data
299  uint64_t curr_col = 0;
300  for (TStringValue ts : row) {
301  try {
302  switch (column_type_info_[curr_col].get_type()) {
303  case SQLTypes::kARRAY: {
304  std::vector<std::string> arr_ele;
306  ts.str_val, copy_params, arr_ele);
307  TColumn array_tcol;
308  for (std::string item : arr_ele) {
309  boost::algorithm::trim(item);
310  TStringValue tsa;
311  tsa.str_val = item;
312  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
313  // now put into TColumn
315  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
316  }
317  input_columns_[curr_col].nulls.push_back(false);
318  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
319 
320  break;
321  }
322  default:
324  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
325  }
326  } catch (const std::exception& e) {
328  // import_status.rows_rejected++;
329  LOG(ERROR) << "Input exception thrown: " << e.what()
330  << ". Row discarded, issue at column : " << (curr_col + 1)
331  << " data :" << print_row_with_delim(row, copy_params);
332  return false;
333  }
334  curr_col++;
335  }
336  return true;
337 }
338 
340  const std::string& user_name,
341  const std::string& passwd,
342  const std::string& db_name,
343  const std::string& table_name)
344  : user_name_(user_name)
345  , passwd_(passwd)
346  , db_name_(db_name)
347  , table_name_(table_name)
348  , conn_details_(conn_details) {
350 
351  TTableDetails table_details;
352  client_->get_table_details(table_details, session_, table_name_);
353 
354  row_desc_ = table_details.row_desc;
355 
356  // create vector with column details
357  for (TColumnType ct : row_desc_) {
359  }
360 
361  // create vector with array column details presented as real column for easier reuse
362  // of other code
363  for (TColumnType ct : row_desc_) {
365  }
366 
367  // create vector for storage of the actual column data
368  for (TColumnType column : row_desc_) {
369  TColumn t;
370  input_columns_.push_back(t);
371  }
372 }
374  closeConnection();
375 }
376 
378  client_.reset(new OmniSciClient(conn_details_.get_protocol()));
379 
380  try {
382  } catch (TOmniSciException& e) {
383  std::cerr << e.error_msg << std::endl;
384  } catch (TException& te) {
385  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
386  }
387 }
388 
390  try {
391  client_->disconnect(session_); // disconnect from omnisci_server
392  } catch (TOmniSciException& e) {
393  std::cerr << e.error_msg << std::endl;
394  } catch (TException& te) {
395  std::cerr << "Thrift error on close: " << te.what() << std::endl;
396  }
397 }
398 
400  size_t tries,
401  import_export::CopyParams copy_params) {
402  std::cout << " Waiting " << copy_params.retry_wait
403  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
404  << " times more " << std::endl;
405  std::this_thread::sleep_for(std::chrono::seconds(copy_params.retry_wait));
406 
407  closeConnection();
409 }
410 
412  int& nskipped,
413  import_export::CopyParams copy_params) {
414  for (size_t tries = 0; tries < copy_params.retry_count;
415  tries++) { // allow for retries in case of insert failure
416  try {
417  client_->load_table_binary_columnar(session_, table_name_, input_columns_, {});
418  // client->load_table(session, table_name, input_rows);
419  nrows += input_columns_[0].nulls.size();
420  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
421  << std::endl;
422  // we successfully loaded the data, lets move on
423  input_columns_.clear();
424  // create vector for storage of the actual column data
425  for (TColumnType column : row_desc_) {
426  TColumn t;
427  input_columns_.push_back(t);
428  }
429  return;
430  } catch (TOmniSciException& e) {
431  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
432  exit(2);
433  } catch (TException& te) {
434  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
435  wait_disconnect_reconnect_retry(tries, copy_params);
436  }
437  }
438  std::cerr << "Retries exhausted program terminated" << std::endl;
439  exit(1);
440 }
int8_t tinyintval
Definition: sqltypes.h:212
#define LINESTRING
Definition: sqltypes.h:49
TRowDescriptor get_row_descriptor()
SQLTypes
Definition: sqltypes.h:38
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:203
#define SMALLINT
#define DOUBLE
#define BIGINT
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
#define DATE
TRowDescriptor row_desc_
int32_t intval
Definition: sqltypes.h:214
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
#define MULTIPOLYGON
float floatval
Definition: sqltypes.h:216
#define POINT
int64_t bigintval
Definition: sqltypes.h:215
#define TIME
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
int16_t smallintval
Definition: sqltypes.h:213
#define TINYINT
Datum StringToDatum(std::string_view s, SQLTypeInfo &ti)
Definition: Datum.cpp:275
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)
int get_precision() const
Definition: sqltypes.h:332
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:52
Definition: sqltypes.h:53
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
#define TIMESTAMP
Definition: sqltypes.h:41
std::shared_ptr< TProtocol > get_protocol()
char * t
#define DECIMAL
void createConnection(const ThriftClientConnection &con)
#define POLYGON
Definition: sqltypes.h:45
#define FLOAT
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.
double doubleval
Definition: sqltypes.h:217
std::shared_ptr< OmniSciClient > client_