OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Importer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.h
19  * @author Wei Hong < wei@mapd.com>
20  * @brief Importer class for table import from file
21  */
22 #ifndef _IMPORTER_H_
23 #define _IMPORTER_H_
24 
25 #include <gdal.h>
26 #include <ogrsf_frmts.h>
27 
28 #include <atomic>
29 #include <boost/filesystem.hpp>
30 #include <boost/noncopyable.hpp>
31 #include <boost/tokenizer.hpp>
32 #include <condition_variable>
33 #include <cstdio>
34 #include <cstdlib>
35 #include <iostream>
36 #include <list>
37 #include <map>
38 #include <memory>
39 #include <mutex>
40 #include <set>
41 #include <string>
42 #include <string_view>
43 #include <utility>
44 
45 #include "AbstractImporter.h"
46 #include "Catalog/Catalog.h"
48 #include "DataMgr/Chunk/Chunk.h"
49 #include "Fragmenter/Fragmenter.h"
51 #include "Logger/Logger.h"
53 #include "Shared/checked_alloc.h"
54 #include "Shared/fixautotools.h"
55 
56 // Some builds of boost::geometry require iostream, but don't explicitly include it.
57 // Placing in own section to ensure it's included after iostream.
58 #include <boost/geometry/index/rtree.hpp>
59 
60 class TDatum;
61 class TColumn;
62 
63 namespace arrow {
64 
65 class Array;
66 
67 } // namespace arrow
68 
69 namespace import_export {
70 
71 class Importer;
72 
73 using ArraySliceRange = std::pair<size_t, size_t>;
74 
76  std::mutex mutex;
77  std::set<int64_t> rows;
78  std::atomic<int> nerrors;
79  std::string file_name;
80  int row_group;
82 };
83 
85  public:
86  static ArrayDatum composeNullArray(const SQLTypeInfo& ti);
87  static ArrayDatum composeNullPointCoords(const SQLTypeInfo& coords_ti,
88  const SQLTypeInfo& geo_ti);
89 };
90 
91 class TypedImportBuffer : boost::noncopyable {
92  public:
93  using OptionalStringVector = std::optional<std::vector<std::string>>;
94  TypedImportBuffer(const ColumnDescriptor* col_desc, StringDictionary* string_dict)
95  : column_desc_(col_desc), string_dict_(string_dict) {
96  switch (col_desc->columnType.get_type()) {
97  case kBOOLEAN:
98  bool_buffer_ = new std::vector<int8_t>();
99  break;
100  case kTINYINT:
101  tinyint_buffer_ = new std::vector<int8_t>();
102  break;
103  case kSMALLINT:
104  smallint_buffer_ = new std::vector<int16_t>();
105  break;
106  case kINT:
107  int_buffer_ = new std::vector<int32_t>();
108  break;
109  case kBIGINT:
110  case kNUMERIC:
111  case kDECIMAL:
112  bigint_buffer_ = new std::vector<int64_t>();
113  break;
114  case kFLOAT:
115  float_buffer_ = new std::vector<float>();
116  break;
117  case kDOUBLE:
118  double_buffer_ = new std::vector<double>();
119  break;
120  case kTEXT:
121  case kVARCHAR:
122  case kCHAR:
123  string_buffer_ = new std::vector<std::string>();
124  if (col_desc->columnType.get_compression() == kENCODING_DICT) {
125  switch (col_desc->columnType.get_size()) {
126  case 1:
127  string_dict_i8_buffer_ = new std::vector<uint8_t>();
128  break;
129  case 2:
130  string_dict_i16_buffer_ = new std::vector<uint16_t>();
131  break;
132  case 4:
133  string_dict_i32_buffer_ = new std::vector<int32_t>();
134  break;
135  default:
136  CHECK(false);
137  }
138  }
139  break;
140  case kDATE:
141  case kTIME:
142  case kTIMESTAMP:
143  bigint_buffer_ = new std::vector<int64_t>();
144  break;
145  case kARRAY:
146  if (IS_STRING(col_desc->columnType.get_subtype())) {
148  string_array_buffer_ = new std::vector<OptionalStringVector>();
149  string_array_dict_buffer_ = new std::vector<ArrayDatum>();
150  } else {
151  array_buffer_ = new std::vector<ArrayDatum>();
152  }
153  break;
154  case kPOINT:
155  case kLINESTRING:
156  case kPOLYGON:
157  case kMULTIPOLYGON:
158  geo_string_buffer_ = new std::vector<std::string>();
159  break;
160  default:
161  CHECK(false);
162  }
163  }
164 
166  switch (column_desc_->columnType.get_type()) {
167  case kBOOLEAN:
168  delete bool_buffer_;
169  break;
170  case kTINYINT:
171  delete tinyint_buffer_;
172  break;
173  case kSMALLINT:
174  delete smallint_buffer_;
175  break;
176  case kINT:
177  delete int_buffer_;
178  break;
179  case kBIGINT:
180  case kNUMERIC:
181  case kDECIMAL:
182  delete bigint_buffer_;
183  break;
184  case kFLOAT:
185  delete float_buffer_;
186  break;
187  case kDOUBLE:
188  delete double_buffer_;
189  break;
190  case kTEXT:
191  case kVARCHAR:
192  case kCHAR:
193  delete string_buffer_;
195  switch (column_desc_->columnType.get_size()) {
196  case 1:
197  delete string_dict_i8_buffer_;
198  break;
199  case 2:
201  break;
202  case 4:
204  break;
205  }
206  }
207  break;
208  case kDATE:
209  case kTIME:
210  case kTIMESTAMP:
211  delete bigint_buffer_;
212  break;
213  case kARRAY:
215  delete string_array_buffer_;
217  } else {
218  delete array_buffer_;
219  }
220  break;
221  case kPOINT:
222  case kLINESTRING:
223  case kPOLYGON:
224  case kMULTIPOLYGON:
225  delete geo_string_buffer_;
226  break;
227  default:
228  CHECK(false);
229  }
230  }
231 
232  void addBoolean(const int8_t v) { bool_buffer_->push_back(v); }
233 
234  void addTinyint(const int8_t v) { tinyint_buffer_->push_back(v); }
235 
236  void addSmallint(const int16_t v) { smallint_buffer_->push_back(v); }
237 
238  void addInt(const int32_t v) { int_buffer_->push_back(v); }
239 
240  void addBigint(const int64_t v) { bigint_buffer_->push_back(v); }
241 
242  void addFloat(const float v) { float_buffer_->push_back(v); }
243 
244  void addDouble(const double v) { double_buffer_->push_back(v); }
245 
246  void addString(const std::string_view v) { string_buffer_->emplace_back(v); }
247 
248  void addGeoString(const std::string_view v) { geo_string_buffer_->emplace_back(v); }
249 
250  void addArray(const ArrayDatum& v) { array_buffer_->push_back(v); }
251 
253  string_array_buffer_->emplace_back(std::vector<std::string>{});
254  return string_array_buffer_->back();
255  }
256 
258  string_array_buffer_->push_back(arr);
259  }
260 
261  void addDictEncodedString(const std::vector<std::string>& string_vec);
262 
264  const std::vector<OptionalStringVector>& string_array_vec) {
266 
267  // first check data is ok
268  for (auto& p : string_array_vec) {
269  if (!p) {
270  continue;
271  }
272  for (const auto& str : *p) {
273  if (str.size() > StringDictionary::MAX_STRLEN) {
274  throw std::runtime_error("String too long for dictionary encoding.");
275  }
276  }
277  }
278 
279  // to avoid copying, create a string view of each string in the
280  // `string_array_vec` where the array holding the string is *not null*
281  std::vector<std::vector<std::string_view>> string_view_array_vec;
282  for (auto& p : string_array_vec) {
283  if (!p) {
284  continue;
285  }
286  auto& array = string_view_array_vec.emplace_back();
287  for (const auto& str : *p) {
288  array.emplace_back(str);
289  }
290  }
291 
292  std::vector<std::vector<int32_t>> ids_array(0);
293  string_dict_->getOrAddBulkArray(string_view_array_vec, ids_array);
294 
295  size_t i, j;
296  for (i = 0, j = 0; i < string_array_vec.size(); ++i) {
297  if (!string_array_vec[i]) { // null array
298  string_array_dict_buffer_->push_back(
300  } else { // non-null array
301  auto& p = ids_array[j++];
302  size_t len = p.size() * sizeof(int32_t);
303  auto a = static_cast<int32_t*>(checked_malloc(len));
304  memcpy(a, &p[0], len);
305  string_array_dict_buffer_->push_back(
306  ArrayDatum(len, reinterpret_cast<int8_t*>(a), false));
307  }
308  }
309  }
310 
311  const SQLTypeInfo& getTypeInfo() const { return column_desc_->columnType; }
312 
313  const ColumnDescriptor* getColumnDesc() const { return column_desc_; }
314 
316 
317  int8_t* getAsBytes() const {
318  switch (column_desc_->columnType.get_type()) {
319  case kBOOLEAN:
320  return reinterpret_cast<int8_t*>(bool_buffer_->data());
321  case kTINYINT:
322  return reinterpret_cast<int8_t*>(tinyint_buffer_->data());
323  case kSMALLINT:
324  return reinterpret_cast<int8_t*>(smallint_buffer_->data());
325  case kINT:
326  return reinterpret_cast<int8_t*>(int_buffer_->data());
327  case kBIGINT:
328  case kNUMERIC:
329  case kDECIMAL:
330  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
331  case kFLOAT:
332  return reinterpret_cast<int8_t*>(float_buffer_->data());
333  case kDOUBLE:
334  return reinterpret_cast<int8_t*>(double_buffer_->data());
335  case kDATE:
336  case kTIME:
337  case kTIMESTAMP:
338  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
339  default:
340  abort();
341  }
342  }
343 
344  size_t getElementSize() const {
345  switch (column_desc_->columnType.get_type()) {
346  case kBOOLEAN:
347  return sizeof((*bool_buffer_)[0]);
348  case kTINYINT:
349  return sizeof((*tinyint_buffer_)[0]);
350  case kSMALLINT:
351  return sizeof((*smallint_buffer_)[0]);
352  case kINT:
353  return sizeof((*int_buffer_)[0]);
354  case kBIGINT:
355  case kNUMERIC:
356  case kDECIMAL:
357  return sizeof((*bigint_buffer_)[0]);
358  case kFLOAT:
359  return sizeof((*float_buffer_)[0]);
360  case kDOUBLE:
361  return sizeof((*double_buffer_)[0]);
362  case kDATE:
363  case kTIME:
364  case kTIMESTAMP:
365  return sizeof((*bigint_buffer_)[0]);
366  default:
367  abort();
368  }
369  }
370 
371  std::vector<std::string>* getStringBuffer() const { return string_buffer_; }
372 
373  std::vector<std::string>* getGeoStringBuffer() const { return geo_string_buffer_; }
374 
375  std::vector<ArrayDatum>* getArrayBuffer() const { return array_buffer_; }
376 
377  std::vector<OptionalStringVector>* getStringArrayBuffer() const {
378  return string_array_buffer_;
379  }
380 
381  std::vector<ArrayDatum>* getStringArrayDictBuffer() const {
383  }
384 
385  int8_t* getStringDictBuffer() const {
386  switch (column_desc_->columnType.get_size()) {
387  case 1:
388  return reinterpret_cast<int8_t*>(string_dict_i8_buffer_->data());
389  case 2:
390  return reinterpret_cast<int8_t*>(string_dict_i16_buffer_->data());
391  case 4:
392  return reinterpret_cast<int8_t*>(string_dict_i32_buffer_->data());
393  default:
394  abort();
395  }
396  }
397 
399  if (string_dict_ == nullptr) {
400  return true;
401  }
402  return string_dict_->checkpoint();
403  }
404 
405  void clear() {
406  switch (column_desc_->columnType.get_type()) {
407  case kBOOLEAN: {
408  bool_buffer_->clear();
409  break;
410  }
411  case kTINYINT: {
412  tinyint_buffer_->clear();
413  break;
414  }
415  case kSMALLINT: {
416  smallint_buffer_->clear();
417  break;
418  }
419  case kINT: {
420  int_buffer_->clear();
421  break;
422  }
423  case kBIGINT:
424  case kNUMERIC:
425  case kDECIMAL: {
426  bigint_buffer_->clear();
427  break;
428  }
429  case kFLOAT: {
430  float_buffer_->clear();
431  break;
432  }
433  case kDOUBLE: {
434  double_buffer_->clear();
435  break;
436  }
437  case kTEXT:
438  case kVARCHAR:
439  case kCHAR: {
440  string_buffer_->clear();
442  switch (column_desc_->columnType.get_size()) {
443  case 1:
444  string_dict_i8_buffer_->clear();
445  break;
446  case 2:
447  string_dict_i16_buffer_->clear();
448  break;
449  case 4:
450  string_dict_i32_buffer_->clear();
451  break;
452  default:
453  CHECK(false);
454  }
455  }
456  break;
457  }
458  case kDATE:
459  case kTIME:
460  case kTIMESTAMP:
461  bigint_buffer_->clear();
462  break;
463  case kARRAY: {
465  string_array_buffer_->clear();
466  string_array_dict_buffer_->clear();
467  } else {
468  array_buffer_->clear();
469  }
470  break;
471  }
472  case kPOINT:
473  case kLINESTRING:
474  case kPOLYGON:
475  case kMULTIPOLYGON:
476  geo_string_buffer_->clear();
477  break;
478  default:
479  CHECK(false);
480  }
481  }
482 
483  size_t add_values(const ColumnDescriptor* cd, const TColumn& data);
484 
485  size_t add_arrow_values(const ColumnDescriptor* cd,
486  const arrow::Array& data,
487  const bool exact_type_match,
488  const ArraySliceRange& slice_range,
489  BadRowsTracker* bad_rows_tracker);
490 
491  void add_value(const ColumnDescriptor* cd,
492  const std::string_view val,
493  const bool is_null,
494  const CopyParams& copy_params);
495 
496  void add_value(const ColumnDescriptor* cd, const TDatum& val, const bool is_null);
497 
498  void addDefaultValues(const ColumnDescriptor* cd, size_t num_rows);
499 
500  void pop_value();
501 
502  template <typename DATA_TYPE>
504  const arrow::Array& array,
505  std::vector<DATA_TYPE>& buffer,
506  const ArraySliceRange& slice_range,
507  BadRowsTracker* const bad_rows_tracker);
508  template <typename DATA_TYPE>
509  auto del_values(std::vector<DATA_TYPE>& buffer, BadRowsTracker* const bad_rows_tracker);
510  auto del_values(const SQLTypes type, BadRowsTracker* const bad_rows_tracker);
511 
512  static std::vector<DataBlockPtr> get_data_block_pointers(
513  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
514 
515  std::vector<std::unique_ptr<TypedImportBuffer>>* import_buffers;
516  size_t col_idx;
517 
518  private:
519  union {
520  std::vector<int8_t>* bool_buffer_;
521  std::vector<int8_t>* tinyint_buffer_;
522  std::vector<int16_t>* smallint_buffer_;
523  std::vector<int32_t>* int_buffer_;
524  std::vector<int64_t>* bigint_buffer_;
525  std::vector<float>* float_buffer_;
526  std::vector<double>* double_buffer_;
527  std::vector<std::string>* string_buffer_;
528  std::vector<std::string>* geo_string_buffer_;
529  std::vector<ArrayDatum>* array_buffer_;
530  std::vector<OptionalStringVector>* string_array_buffer_;
531  };
532  union {
533  std::vector<uint8_t>* string_dict_i8_buffer_;
534  std::vector<uint16_t>* string_dict_i16_buffer_;
535  std::vector<int32_t>* string_dict_i32_buffer_;
536  std::vector<ArrayDatum>* string_array_dict_buffer_;
537  };
540 };
541 
542 class Loader {
543  using LoadCallbackType =
544  std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
545  std::vector<DataBlockPtr>&,
546  size_t)>;
547 
548  public:
549  // ParquetDataWrapper
551  const TableDescriptor* t,
552  LoadCallbackType load_callback = nullptr)
553  : catalog_(c)
554  , table_desc_(t)
555  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
556  , load_callback_(load_callback) {
557  init();
558  }
559 
560  virtual ~Loader() {}
561 
563  const TableDescriptor* getTableDesc() const { return table_desc_; }
564  const std::list<const ColumnDescriptor*>& get_column_descs() const {
565  return column_descs_;
566  }
567 
569  if ((cd->columnType.get_type() != kARRAY ||
570  !IS_STRING(cd->columnType.get_subtype())) &&
571  (!cd->columnType.is_string() ||
573  return nullptr;
574  }
575  return dict_map_.at(cd->columnId);
576  }
577 
578  virtual bool load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
579  const size_t row_count,
580  const Catalog_Namespace::SessionInfo* session_info);
581  virtual bool loadNoCheckpoint(
582  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
583  const size_t row_count,
584  const Catalog_Namespace::SessionInfo* session_info);
585  virtual void checkpoint();
586  virtual std::vector<Catalog_Namespace::TableEpochInfo> getTableEpochs() const;
587  virtual void setTableEpochs(
588  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
589 
590  void setAddingColumns(const bool adding_columns) { adding_columns_ = adding_columns; }
591  bool isAddingColumns() const { return adding_columns_; }
592  void dropColumns(const std::vector<int>& columns);
593  std::string getErrorMessage() { return error_msg_; };
594 
595  protected:
596  void init();
597 
598  virtual bool loadImpl(
599  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
600  size_t row_count,
601  bool checkpoint,
602  const Catalog_Namespace::SessionInfo* session_info);
603 
604  using OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>;
605  void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
606  std::vector<size_t>& all_shard_row_counts,
607  const OneShardBuffers& import_buffers,
608  const size_t row_count,
609  const size_t shard_count,
610  const Catalog_Namespace::SessionInfo* session_info);
611 
614  std::list<const ColumnDescriptor*> column_descs_;
617  std::map<int, StringDictionary*> dict_map_;
618 
619  private:
620  bool loadToShard(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
621  size_t row_count,
622  const TableDescriptor* shard_table,
623  bool checkpoint,
624  const Catalog_Namespace::SessionInfo* session_info);
626  std::vector<OneShardBuffers>& all_shard_import_buffers,
627  std::vector<size_t>& all_shard_row_counts,
628  const OneShardBuffers& import_buffers,
629  const size_t row_count,
630  const size_t shard_count,
631  const Catalog_Namespace::SessionInfo* session_info);
633  std::vector<OneShardBuffers>& all_shard_import_buffers,
634  std::vector<size_t>& all_shard_row_counts,
635  const OneShardBuffers& import_buffers,
636  const size_t row_count,
637  const size_t shard_count,
638  const Catalog_Namespace::SessionInfo* session_info);
639  void fillShardRow(const size_t row_index,
640  OneShardBuffers& shard_output_buffers,
641  const OneShardBuffers& import_buffers);
642 
643  bool adding_columns_ = false;
644  std::mutex loader_mutex_;
645  std::string error_msg_;
646 };
647 
648 struct ImportStatus {
649  std::chrono::steady_clock::time_point start;
650  std::chrono::steady_clock::time_point end;
654  std::chrono::duration<size_t, std::milli> elapsed;
655  bool load_failed = false;
656  std::string load_msg;
657  int thread_id; // to recall thread_id after thread exit
659  : start(std::chrono::steady_clock::now())
660  , rows_completed(0)
661  , rows_estimated(0)
662  , rows_rejected(0)
663  , elapsed(0)
664  , thread_id(0) {}
665 
669  if (is.load_failed) {
670  load_failed = true;
671  load_msg = is.load_msg;
672  }
673 
674  return *this;
675  }
676 };
677 
679  public:
681  DataStreamSink(const CopyParams& copy_params, const std::string file_path)
682  : copy_params(copy_params), file_path(file_path) {}
683  virtual ~DataStreamSink() {}
685  const std::string& file_path,
686  const bool decompressed,
687  const Catalog_Namespace::SessionInfo* session_info) = 0;
688 #ifdef ENABLE_IMPORT_PARQUET
689  virtual void import_parquet(std::vector<std::string>& file_paths,
690  const Catalog_Namespace::SessionInfo* session_info);
691  virtual void import_local_parquet(
692  const std::string& file_path,
693  const Catalog_Namespace::SessionInfo* session_info) = 0;
694 #endif
695  const CopyParams& get_copy_params() const { return copy_params; }
696  void import_compressed(std::vector<std::string>& file_paths,
697  const Catalog_Namespace::SessionInfo* session_info);
698 
699  protected:
701 
703  const std::string file_path;
704  FILE* p_file = nullptr;
707  size_t total_file_size{0};
708  std::vector<size_t> file_offsets;
709  std::mutex file_offsets_mutex;
710 };
711 
712 class Detector : public DataStreamSink {
713  public:
714  Detector(const boost::filesystem::path& fp, CopyParams& cp)
715  : DataStreamSink(cp, fp.string()), file_path(fp) {
716  read_file();
717  init();
718  };
719 #ifdef ENABLE_IMPORT_PARQUET
720  void import_local_parquet(const std::string& file_path,
721  const Catalog_Namespace::SessionInfo* session_info) override;
722 #endif
723  static SQLTypes detect_sqltype(const std::string& str);
724  std::vector<std::string> get_headers();
725  std::vector<std::vector<std::string>> raw_rows;
726  std::vector<std::vector<std::string>> get_sample_rows(size_t n);
727  std::vector<SQLTypes> best_sqltypes;
728  std::vector<EncodingType> best_encodings;
729  bool has_headers = false;
730 
731  private:
732  void init();
733  void read_file();
734  void detect_row_delimiter();
735  void split_raw_data();
736  std::vector<SQLTypes> detect_column_types(const std::vector<std::string>& row);
737  static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b);
738  void find_best_sqltypes();
739  std::vector<SQLTypes> find_best_sqltypes(
740  const std::vector<std::vector<std::string>>& raw_rows,
741  const CopyParams& copy_params);
742  std::vector<SQLTypes> find_best_sqltypes(
743  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
744  const std::vector<std::vector<std::string>>::const_iterator& row_end,
745  const CopyParams& copy_params);
746 
747  std::vector<EncodingType> find_best_encodings(
748  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
749  const std::vector<std::vector<std::string>>::const_iterator& row_end,
750  const std::vector<SQLTypes>& best_types);
751 
752  bool detect_headers(const std::vector<SQLTypes>& first_types,
753  const std::vector<SQLTypes>& rest_types);
756  const std::string& file_path,
757  const bool decompressed,
758  const Catalog_Namespace::SessionInfo* session_info) override;
759  std::string raw_data;
760  boost::filesystem::path file_path;
761  std::chrono::duration<double> timeout{1};
762  std::string line1;
763 };
764 
765 class Importer : public DataStreamSink, public AbstractImporter {
766  public:
768  const TableDescriptor* t,
769  const std::string& f,
770  const CopyParams& p);
771  Importer(Loader* providedLoader, const std::string& f, const CopyParams& p);
772  ~Importer() override;
773  ImportStatus import(const Catalog_Namespace::SessionInfo* session_info) override;
775  const std::string& file_path,
776  const bool decompressed,
777  const Catalog_Namespace::SessionInfo* session_info) override;
778  ImportStatus importGDAL(std::map<std::string, std::string> colname_to_src,
779  const Catalog_Namespace::SessionInfo* session_info);
780  const CopyParams& get_copy_params() const { return copy_params; }
781  const std::list<const ColumnDescriptor*>& get_column_descs() const {
782  return loader->get_column_descs();
783  }
784  void load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
785  size_t row_count,
786  const Catalog_Namespace::SessionInfo* session_info);
787  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>>& get_import_buffers_vec() {
788  return import_buffers_vec;
789  }
790  std::vector<std::unique_ptr<TypedImportBuffer>>& get_import_buffers(int i) {
791  return import_buffers_vec[i];
792  }
793  const bool* get_is_array() const { return is_array_a.get(); }
794 #ifdef ENABLE_IMPORT_PARQUET
795  void import_local_parquet(const std::string& file_path,
796  const Catalog_Namespace::SessionInfo* session_info) override;
797 #endif
798  static ImportStatus get_import_status(const std::string& id);
799  static void set_import_status(const std::string& id, const ImportStatus is);
800  static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
801  const std::string& fileName,
802  const std::string& geoColumnName,
803  const CopyParams& copy_params);
804  static void readMetadataSampleGDAL(
805  const std::string& fileName,
806  const std::string& geoColumnName,
807  std::map<std::string, std::vector<std::string>>& metadata,
808  int rowLimit,
809  const CopyParams& copy_params);
810  static bool gdalFileExists(const std::string& path, const CopyParams& copy_params);
811  static bool gdalFileOrDirectoryExists(const std::string& path,
812  const CopyParams& copy_params);
813  static std::vector<std::string> gdalGetAllFilesInArchive(
814  const std::string& archive_path,
815  const CopyParams& copy_params);
818  GeoFileLayerInfo(const std::string& name_, GeoFileLayerContents contents_)
819  : name(name_), contents(contents_) {}
820  std::string name;
822  };
823  static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
824  const std::string& file_name,
825  const CopyParams& copy_params);
826  Catalog_Namespace::Catalog& getCatalog() { return loader->getCatalog(); }
827  static void set_geo_physical_import_buffer(
828  const Catalog_Namespace::Catalog& catalog,
829  const ColumnDescriptor* cd,
830  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
831  size_t& col_idx,
832  std::vector<double>& coords,
833  std::vector<double>& bounds,
834  std::vector<int>& ring_sizes,
835  std::vector<int>& poly_rings,
836  int render_group);
838  const Catalog_Namespace::Catalog& catalog,
839  const ColumnDescriptor* cd,
840  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
841  size_t& col_idx,
842  std::vector<std::vector<double>>& coords_column,
843  std::vector<std::vector<double>>& bounds_column,
844  std::vector<std::vector<int>>& ring_sizes_column,
845  std::vector<std::vector<int>>& poly_rings_column,
846  std::vector<int>& render_groups_column);
847  void checkpoint(const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
848  auto getLoader() const { return loader.get(); }
849 
850  private:
851  static bool gdalStatInternal(const std::string& path,
852  const CopyParams& copy_params,
853  bool also_dir);
854  static OGRDataSource* openGDALDataset(const std::string& fileName,
855  const CopyParams& copy_params);
857  std::string import_id;
858  size_t file_size;
859  size_t max_threads;
860  char* buffer[2];
861  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>> import_buffers_vec;
862  std::unique_ptr<Loader> loader;
863  std::unique_ptr<bool[]> is_array_a;
864  static std::mutex init_gdal_mutex;
865 };
866 
867 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
868  const TableDescriptor* td,
869  Loader* loader);
870 
871 std::vector<std::unique_ptr<TypedImportBuffer>> fill_missing_columns(
873  Fragmenter_Namespace::InsertData& insert_data);
874 
875 } // namespace import_export
876 
877 #endif // _IMPORTER_H_
std::pair< size_t, size_t > ArraySliceRange
Definition: Importer.h:73
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
Definition: Importer.h:550
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:564
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:330
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5127
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
Definition: Importer.cpp:4564
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:4335
std::mutex loader_mutex_
Definition: Importer.h:644
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:311
StringDictionary * getStringDictionary() const
Definition: Importer.h:315
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:3182
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
void addBigint(const int64_t v)
Definition: Importer.h:240
std::string cat(Ts &&...args)
OptionalStringVector & addStringArray()
Definition: Importer.h:252
void addSmallint(const int16_t v)
Definition: Importer.h:236
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:111
Definition: sqltypes.h:49
SQLTypes
Definition: sqltypes.h:38
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
Definition: Importer.h:94
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:4066
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams &copy_params)
Definition: Importer.cpp:541
const TableDescriptor * getTableDesc() const
Definition: Importer.h:563
void dropColumns(const std::vector< int > &columns)
Definition: Importer.cpp:3151
std::vector< std::string > * string_buffer_
Definition: Importer.h:527
void addString(const std::string_view v)
Definition: Importer.h:246
std::vector< ArrayDatum > * array_buffer_
Definition: Importer.h:529
void find_best_sqltypes_and_headers()
Definition: Importer.cpp:3423
StringDictionary * string_dict_
Definition: Importer.h:539
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3570
std::atomic< int > nerrors
Definition: Importer.h:78
std::optional< std::vector< std::string >> OptionalStringVector
Definition: Importer.h:93
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:410
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
Definition: Importer.cpp:1730
void addDouble(const double v)
Definition: Importer.h:244
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5409
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:170
std::vector< int16_t > * smallint_buffer_
Definition: Importer.h:522
const bool * get_is_array() const
Definition: Importer.h:793
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
Definition: Importer.h:381
const TableDescriptor * table_desc_
Definition: Importer.h:613
virtual void checkpoint()
Definition: Importer.cpp:4556
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
Definition: Importer.cpp:5394
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
Definition: Importer.h:787
std::vector< SQLTypes > best_sqltypes
Definition: Importer.h:727
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
Definition: Importer.cpp:5060
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:654
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2968
const CopyParams & get_copy_params() const
Definition: Importer.h:695
std::vector< float > * float_buffer_
Definition: Importer.h:525
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:790
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4925
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
Definition: Importer.h:818
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4958
std::vector< double > * double_buffer_
Definition: Importer.h:526
void addFloat(const float v)
Definition: Importer.h:242
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:616
std::vector< std::string > * getStringBuffer() const
Definition: Importer.h:371
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
Definition: Importer.cpp:985
mapd_shared_mutex import_mutex_
Definition: Importer.h:706
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
Definition: Importer.h:681
ImportStatus & operator+=(const ImportStatus &is)
Definition: Importer.h:666
constexpr double a
Definition: Utm.h:38
void addStringArray(const OptionalStringVector &arr)
Definition: Importer.h:257
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
Definition: Importer.h:248
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
Definition: Importer.cpp:3311
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
Definition: Importer.cpp:3499
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
void setAddingColumns(const bool adding_columns)
Definition: Importer.h:590
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
Definition: Importer.h:604
std::vector< int32_t > * int_buffer_
Definition: Importer.h:523
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4575
std::vector< ArrayDatum > * string_array_dict_buffer_
Definition: Importer.h:536
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3033
CONSTEXPR DEVICE bool is_null(const T &value)
DEVICE Array(const int64_t size, const bool is_null=false)
Definition: OmniSciTypes.h:71
void addBoolean(const int8_t v)
Definition: Importer.h:232
auto getLoader() const
Definition: Importer.h:848
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:533
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4835
void addDictEncodedStringArray(const std::vector< OptionalStringVector > &string_array_vec)
Definition: Importer.h:263
void addTinyint(const int8_t v)
Definition: Importer.h:234
std::shared_timed_mutex mapd_shared_mutex
std::vector< OptionalStringVector > * string_array_buffer_
Definition: Importer.h:530
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
Definition: Importer.cpp:4701
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:524
int8_t * getAsBytes() const
Definition: Importer.h:317
std::string error_msg_
Definition: Importer.h:645
void addInt(const int32_t v)
Definition: Importer.h:238
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
Definition: Importer.cpp:1630
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:826
std::vector< std::vector< std::string > > raw_rows
Definition: Importer.h:725
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2782
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2837
bool isAddingColumns() const
Definition: Importer.h:591
std::vector< EncodingType > best_encodings
Definition: Importer.h:728
std::vector< int8_t > * bool_buffer_
Definition: Importer.h:520
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861
bool g_enable_smem_group_by true
boost::filesystem::path file_path
Definition: Importer.h:760
size_t getElementSize() const
Definition: Importer.h:344
int8_t * getStringDictBuffer() const
Definition: Importer.h:385
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4963
std::set< int64_t > rows
Definition: Importer.h:77
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:863
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Definition: Importer.h:515
bool checkpoint() noexcept
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:234
Definition: sqltypes.h:52
Definition: sqltypes.h:53
Detector(const boost::filesystem::path &fp, CopyParams &cp)
Definition: Importer.h:714
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2775
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
std::vector< int32_t > * string_dict_i32_buffer_
Definition: Importer.h:535
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
Definition: Importer.cpp:3401
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:614
static ArrayDatum composeNullPointCoords(const SQLTypeInfo &coords_ti, const SQLTypeInfo &geo_ti)
Definition: Importer.cpp:414
void addArray(const ArrayDatum &v)
Definition: Importer.h:250
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3094
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:562
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3616
std::string getErrorMessage()
Definition: Importer.h:593
std::chrono::duration< double > timeout
Definition: Importer.h:761
Definition: sqltypes.h:41
std::vector< std::string > * getGeoStringBuffer() const
Definition: Importer.h:373
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3550
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
Definition: Importer.cpp:3535
#define IS_STRING(T)
Definition: sqltypes.h:250
std::string import_id
Definition: Importer.h:857
const ColumnDescriptor * column_desc_
Definition: Importer.h:538
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
Definition: Importer.cpp:884
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3580
std::chrono::steady_clock::time_point start
Definition: Importer.h:649
std::vector< uint16_t > * string_dict_i16_buffer_
Definition: Importer.h:534
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3001
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
Definition: Importer.cpp:3393
std::vector< std::string > get_headers()
Definition: Importer.cpp:3558
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612
const CopyParams & get_copy_params() const
Definition: Importer.h:780
bool g_enable_watchdog false
Definition: Execute.cpp:76
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:521
#define CHECK(condition)
Definition: Logger.h:209
std::string raw_data
Definition: Importer.h:759
static ImportStatus get_import_status(const std::string &id)
Definition: Importer.cpp:229
char * t
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
const ColumnDescriptor * getColumnDesc() const
Definition: Importer.h:313
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
Definition: Importer.h:568
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2909
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams &copy_params)
Definition: Importer.cpp:5035
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
char * f
static constexpr size_t MAX_STRLEN
void addDefaultValues(const ColumnDescriptor *cd, size_t num_rows)
Definition: Importer.cpp:1451
Definition: sqltypes.h:45
SQLTypeInfo columnType
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4645
std::vector< OptionalStringVector > * getStringArrayBuffer() const
Definition: Importer.h:377
bool is_string() const
Definition: sqltypes.h:509
LoadCallbackType load_callback_
Definition: Importer.h:615
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2947
constexpr double n
Definition: Utm.h:46
static std::mutex init_gdal_mutex
Definition: Importer.h:864
std::vector< size_t > file_offsets
Definition: Importer.h:708
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:617
std::vector< std::string > * geo_string_buffer_
Definition: Importer.h:528
std::chrono::steady_clock::time_point end
Definition: Importer.h:650
std::vector< ArrayDatum > * getArrayBuffer() const
Definition: Importer.h:375
std::unique_ptr< Loader > loader
Definition: Importer.h:862
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
Definition: Importer.h:546
void addDictEncodedString(const std::vector< std::string > &string_vec)
Definition: Importer.cpp:501
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:781
const std::string file_path
Definition: Importer.h:703
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:4569