OmniSciDB  04ee39c94c
RowToColumnLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
26 #include "RowToColumnLoader.h"
27 #include "Shared/Logger.h"
28 
29 using namespace ::apache::thrift;
30 
31 SQLTypes get_sql_types(const TColumnType& ct) {
32  switch (ct.col_type.type) {
33  case TDatumType::BIGINT:
34  return SQLTypes::kBIGINT;
35  case TDatumType::BOOL:
36  return SQLTypes::kBOOLEAN;
37  case TDatumType::DATE:
38  return SQLTypes::kDATE;
39  case TDatumType::DECIMAL:
40  return SQLTypes::kDECIMAL;
41  case TDatumType::DOUBLE:
42  return SQLTypes::kDOUBLE;
43  case TDatumType::FLOAT:
44  return SQLTypes::kFLOAT;
45  case TDatumType::INT:
46  return SQLTypes::kINT;
47  case TDatumType::STR:
48  // Tdataum is lossy here so need to look at precision to see if it was defined
49  if (ct.col_type.precision == 0) {
50  return SQLTypes::kTEXT;
51  } else {
52  return SQLTypes::kVARCHAR;
53  }
54  case TDatumType::TIME:
55  return SQLTypes::kTIME;
56  case TDatumType::TIMESTAMP:
57  return SQLTypes::kTIMESTAMP;
58  case TDatumType::SMALLINT:
59  return SQLTypes::kSMALLINT;
60  default:
61  LOG(FATAL) << "Unsupported TColumnType found, should not be possible";
62  return SQLTypes::kNULLT; // satisfy return-type warning
63  }
64 }
65 
67  if (ct.col_type.is_array) {
69  ct.col_type.precision,
70  ct.col_type.scale,
71  ct.col_type.nullable,
73  0,
74  get_sql_types(ct));
75  } else {
76  // normal column
77  return SQLTypeInfo(get_sql_types(ct),
78  ct.col_type.precision,
79  ct.col_type.scale,
80  ct.col_type.nullable,
82  0,
84  }
85 }
86 
87 // This function lets the rest of the code treat array columns natively
88 // by creating a fake column description for them.
90  return SQLTypeInfo(get_sql_types(ct),
91  ct.col_type.precision,
92  ct.col_type.scale,
93  ct.col_type.nullable,
95  0,
97 }
98 
100  std::vector<TStringValue> row,
101  const Importer_NS::CopyParams& copy_params) {
102  std::ostringstream out;
103  bool first = true;
104  for (TStringValue ts : row) {
105  if (first) {
106  first = false;
107  } else {
108  out << copy_params.delimiter;
109  }
110  out << ts.str_val;
111  }
112  return out.str();
113 }
114 
115 // remove the entries from a row that has a failure during processing
116 // we must remove the entries that have been pushed onto the input_col so far
117 void remove_partial_row(size_t failed_column,
118  std::vector<SQLTypeInfo> column_type_info_vector,
119  std::vector<TColumn>& input_col_vec) {
120  for (size_t idx = 0; idx < failed_column; idx++) {
121  switch (column_type_info_vector[idx].get_type()) {
122  case SQLTypes::kARRAY:
123  input_col_vec[idx].nulls.pop_back();
124  input_col_vec[idx].data.arr_col.pop_back();
125  break;
126  case SQLTypes::kTEXT:
127  case SQLTypes::kCHAR:
128  case SQLTypes::kVARCHAR:
129  input_col_vec[idx].nulls.pop_back();
130  input_col_vec[idx].data.str_col.pop_back();
131  break;
132  case SQLTypes::kINT:
133  case SQLTypes::kBIGINT:
134  case SQLTypes::kSMALLINT:
135  case SQLTypes::kDATE:
136  case SQLTypes::kTIME:
138  case SQLTypes::kNUMERIC:
139  case SQLTypes::kDECIMAL:
140  case SQLTypes::kBOOLEAN:
141  input_col_vec[idx].nulls.pop_back();
142  input_col_vec[idx].data.int_col.pop_back();
143  break;
144  case SQLTypes::kFLOAT:
145  case SQLTypes::kDOUBLE:
146  input_col_vec[idx].nulls.pop_back();
147  input_col_vec[idx].data.real_col.pop_back();
148  break;
149  default:
150  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
151  }
152  }
153 }
154 
155 void populate_TColumn(TStringValue ts,
156  SQLTypeInfo column_type_info,
157  TColumn& input_col,
158  const Importer_NS::CopyParams& copy_params) {
159  // create datum and push data to column structure from row data
160  switch (column_type_info.get_type()) {
161  case SQLTypes::kARRAY:
162  LOG(FATAL) << "Trying to process ARRAY at item level something is wrong";
163  break;
164  case SQLTypes::kTEXT:
165  case SQLTypes::kCHAR:
166  case SQLTypes::kVARCHAR:
167  if (ts.is_null) {
168  input_col.nulls.push_back(true);
169  input_col.data.str_col.emplace_back("");
170 
171  } else {
172  input_col.nulls.push_back(false);
173  switch (column_type_info.get_type()) {
174  case SQLTypes::kCHAR:
175  case SQLTypes::kVARCHAR:
176  input_col.data.str_col.push_back(
177  ts.str_val.substr(0, column_type_info.get_precision()));
178  break;
179  case SQLTypes::kTEXT:
180 
181  input_col.data.str_col.push_back(ts.str_val);
182  break;
183  default:
184  LOG(FATAL) << " trying to process a STRING transport type not handled "
185  << column_type_info.get_type();
186  }
187  }
188  break;
189  case SQLTypes::kINT:
190  case SQLTypes::kBIGINT:
191  case SQLTypes::kSMALLINT:
192  case SQLTypes::kDATE:
193  case SQLTypes::kTIME:
195  case SQLTypes::kNUMERIC:
196  case SQLTypes::kDECIMAL:
197  case SQLTypes::kBOOLEAN:
198  if (ts.is_null) {
199  input_col.nulls.push_back(true);
200  input_col.data.int_col.push_back(0);
201  } else {
202  input_col.nulls.push_back(false);
203  Datum d = StringToDatum(ts.str_val, column_type_info);
204  switch (column_type_info.get_type()) {
205  case SQLTypes::kINT:
206  case SQLTypes::kBOOLEAN:
207  input_col.data.int_col.push_back(d.intval);
208  break;
209  case SQLTypes::kBIGINT:
210  case SQLTypes::kNUMERIC:
211  case SQLTypes::kDECIMAL:
212  input_col.data.int_col.push_back(d.bigintval);
213  break;
214  case SQLTypes::kSMALLINT:
215  input_col.data.int_col.push_back(d.smallintval);
216  break;
217  case SQLTypes::kDATE:
218  case SQLTypes::kTIME:
220  input_col.data.int_col.push_back(d.bigintval);
221  break;
222  default:
223  LOG(FATAL) << " trying to process an INT transport type not handled "
224  << column_type_info.get_type();
225  }
226  }
227  break;
228  case SQLTypes::kFLOAT:
229  case SQLTypes::kDOUBLE:
230  if (ts.is_null) {
231  input_col.nulls.push_back(true);
232  input_col.data.real_col.push_back(0);
233 
234  } else {
235  input_col.nulls.push_back(false);
236  Datum d = StringToDatum(ts.str_val, column_type_info);
237  switch (column_type_info.get_type()) {
238  case SQLTypes::kFLOAT:
239  input_col.data.real_col.push_back(d.floatval);
240  break;
241  case SQLTypes::kDOUBLE:
242  input_col.data.real_col.push_back(d.doubleval);
243  break;
244  default:
245  LOG(FATAL) << " trying to process a REAL transport type not handled "
246  << column_type_info.get_type();
247  }
248  }
249  break;
250  default:
251  LOG(FATAL) << "Trying to process an unsupported datatype, should be impossible";
252  }
253 }
254 
256  return row_desc_;
257 };
258 
260  std::vector<TStringValue> row,
261  const Importer_NS::CopyParams& copy_params) {
262  // create datum and push data to column structure from row data
263  uint curr_col = 0;
264  for (TStringValue ts : row) {
265  try {
266  switch (column_type_info_[curr_col].get_type()) {
267  case SQLTypes::kARRAY: {
268  std::vector<std::string> arr_ele;
269  Importer_NS::ImporterUtils::parseStringArray(ts.str_val, copy_params, arr_ele);
270  TColumn array_tcol;
271  for (std::string item : arr_ele) {
272  boost::algorithm::trim(item);
273  TStringValue tsa;
274  tsa.str_val = item;
275  tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.null_str);
276  // now put into TColumn
278  tsa, array_column_type_info_[curr_col], array_tcol, copy_params);
279  }
280  input_columns_[curr_col].nulls.push_back(false);
281  input_columns_[curr_col].data.arr_col.push_back(array_tcol);
282 
283  break;
284  }
285  default:
287  ts, column_type_info_[curr_col], input_columns_[curr_col], copy_params);
288  }
289  } catch (const std::exception& e) {
291  // import_status.rows_rejected++;
292  LOG(ERROR) << "Input exception thrown: " << e.what()
293  << ". Row discarded, issue at column : " << (curr_col + 1)
294  << " data :" << print_row_with_delim(row, copy_params);
295  return false;
296  }
297  curr_col++;
298  }
299  return true;
300 }
301 
303  const std::string& user_name,
304  const std::string& passwd,
305  const std::string& db_name,
306  const std::string& table_name)
307  : user_name_(user_name)
308  , passwd_(passwd)
309  , db_name_(db_name)
310  , table_name_(table_name)
311  , conn_details_(conn_details) {
313 
314  TTableDetails table_details;
315  client_->get_table_details(table_details, session_, table_name_);
316 
317  row_desc_ = table_details.row_desc;
318 
319  // create vector with column details
320  for (TColumnType ct : row_desc_) {
322  }
323 
324 // create vector with array column details presented as real column for easier reuse
325 // of other code
326  for (TColumnType ct : row_desc_) {
328  }
329 
330  // create vector for storage of the actual column data
331  for (TColumnType column : row_desc_) {
332  TColumn t;
333  input_columns_.push_back(t);
334  }
335 }
337  closeConnection();
338 }
339 
341  client_.reset(new MapDClient(conn_details_.get_protocol()));
342 
343  try {
345  } catch (TMapDException& e) {
346  std::cerr << e.error_msg << std::endl;
347  } catch (TException& te) {
348  std::cerr << "Thrift error on connect: " << te.what() << std::endl;
349  }
350 }
351 
353  try {
354  client_->disconnect(session_); // disconnect from omnisci_server
355  } catch (TMapDException& e) {
356  std::cerr << e.error_msg << std::endl;
357  } catch (TException& te) {
358  std::cerr << "Thrift error on close: " << te.what() << std::endl;
359  }
360 }
361 
363  size_t tries,
364  Importer_NS::CopyParams copy_params) {
365  std::cout << " Waiting " << copy_params.retry_wait
366  << " secs to retry Inserts , will try " << (copy_params.retry_count - tries)
367  << " times more " << std::endl;
368  sleep(copy_params.retry_wait);
369 
370  closeConnection();
372 }
373 
375  int& nskipped,
376  Importer_NS::CopyParams copy_params) {
377  for (size_t tries = 0; tries < copy_params.retry_count;
378  tries++) { // allow for retries in case of insert failure
379  try {
380  client_->load_table_binary_columnar(session_, table_name_, input_columns_);
381  // client->load_table(session, table_name, input_rows);
382  nrows += input_columns_[0].nulls.size();
383  std::cout << nrows << " Rows Inserted, " << nskipped << " rows skipped."
384  << std::endl;
385  // we successfully loaded the data, lets move on
386  input_columns_.clear();
387  // create vector for storage of the actual column data
388  for (TColumnType column : row_desc_) {
389  TColumn t;
390  input_columns_.push_back(t);
391  }
392  return;
393  } catch (TMapDException& e) {
394  std::cerr << "Exception trying to insert data " << e.error_msg << std::endl;
395  wait_disconnet_reconnnect_retry(tries, copy_params);
396  } catch (TException& te) {
397  std::cerr << "Exception trying to insert data " << te.what() << std::endl;
398  wait_disconnet_reconnnect_retry(tries, copy_params);
399  }
400  }
401  std::cerr << "Retries exhausted program terminated" << std::endl;
402  exit(1);
403 }
mapd::shared_ptr< MapDClient > client_
void d(const SQLTypes expected_type, const std::string &str)
Definition: ImportTest.cpp:268
int get_precision() const
Definition: sqltypes.h:326
std::string null_str
Definition: Importer.h:100
Definition: sqltypes.h:51
TRowDescriptor get_row_descriptor()
SQLTypes
Definition: sqltypes.h:40
void do_load(int &nrows, int &nskipped, Importer_NS::CopyParams copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
#define LOG(tag)
Definition: Logger.h:182
bool convert_string_to_column(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const Importer_NS::CopyParams &copy_params)
std::string print_row_with_delim(std::vector< TStringValue > row, const Importer_NS::CopyParams &copy_params)
std::string table_name_
std::vector< SQLTypeInfo > column_type_info_
std::string user_name_
TRowDescriptor row_desc_
int32_t intval
Definition: sqltypes.h:125
std::vector< TColumn > input_columns_
std::vector< SQLTypeInfo > array_column_type_info_
float floatval
Definition: sqltypes.h:127
mapd::shared_ptr< TProtocol > get_protocol()
int64_t bigintval
Definition: sqltypes.h:126
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
int16_t smallintval
Definition: sqltypes.h:124
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:823
static bool parseStringArray(const std::string &s, const CopyParams &copy_params, std::vector< std::string > &string_vec)
Definition: Importer.h:814
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
Definition: sqltypes.h:54
Definition: sqltypes.h:55
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
Datum StringToDatum(const std::string &s, SQLTypeInfo &ti)
Definition: Datum.cpp:90
Definition: sqltypes.h:43
void wait_disconnet_reconnnect_retry(size_t tries, Importer_NS::CopyParams copy_params)
void createConnection(const ThriftClientConnection &con)
Definition: sqltypes.h:47
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)
double doubleval
Definition: sqltypes.h:128