OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowToColumnLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
28 #include "Shared/Logger.h"
29 
30 using namespace ::apache::thrift;
31 
32 SQLTypes get_sql_types(const TColumnType& ct) {
33  switch (ct.col_type.type) {
34  case TDatumType::BIGINT:
35  return SQLTypes::kBIGINT;
36  case TDatumType::BOOL:
37  return SQLTypes::kBOOLEAN;
38  case TDatumType::DATE:
39  return SQLTypes::kDATE;
40  case TDatumType::DECIMAL:
41  return SQLTypes::kDECIMAL;
42  case TDatumType::DOUBLE:
43  return SQLTypes::kDOUBLE;
44  case TDatumType::FLOAT:
45  return SQLTypes::kFLOAT;
46  case TDatumType::INT:
47  return SQLTypes::kINT;
48  case TDatumType::STR:
49  // Tdataum is lossy here so need to look at precision to see if it was defined
50  if (ct.col_type.precision == 0) {
51  return SQLTypes::kTEXT;
52  } else {
53  return SQLTypes::kVARCHAR;
54  }
55  case TDatumType::TIME:
56  return SQLTypes::kTIME;
57  case TDatumType::TIMESTAMP:
58  return SQLTypes::kTIMESTAMP;
59  case TDatumType::SMALLINT:
60  return SQLTypes::kSMALLINT;
61  case TDatumType::POINT:
62  return SQLTypes::kPOINT;
63  case TDatumType::LINESTRING:
64  return SQLTypes::kLINESTRING;
65  case TDatumType::POLYGON:
66  return SQLTypes::kPOLYGON;
67  case TDatumType::MULTIPOLYGON:
69  default:
70  LOG(FATAL) << "Unsupported TColumnType found, should not be possible";
71  return SQLTypes::kNULLT; // satisfy return-type warning
72  }
73 }
74 
76  if (ct.col_type.is_array) {
78  ct.col_type.precision,
79  ct.col_type.scale,
80  ct.col_type.nullable,
82  0,
83  get_sql_types(ct));
84  } else {
85  // normal column
86  // NOTE(se)
87  // for geo types, the values inserted for the other fields
88  // may not be valid, but only the type field is ever used
89  return SQLTypeInfo(get_sql_types(ct),
90  ct.col_type.precision,
91  ct.col_type.scale,
92  ct.col_type.nullable,
94  0,
96  }
97 }
98 
99 // this function allows us to treat array columns natively in the rest of the code
100 // by creating a plain (non-array) column description from the array column's element type
102  return SQLTypeInfo(get_sql_types(ct),
103  ct.col_type.precision,
104  ct.col_type.scale,
105  ct.col_type.nullable,
107  0,
109 }
110 
112  std::vector<TStringValue> row,
113  const Importer_NS::CopyParams& copy_params) {
114  std::ostringstream out;
115  bool first = true;
116  for (TStringValue ts : row) {
117  if (first) {
118  first = false;
119  } else {
120  out << copy_params.delimiter;
121  }
122  out << ts.str_val;
123  }
124  return out.str();
125 }
126 
127 // remove the entries from a row that has a failure during processing
128 // we must remove the entries that have been pushed onto the input_col so far
129 void remove_partial_row(size_t failed_column,
130  std::vector<SQLTypeInfo> column_type_info_vector,
131  std::vector<TColumn>& input_col_vec) {
132  for (size_t idx = 0; idx < failed_column; idx++) {
133  switch (column_type_info_vector[idx].get_type()) {
134  case SQLTypes::kARRAY:
135  input_col_vec[idx].nulls.pop_back();
136  input_col_vec[idx].data.arr_col.pop_back();
137  break;
138  case SQLTypes::kTEXT:
139  case SQLTypes::kCHAR:
140  case SQLTypes::kVARCHAR:
141  input_col_vec[idx].nulls.pop_back();
142  input_col_vec[idx].data.str_col.pop_back();
143  break;
144  case SQLTypes::kINT:
145  case SQLTypes::kBIGINT:
146  case SQLTypes::kSMALLINT:
147  case SQLTypes::kDATE:
148  case SQLTypes::kTIME:
150  case SQLTypes::kNUMERIC:
151  case SQLTypes::kDECIMAL:
152  case SQLTypes::kBOOLEAN:
153  input_col_vec[idx].nulls.pop_back();
154  input_col_vec[idx].data.int_col.pop_back();
155  break;
156  case SQLTypes::kFLOAT:
157  case SQLTypes::kDOUBLE:
158  input_col_vec[idx].nulls.pop_back();
159  input_col_vec[idx].data.real_col.pop_back();
160  break;
161  case SQLTypes::kPOINT:
163  case SQLTypes::kPOLYGON:
165  input_col_vec[idx].nulls.pop_back();
166  input_col_vec[idx].data.str_col.pop_back();
167  break;
168  default:
169  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
170  }
171  }
172 }
173 
174 void populate_TColumn(TStringValue ts,
175  SQLTypeInfo column_type_info,
176  TColumn& input_col,
177  const Importer_NS::CopyParams& copy_params) {
178  // create datum and push data to column structure from row data
179  switch (column_type_info.get_type()) {
180  case SQLTypes::kARRAY:
181  LOG(FATAL) << "Trying to process ARRAY at item level something is wrong";
182  break;
183  case SQLTypes::kTEXT:
184  case SQLTypes::kCHAR:
185  case SQLTypes::kVARCHAR:
186  case SQLTypes::kPOINT:
188  case SQLTypes::kPOLYGON:
190  if (ts.is_null) {
191  input_col.nulls.push_back(true);
192  input_col.data.str_col.emplace_back("");
193 
194  } else {
195  input_col.nulls.push_back(false);
196  switch (column_type_info.get_type()) {
197  case SQLTypes::kCHAR:
198  case SQLTypes::kVARCHAR:
199  input_col.data.str_col.push_back(
200  ts.str_val.substr(0, column_type_info.get_precision()));
201  break;
202  case SQLTypes::kTEXT:
203  case SQLTypes::kPOINT:
205  case SQLTypes::kPOLYGON:
207  input_col.data.str_col.push_back(ts.str_val);
208  break;
209  default:
210  LOG(FATAL) << " trying to process a STRING transport type not handled "
211  << column_type_info.get_type();
212  }
213  }
214  break;
215  case SQLTypes::kINT:
216  case SQLTypes::kBIGINT:
217  case SQLTypes::kSMALLINT:
218  case SQLTypes::kDATE:
219  case SQLTypes::kTIME:
221  case SQLTypes::kNUMERIC:
222  case SQLTypes::kDECIMAL:
223  case SQLTypes::kBOOLEAN:
224  if (ts.is_null) {
225  input_col.nulls.push_back(true);
226  input_col.data.int_col.push_back(0);
227  } else {
228  input_col.nulls.push_back(false);
229  Datum d = StringToDatum(ts.str_val, column_type_info);
230  switch (column_type_info.get_type()) {
231  case SQLTypes::kINT:
232  case SQLTypes::kBOOLEAN:
233  input_col.data.int_col.push_back(d.intval);
234  break;
235  case SQLTypes::kBIGINT:
236  case SQLTypes::kNUMERIC:
237  case SQLTypes::kDECIMAL:
238  input_col.data.int_col.push_back(d.bigintval);
239  break;
240  case SQLTypes::kSMALLINT:
241  input_col.data.int_col.push_back(d.smallintval);
242  break;
243  case SQLTypes::kDATE:
244  case SQLTypes::kTIME:
246  input_col.data.int_col.push_back(d.bigintval);
247  break;
248  default:
249  LOG(FATAL) << " trying to process an INT transport type not handled "
250  << column_type_info.get_type();
251  }
252  }
253  break;
254  case SQLTypes::kFLOAT:
255  case SQLTypes::kDOUBLE:
256  if (ts.is_null) {
257  input_col.nulls.push_back(true);
258  input_col.data.real_col.push_back(0);
259 
260  } else {
261  input_col.nulls.push_back(false);
262  Datum d = StringToDatum(ts.str_val, column_type_info);
263  switch (column_type_info.get_type()) {
264  case SQLTypes::kFLOAT:
265  input_col.data.real_col.push_back(d.floatval);
266  break;
267  case SQLTypes::kDOUBLE:
268  input_col.data.real_col.push_back(d.doubleval);
269  break;
270  default:
271  LOG(FATAL) << " trying to process a REAL transport type not handled "
272  << column_type_info.get_type();
273  }
274  }
275  break;
276  default:
277  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
278  }
279 }
280 
282  return row_desc_;
283 };
284 
286  std::vector<TStringValue> row,
287  const Importer_NS::CopyParams& copy_params) {
288  // create datum and push data to column structure from row data
289  uint curr_col = 0;
290  for (TStringValue ts : row) {
291  try {
292  switch (column_type_info_[curr_col].get_type()) {
293  case SQLTypes::kARRAY: {
294  std::vector<std::string> arr_ele;
296  ts.str_val, copy_params, arr_ele);
297  TColumn array_tcol;
298  for (std::string item : arr_ele) {
299  boost::algorithm::trim(item);
300  TStringValue tsa;
301  tsa.str_val = item;
302  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
303  // now put into TColumn
305  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
306  }
307  input_columns_[curr_col].nulls.push_back(false);
308  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
309 
310  break;
311  }
312  default:
314  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
315  }
316  } catch (const std::exception& e) {
318  // import_status.rows_rejected++;
319  LOG(ERROR) << "Input exception thrown: " << e.what()
320  << ". Row discarded, issue at column : " << (curr_col + 1)
321  << " data :" << print_row_with_delim(row, copy_params);
322  return false;
323  }
324  curr_col++;
325  }
326  return true;
327 }
328 
330  const std::string& user_name,
331  const std::string& passwd,
332  const std::string& db_name,
333  const std::string& table_name)
334  : user_name_(user_name)
335  , passwd_(passwd)
336  , db_name_(db_name)
337  , table_name_(table_name)
338  , conn_details_(conn_details) {
340 
341  TTableDetails table_details;
342  client_->get_table_details(table_details, session_, table_name_);
343 
344  row_desc_ = table_details.row_desc;
345 
346  // create vector with column details
347  for (TColumnType ct : row_desc_) {
349  }
350 
351 // create vector with array column details presented as real columns for easier reuse
352 // of other code
353  for (TColumnType ct : row_desc_) {
355  }
356 
357  // create vector for storage of the actual column data
358  for (TColumnType column : row_desc_) {
359  TColumn t;
360  input_columns_.push_back(t);
361  }
362 }
364  closeConnection();
365 }
366 
368  client_.reset(new MapDClient(conn_details_.get_protocol()));
369 
370  try {
372  } catch (TMapDException& e) {
373  std::cerr << e.error_msg << std::endl;
374  } catch (TException& te) {
375  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
376  }
377 }
378 
380  try {
381  client_->disconnect(session_); // disconnect from omnisci_server
382  } catch (TMapDException& e) {
383  std::cerr << e.error_msg << std::endl;
384  } catch (TException& te) {
385  std::cerr << "Thrift error on close: " << te.what() << std::endl;
386  }
387 }
388 
390  size_t tries,
391  Importer_NS::CopyParams copy_params) {
392  std::cout << " Waiting " << copy_params.retry_wait
393  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
394  << " times more " << std::endl;
395  sleep(copy_params.retry_wait);
396 
397  closeConnection();
399 }
400 
402  int& nskipped,
403  Importer_NS::CopyParams copy_params) {
404  for (size_t tries = 0; tries < copy_params.retry_count;
405  tries++) { // allow for retries in case of insert failure
406  try {
407  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
408  // client->load_table(session, table_name, input_rows);
409  nrows += input_columns_[0].nulls.size();
410  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
411  << std::endl;
412  // we successfully loaded the data, lets move on
413  input_columns_.clear();
414  // create vector for storage of the actual column data
415  for (TColumnType column : row_desc_) {
416  TColumn t;
417  input_columns_.push_back(t);
418  }
419  return;
420  } catch (TMapDException& e) {
421  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
422  wait_disconnet_reconnnect_retry(tries, copy_params);
423  } catch (TException& te) {
424  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
425  wait_disconnet_reconnnect_retry(tries, copy_params);
426  }
427  }
428  std::cerr << "Retries exhausted program terminated" << std::endl;
429  exit(1);
430 }
mapd::shared_ptr< MapDClient > client_
std::string null_str
Definition: CopyParams.h:47
Definition: sqltypes.h:52
TRowDescriptor get_row_descriptor()
SQLTypes
Definition: sqltypes.h:41
void do_load(int &nrows, int &nskipped, Importer_NS::CopyParams copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:185
bool convert_string_to_column(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const Importer_NS::CopyParams &copy_params)
std::string print_row_with_delim(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
int32_t intval
Definition: sqltypes.h:128
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
float floatval
Definition: sqltypes.h:130
mapd::shared_ptr< TProtocol > get_protocol()
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
int64_t bigintval
Definition: sqltypes.h:129
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
int16_t smallintval
Definition: sqltypes.h:127
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:852
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:55
Definition: sqltypes.h:56
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
Datum StringToDatum(const std::string &s, SQLTypeInfo &ti)
Definition: Datum.cpp:90
Definition: sqltypes.h:44
void wait_disconnet_reconnnect_retry(size_t tries, Importer_NS::CopyParams copy_params)
void createConnection(const ThriftClientConnection &con)
int get_precision() const
Definition: sqltypes.h:329
Definition: sqltypes.h:48
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
static void parseStringArray(const std::string &s, const Importer_NS::CopyParams &copy_params, std::vector< std::string > &string_vec)
Parses given string array and inserts into given vector of strings.
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
double doubleval
Definition: sqltypes.h:131