OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowToColumnLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
26 #include "Logger/Logger.h"
27 
28 #include <chrono>
29 #include <thread>
30 
31 using namespace ::apache::thrift;
32 
33 SQLTypes get_sql_types(const TColumnType& ct) {
34  switch (ct.col_type.type) {
35  case TDatumType::BIGINT:
36  return SQLTypes::kBIGINT;
37  case TDatumType::BOOL:
38  return SQLTypes::kBOOLEAN;
39  case TDatumType::DATE:
40  return SQLTypes::kDATE;
41  case TDatumType::DECIMAL:
42  return SQLTypes::kDECIMAL;
43  case TDatumType::DOUBLE:
44  return SQLTypes::kDOUBLE;
45  case TDatumType::FLOAT:
46  return SQLTypes::kFLOAT;
47  case TDatumType::INT:
48  return SQLTypes::kINT;
49  case TDatumType::STR:
50  // Tdataum is lossy here so need to look at precision to see if it was defined
51  if (ct.col_type.precision == 0) {
52  return SQLTypes::kTEXT;
53  } else {
54  return SQLTypes::kVARCHAR;
55  }
56  case TDatumType::TIME:
57  return SQLTypes::kTIME;
58  case TDatumType::TIMESTAMP:
59  return SQLTypes::kTIMESTAMP;
60  case TDatumType::SMALLINT:
61  return SQLTypes::kSMALLINT;
62  case TDatumType::TINYINT:
63  return SQLTypes::kTINYINT;
64  case TDatumType::POINT:
65  return SQLTypes::kPOINT;
66  case TDatumType::MULTIPOINT:
67  return SQLTypes::kMULTIPOINT;
68  case TDatumType::LINESTRING:
69  return SQLTypes::kLINESTRING;
70  case TDatumType::MULTILINESTRING:
72  case TDatumType::POLYGON:
73  return SQLTypes::kPOLYGON;
74  case TDatumType::MULTIPOLYGON:
76  default:
77  LOG(FATAL) << "Unsupported TColumnType found, should not be possible";
78  return SQLTypes::kNULLT; // satisfy return-type warning
79  }
80 }
81 
83  if (ct.col_type.is_array) {
85  ct.col_type.precision,
86  ct.col_type.scale,
87  ct.col_type.nullable,
89  0,
90  get_sql_types(ct));
91  } else {
92  // normal column
93  // NOTE(se)
94  // for geo types, the values inserted for the other fields
95  // may not be valid, but only the type field is ever used
96  return SQLTypeInfo(get_sql_types(ct),
97  ct.col_type.precision,
98  ct.col_type.scale,
99  ct.col_type.nullable,
101  0,
103  }
104 }
105 
106 // this function allows us to treat array columns natively in the rest of the code
107 // by creating fact column description
109  return SQLTypeInfo(get_sql_types(ct),
110  ct.col_type.precision,
111  ct.col_type.scale,
112  ct.col_type.nullable,
114  0,
116 }
117 
119  std::vector<TStringValue> row,
120  const import_export::CopyParams& copy_params) {
121  std::ostringstream out;
122  bool first = true;
123  for (TStringValue ts : row) {
124  if (first) {
125  first = false;
126  } else {
127  out << copy_params.delimiter;
128  }
129  out << ts.str_val;
130  }
131  return out.str();
132 }
133 
134 // remove the entries from a row that has a failure during processing
135 // we must remove the entries that have been pushed onto the input_col so far
136 void remove_partial_row(size_t failed_column,
137  std::vector<SQLTypeInfo> column_type_info_vector,
138  std::vector<TColumn>& input_col_vec) {
139  for (size_t idx = 0; idx < failed_column; idx++) {
140  switch (column_type_info_vector[idx].get_type()) {
141  case SQLTypes::kARRAY:
142  input_col_vec[idx].nulls.pop_back();
143  input_col_vec[idx].data.arr_col.pop_back();
144  break;
145  case SQLTypes::kTEXT:
146  case SQLTypes::kCHAR:
147  case SQLTypes::kVARCHAR:
148  input_col_vec[idx].nulls.pop_back();
149  input_col_vec[idx].data.str_col.pop_back();
150  break;
151  case SQLTypes::kTINYINT:
152  case SQLTypes::kINT:
153  case SQLTypes::kBIGINT:
154  case SQLTypes::kSMALLINT:
155  case SQLTypes::kDATE:
156  case SQLTypes::kTIME:
158  case SQLTypes::kNUMERIC:
159  case SQLTypes::kDECIMAL:
160  case SQLTypes::kBOOLEAN:
161  input_col_vec[idx].nulls.pop_back();
162  input_col_vec[idx].data.int_col.pop_back();
163  break;
164  case SQLTypes::kFLOAT:
165  case SQLTypes::kDOUBLE:
166  input_col_vec[idx].nulls.pop_back();
167  input_col_vec[idx].data.real_col.pop_back();
168  break;
169  case SQLTypes::kPOINT:
173  case SQLTypes::kPOLYGON:
175  input_col_vec[idx].nulls.pop_back();
176  input_col_vec[idx].data.str_col.pop_back();
177  break;
178  default:
179  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
180  }
181  }
182 }
183 
184 void populate_TColumn(TStringValue ts,
185  SQLTypeInfo column_type_info,
186  TColumn& input_col,
187  const import_export::CopyParams& copy_params) {
188  // create datum and push data to column structure from row data
189  switch (column_type_info.get_type()) {
190  case SQLTypes::kARRAY:
191  LOG(FATAL) << "Trying to process ARRAY at item level something is wrong";
192  break;
193  case SQLTypes::kTEXT:
194  case SQLTypes::kCHAR:
195  case SQLTypes::kVARCHAR:
196  case SQLTypes::kPOINT:
200  case SQLTypes::kPOLYGON:
202  if (ts.is_null) {
203  input_col.nulls.push_back(true);
204  input_col.data.str_col.emplace_back("");
205 
206  } else {
207  input_col.nulls.push_back(false);
208  switch (column_type_info.get_type()) {
209  case SQLTypes::kCHAR:
210  case SQLTypes::kVARCHAR:
211  input_col.data.str_col.push_back(
212  ts.str_val.substr(0, column_type_info.get_precision()));
213  break;
214  case SQLTypes::kTEXT:
215  case SQLTypes::kPOINT:
219  case SQLTypes::kPOLYGON:
221  input_col.data.str_col.push_back(ts.str_val);
222  break;
223  default:
224  LOG(FATAL) << " trying to process a STRING transport type not handled "
225  << column_type_info.get_type();
226  }
227  }
228  break;
229  case SQLTypes::kINT:
230  case SQLTypes::kBIGINT:
231  case SQLTypes::kSMALLINT:
232  case SQLTypes::kTINYINT:
233  case SQLTypes::kDATE:
234  case SQLTypes::kTIME:
236  case SQLTypes::kNUMERIC:
237  case SQLTypes::kDECIMAL:
238  case SQLTypes::kBOOLEAN:
239  if (ts.is_null) {
240  input_col.nulls.push_back(true);
241  input_col.data.int_col.push_back(0);
242  } else {
243  input_col.nulls.push_back(false);
244  Datum d = StringToDatum(ts.str_val, column_type_info);
245  switch (column_type_info.get_type()) {
246  case SQLTypes::kINT:
247  case SQLTypes::kBOOLEAN:
248  input_col.data.int_col.push_back(d.intval);
249  break;
250  case SQLTypes::kBIGINT:
251  case SQLTypes::kNUMERIC:
252  case SQLTypes::kDECIMAL:
253  input_col.data.int_col.push_back(d.bigintval);
254  break;
255  case SQLTypes::kSMALLINT:
256  input_col.data.int_col.push_back(d.smallintval);
257  break;
258  case SQLTypes::kTINYINT:
259  input_col.data.int_col.push_back(d.tinyintval);
260  break;
261  case SQLTypes::kDATE:
262  case SQLTypes::kTIME:
264  input_col.data.int_col.push_back(d.bigintval);
265  break;
266  default:
267  LOG(FATAL) << " trying to process an INT transport type not handled "
268  << column_type_info.get_type();
269  }
270  }
271  break;
272  case SQLTypes::kFLOAT:
273  case SQLTypes::kDOUBLE:
274  if (ts.is_null) {
275  input_col.nulls.push_back(true);
276  input_col.data.real_col.push_back(0);
277 
278  } else {
279  input_col.nulls.push_back(false);
280  Datum d = StringToDatum(ts.str_val, column_type_info);
281  switch (column_type_info.get_type()) {
282  case SQLTypes::kFLOAT:
283  input_col.data.real_col.push_back(d.floatval);
284  break;
285  case SQLTypes::kDOUBLE:
286  input_col.data.real_col.push_back(d.doubleval);
287  break;
288  default:
289  LOG(FATAL) << " trying to process a REAL transport type not handled "
290  << column_type_info.get_type();
291  }
292  }
293  break;
294  default:
295  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
296  }
297 }
298 
300  return row_desc_;
301 };
302 
304  std::vector<TStringValue> row,
305  const import_export::CopyParams& copy_params) {
306  // create datum and push data to column structure from row data
307  uint64_t curr_col = 0;
308  for (TStringValue ts : row) {
309  try {
310  switch (column_type_info_[curr_col].get_type()) {
311  case SQLTypes::kARRAY: {
312  std::vector<std::string> arr_ele;
314  ts.str_val, copy_params, arr_ele);
315  TColumn array_tcol;
316  for (std::string item : arr_ele) {
317  boost::algorithm::trim(item);
318  TStringValue tsa;
319  tsa.str_val = item;
320  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
321  // now put into TColumn
323  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
324  }
325  input_columns_[curr_col].nulls.push_back(false);
326  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
327 
328  break;
329  }
330  default:
332  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
333  }
334  } catch (const std::exception& e) {
336  // import_status.rows_rejected++;
337  LOG(ERROR) << "Input exception thrown: " << e.what()
338  << ". Row discarded, issue at column : " << (curr_col + 1)
339  << " data :" << print_row_with_delim(row, copy_params);
340  return false;
341  }
342  curr_col++;
343  }
344  return true;
345 }
346 
348  const std::string& user_name,
349  const std::string& passwd,
350  const std::string& db_name,
351  const std::string& table_name)
352  : user_name_(user_name)
353  , passwd_(passwd)
354  , db_name_(db_name)
355  , table_name_(table_name)
356  , conn_details_(conn_details) {
358 
359  TTableDetails table_details;
360  client_->get_table_details(table_details, session_, table_name_);
361 
362  row_desc_ = table_details.row_desc;
363 
364  // create vector with column details
365  for (TColumnType ct : row_desc_) {
367  }
368 
369  // create vector with array column details presented as real column for easier resue
370  // of othe code
371  for (TColumnType ct : row_desc_) {
373  }
374 
375  // create vector for storage of the actual column data
376  for (TColumnType column : row_desc_) {
377  TColumn t;
378  input_columns_.push_back(t);
379  }
380 }
382  closeConnection();
383 }
384 
386  client_.reset(new HeavyClient(conn_details_.get_protocol()));
387 
388  try {
390  } catch (TDBException& e) {
391  std::cerr << e.error_msg << std::endl;
392  } catch (TException& te) {
393  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
394  }
395 }
396 
398  try {
399  client_->disconnect(session_); // disconnect from heavydb
400  } catch (TDBException& e) {
401  std::cerr << e.error_msg << std::endl;
402  } catch (TException& te) {
403  std::cerr << "Thrift error on close: " << te.what() << std::endl;
404  }
405 }
406 
408  size_t tries,
409  import_export::CopyParams copy_params) {
410  std::cout << " Waiting " << copy_params.retry_wait
411  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
412  << " times more " << std::endl;
413  std::this_thread::sleep_for(std::chrono::seconds(copy_params.retry_wait));
414 
415  closeConnection();
417 }
418 
420  int& nskipped,
421  import_export::CopyParams copy_params) {
422  for (size_t tries = 0; tries < copy_params.retry_count;
423  tries++) { // allow for retries in case of insert failure
424  try {
425  client_->load_table_binary_columnar(session_, table_name_, input_columns_, {});
426  // client->load_table(session, table_name, input_rows);
427  nrows += input_columns_[0].nulls.size();
428  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
429  << std::endl;
430  // we successfully loaded the data, lets move on
431  input_columns_.clear();
432  // create vector for storage of the actual column data
433  for (TColumnType column : row_desc_) {
434  TColumn t;
435  input_columns_.push_back(t);
436  }
437  return;
438  } catch (TDBException& e) {
439  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
440  exit(2);
441  } catch (TException& te) {
442  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
443  wait_disconnect_reconnect_retry(tries, copy_params);
444  }
445  }
446  std::cerr << "Retries exhausted program terminated" << std::endl;
447  exit(1);
448 }
int8_t tinyintval
Definition: Datum.h:73
Definition: sqltypes.h:76
TRowDescriptor get_row_descriptor()
SQLTypes
Definition: sqltypes.h:65
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:285
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
std::shared_ptr< HeavyClient > client_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
TRowDescriptor row_desc_
int32_t intval
Definition: Datum.h:75
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
float floatval
Definition: Datum.h:77
int64_t bigintval
Definition: Datum.h:76
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
int16_t smallintval
Definition: Datum.h:74
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
Datum StringToDatum(const std::string_view s, SQLTypeInfo &ti)
Definition: Datum.cpp:339
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)
int get_precision() const
Definition: sqltypes.h:394
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:79
Definition: sqltypes.h:80
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:68
std::shared_ptr< TProtocol > get_protocol()
void createConnection(const ThriftClientConnection &con)
Definition: sqltypes.h:72
Definition: Datum.h:71
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
double doubleval
Definition: Datum.h:78