OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Importer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.h
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Importer class for table import from file
21  */
22 #ifndef _IMPORTER_H_
23 #define _IMPORTER_H_
24 
25 #include <gdal.h>
26 #include <ogrsf_frmts.h>
27 
28 #include <atomic>
29 #include <boost/filesystem.hpp>
30 #include <boost/noncopyable.hpp>
31 #include <boost/tokenizer.hpp>
32 #include <condition_variable>
33 #include <cstdio>
34 #include <cstdlib>
35 #include <iostream>
36 #include <list>
37 #include <map>
38 #include <memory>
39 #include <mutex>
40 #include <set>
41 #include <string>
42 #include <string_view>
43 #include <utility>
44 
45 #include "Catalog/Catalog.h"
47 #include "DataMgr/Chunk/Chunk.h"
48 #include "Fragmenter/Fragmenter.h"
50 #include "Logger/Logger.h"
52 #include "Shared/checked_alloc.h"
53 #include "Shared/fixautotools.h"
54 
55 // Some builds of boost::geometry require iostream, but don't explicitly include it.
56 // Placing in own section to ensure it's included after iostream.
57 #include <boost/geometry/index/rtree.hpp>
58 
59 class TDatum;
60 class TColumn;
61 
62 namespace arrow {
63 
64 class Array;
65 
66 } // namespace arrow
67 
68 namespace import_export {
69 
70 class Importer;
71 
72 using ArraySliceRange = std::pair<size_t, size_t>;
73 
75  std::mutex mutex;
76  std::set<int64_t> rows;
77  std::atomic<int> nerrors;
78  std::string file_name;
79  int row_group;
81 };
82 
83 class TypedImportBuffer : boost::noncopyable {
84  public:
85  TypedImportBuffer(const ColumnDescriptor* col_desc, StringDictionary* string_dict)
86  : column_desc_(col_desc), string_dict_(string_dict) {
87  switch (col_desc->columnType.get_type()) {
88  case kBOOLEAN:
89  bool_buffer_ = new std::vector<int8_t>();
90  break;
91  case kTINYINT:
92  tinyint_buffer_ = new std::vector<int8_t>();
93  break;
94  case kSMALLINT:
95  smallint_buffer_ = new std::vector<int16_t>();
96  break;
97  case kINT:
98  int_buffer_ = new std::vector<int32_t>();
99  break;
100  case kBIGINT:
101  case kNUMERIC:
102  case kDECIMAL:
103  bigint_buffer_ = new std::vector<int64_t>();
104  break;
105  case kFLOAT:
106  float_buffer_ = new std::vector<float>();
107  break;
108  case kDOUBLE:
109  double_buffer_ = new std::vector<double>();
110  break;
111  case kTEXT:
112  case kVARCHAR:
113  case kCHAR:
114  string_buffer_ = new std::vector<std::string>();
115  if (col_desc->columnType.get_compression() == kENCODING_DICT) {
116  switch (col_desc->columnType.get_size()) {
117  case 1:
118  string_dict_i8_buffer_ = new std::vector<uint8_t>();
119  break;
120  case 2:
121  string_dict_i16_buffer_ = new std::vector<uint16_t>();
122  break;
123  case 4:
124  string_dict_i32_buffer_ = new std::vector<int32_t>();
125  break;
126  default:
127  CHECK(false);
128  }
129  }
130  break;
131  case kDATE:
132  case kTIME:
133  case kTIMESTAMP:
134  bigint_buffer_ = new std::vector<int64_t>();
135  break;
136  case kARRAY:
137  if (IS_STRING(col_desc->columnType.get_subtype())) {
139  string_array_buffer_ = new std::vector<std::vector<std::string>>();
140  string_array_dict_buffer_ = new std::vector<ArrayDatum>();
141  } else {
142  array_buffer_ = new std::vector<ArrayDatum>();
143  }
144  break;
145  case kPOINT:
146  case kLINESTRING:
147  case kPOLYGON:
148  case kMULTIPOLYGON:
149  geo_string_buffer_ = new std::vector<std::string>();
150  break;
151  default:
152  CHECK(false);
153  }
154  }
155 
157  switch (column_desc_->columnType.get_type()) {
158  case kBOOLEAN:
159  delete bool_buffer_;
160  break;
161  case kTINYINT:
162  delete tinyint_buffer_;
163  break;
164  case kSMALLINT:
165  delete smallint_buffer_;
166  break;
167  case kINT:
168  delete int_buffer_;
169  break;
170  case kBIGINT:
171  case kNUMERIC:
172  case kDECIMAL:
173  delete bigint_buffer_;
174  break;
175  case kFLOAT:
176  delete float_buffer_;
177  break;
178  case kDOUBLE:
179  delete double_buffer_;
180  break;
181  case kTEXT:
182  case kVARCHAR:
183  case kCHAR:
184  delete string_buffer_;
186  switch (column_desc_->columnType.get_size()) {
187  case 1:
188  delete string_dict_i8_buffer_;
189  break;
190  case 2:
192  break;
193  case 4:
195  break;
196  }
197  }
198  break;
199  case kDATE:
200  case kTIME:
201  case kTIMESTAMP:
202  delete bigint_buffer_;
203  break;
204  case kARRAY:
206  delete string_array_buffer_;
208  } else {
209  delete array_buffer_;
210  }
211  break;
212  case kPOINT:
213  case kLINESTRING:
214  case kPOLYGON:
215  case kMULTIPOLYGON:
216  delete geo_string_buffer_;
217  break;
218  default:
219  CHECK(false);
220  }
221  }
222 
223  void addBoolean(const int8_t v) { bool_buffer_->push_back(v); }
224 
225  void addTinyint(const int8_t v) { tinyint_buffer_->push_back(v); }
226 
227  void addSmallint(const int16_t v) { smallint_buffer_->push_back(v); }
228 
229  void addInt(const int32_t v) { int_buffer_->push_back(v); }
230 
231  void addBigint(const int64_t v) { bigint_buffer_->push_back(v); }
232 
233  void addFloat(const float v) { float_buffer_->push_back(v); }
234 
235  void addDouble(const double v) { double_buffer_->push_back(v); }
236 
237  void addString(const std::string_view v) { string_buffer_->emplace_back(v); }
238 
239  void addGeoString(const std::string_view v) { geo_string_buffer_->emplace_back(v); }
240 
241  void addArray(const ArrayDatum& v) { array_buffer_->push_back(v); }
242 
243  std::vector<std::string>& addStringArray() {
244  string_array_buffer_->emplace_back();
245  return string_array_buffer_->back();
246  }
247 
248  void addStringArray(const std::vector<std::string>& arr) {
249  string_array_buffer_->push_back(arr);
250  }
251 
252  void addDictEncodedString(const std::vector<std::string>& string_vec);
253 
255  const std::vector<std::vector<std::string>>& string_array_vec) {
257 
258  // first check data is ok
259  for (auto& p : string_array_vec) {
260  for (const auto& str : p) {
261  if (str.size() > StringDictionary::MAX_STRLEN) {
262  throw std::runtime_error("String too long for dictionary encoding.");
263  }
264  }
265  }
266 
267  std::vector<std::vector<int32_t>> ids_array(0);
268  string_dict_->getOrAddBulkArray(string_array_vec, ids_array);
269 
270  for (auto& p : ids_array) {
271  size_t len = p.size() * sizeof(int32_t);
272  auto a = static_cast<int32_t*>(checked_malloc(len));
273  memcpy(a, &p[0], len);
274  // TODO: distinguish between empty and NULL
275  string_array_dict_buffer_->push_back(
276  ArrayDatum(len, reinterpret_cast<int8_t*>(a), len == 0));
277  }
278  }
279 
280  const SQLTypeInfo& getTypeInfo() const { return column_desc_->columnType; }
281 
282  const ColumnDescriptor* getColumnDesc() const { return column_desc_; }
283 
285 
286  int8_t* getAsBytes() const {
287  switch (column_desc_->columnType.get_type()) {
288  case kBOOLEAN:
289  return reinterpret_cast<int8_t*>(bool_buffer_->data());
290  case kTINYINT:
291  return reinterpret_cast<int8_t*>(tinyint_buffer_->data());
292  case kSMALLINT:
293  return reinterpret_cast<int8_t*>(smallint_buffer_->data());
294  case kINT:
295  return reinterpret_cast<int8_t*>(int_buffer_->data());
296  case kBIGINT:
297  case kNUMERIC:
298  case kDECIMAL:
299  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
300  case kFLOAT:
301  return reinterpret_cast<int8_t*>(float_buffer_->data());
302  case kDOUBLE:
303  return reinterpret_cast<int8_t*>(double_buffer_->data());
304  case kDATE:
305  case kTIME:
306  case kTIMESTAMP:
307  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
308  default:
309  abort();
310  }
311  }
312 
313  size_t getElementSize() const {
314  switch (column_desc_->columnType.get_type()) {
315  case kBOOLEAN:
316  return sizeof((*bool_buffer_)[0]);
317  case kTINYINT:
318  return sizeof((*tinyint_buffer_)[0]);
319  case kSMALLINT:
320  return sizeof((*smallint_buffer_)[0]);
321  case kINT:
322  return sizeof((*int_buffer_)[0]);
323  case kBIGINT:
324  case kNUMERIC:
325  case kDECIMAL:
326  return sizeof((*bigint_buffer_)[0]);
327  case kFLOAT:
328  return sizeof((*float_buffer_)[0]);
329  case kDOUBLE:
330  return sizeof((*double_buffer_)[0]);
331  case kDATE:
332  case kTIME:
333  case kTIMESTAMP:
334  return sizeof((*bigint_buffer_)[0]);
335  default:
336  abort();
337  }
338  }
339 
340  std::vector<std::string>* getStringBuffer() const { return string_buffer_; }
341 
342  std::vector<std::string>* getGeoStringBuffer() const { return geo_string_buffer_; }
343 
344  std::vector<ArrayDatum>* getArrayBuffer() const { return array_buffer_; }
345 
346  std::vector<std::vector<std::string>>* getStringArrayBuffer() const {
347  return string_array_buffer_;
348  }
349 
350  std::vector<ArrayDatum>* getStringArrayDictBuffer() const {
352  }
353 
354  int8_t* getStringDictBuffer() const {
355  switch (column_desc_->columnType.get_size()) {
356  case 1:
357  return reinterpret_cast<int8_t*>(string_dict_i8_buffer_->data());
358  case 2:
359  return reinterpret_cast<int8_t*>(string_dict_i16_buffer_->data());
360  case 4:
361  return reinterpret_cast<int8_t*>(string_dict_i32_buffer_->data());
362  default:
363  abort();
364  }
365  }
366 
368  if (string_dict_ == nullptr) {
369  return true;
370  }
371  return string_dict_->checkpoint();
372  }
373 
374  void clear() {
375  switch (column_desc_->columnType.get_type()) {
376  case kBOOLEAN: {
377  bool_buffer_->clear();
378  break;
379  }
380  case kTINYINT: {
381  tinyint_buffer_->clear();
382  break;
383  }
384  case kSMALLINT: {
385  smallint_buffer_->clear();
386  break;
387  }
388  case kINT: {
389  int_buffer_->clear();
390  break;
391  }
392  case kBIGINT:
393  case kNUMERIC:
394  case kDECIMAL: {
395  bigint_buffer_->clear();
396  break;
397  }
398  case kFLOAT: {
399  float_buffer_->clear();
400  break;
401  }
402  case kDOUBLE: {
403  double_buffer_->clear();
404  break;
405  }
406  case kTEXT:
407  case kVARCHAR:
408  case kCHAR: {
409  string_buffer_->clear();
411  switch (column_desc_->columnType.get_size()) {
412  case 1:
413  string_dict_i8_buffer_->clear();
414  break;
415  case 2:
416  string_dict_i16_buffer_->clear();
417  break;
418  case 4:
419  string_dict_i32_buffer_->clear();
420  break;
421  default:
422  CHECK(false);
423  }
424  }
425  break;
426  }
427  case kDATE:
428  case kTIME:
429  case kTIMESTAMP:
430  bigint_buffer_->clear();
431  break;
432  case kARRAY: {
434  string_array_buffer_->clear();
435  string_array_dict_buffer_->clear();
436  } else {
437  array_buffer_->clear();
438  }
439  break;
440  }
441  case kPOINT:
442  case kLINESTRING:
443  case kPOLYGON:
444  case kMULTIPOLYGON:
445  geo_string_buffer_->clear();
446  break;
447  default:
448  CHECK(false);
449  }
450  }
451 
452  size_t add_values(const ColumnDescriptor* cd, const TColumn& data);
453 
454  size_t add_arrow_values(const ColumnDescriptor* cd,
455  const arrow::Array& data,
456  const bool exact_type_match,
457  const ArraySliceRange& slice_range,
458  BadRowsTracker* bad_rows_tracker);
459 
460  void add_value(const ColumnDescriptor* cd,
461  const std::string_view val,
462  const bool is_null,
463  const CopyParams& copy_params);
464 
465  void add_value(const ColumnDescriptor* cd, const TDatum& val, const bool is_null);
466 
467  void pop_value();
468 
469  template <typename DATA_TYPE>
471  const arrow::Array& array,
472  std::vector<DATA_TYPE>& buffer,
473  const ArraySliceRange& slice_range,
474  BadRowsTracker* const bad_rows_tracker);
475  template <typename DATA_TYPE>
476  auto del_values(std::vector<DATA_TYPE>& buffer, BadRowsTracker* const bad_rows_tracker);
477  auto del_values(const SQLTypes type, BadRowsTracker* const bad_rows_tracker);
478 
479  static std::vector<DataBlockPtr> get_data_block_pointers(
480  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
481 
482  std::vector<std::unique_ptr<TypedImportBuffer>>* import_buffers;
483  size_t col_idx;
484 
485  private:
486  union {
487  std::vector<int8_t>* bool_buffer_;
488  std::vector<int8_t>* tinyint_buffer_;
489  std::vector<int16_t>* smallint_buffer_;
490  std::vector<int32_t>* int_buffer_;
491  std::vector<int64_t>* bigint_buffer_;
492  std::vector<float>* float_buffer_;
493  std::vector<double>* double_buffer_;
494  std::vector<std::string>* string_buffer_;
495  std::vector<std::string>* geo_string_buffer_;
496  std::vector<ArrayDatum>* array_buffer_;
497  std::vector<std::vector<std::string>>* string_array_buffer_;
498  };
499  union {
500  std::vector<uint8_t>* string_dict_i8_buffer_;
501  std::vector<uint16_t>* string_dict_i16_buffer_;
502  std::vector<int32_t>* string_dict_i32_buffer_;
503  std::vector<ArrayDatum>* string_array_dict_buffer_;
504  };
507 };
508 
509 class Loader {
510  using LoadCallbackType =
511  std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
512  std::vector<DataBlockPtr>&,
513  size_t)>;
514 
515  public:
516  // TODO: Remove the `use_catalog_locks` parameter once Loader is refactored out of
517  // ParquetDataWrapper
519  const TableDescriptor* t,
520  LoadCallbackType load_callback = nullptr,
521  bool use_catalog_locks = true)
522  : catalog_(c)
523  , table_desc_(t)
524  , column_descs_(
525  use_catalog_locks
526  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
527  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
528  , load_callback_(load_callback) {
529  init(use_catalog_locks);
530  }
531 
532  virtual ~Loader() {}
533 
535  const TableDescriptor* getTableDesc() const { return table_desc_; }
536  const std::list<const ColumnDescriptor*>& get_column_descs() const {
537  return column_descs_;
538  }
539 
541  if ((cd->columnType.get_type() != kARRAY ||
542  !IS_STRING(cd->columnType.get_subtype())) &&
543  (!cd->columnType.is_string() ||
545  return nullptr;
546  }
547  return dict_map_.at(cd->columnId);
548  }
549 
550  virtual bool load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
551  const size_t row_count,
552  const Catalog_Namespace::SessionInfo* session_info);
553  virtual bool loadNoCheckpoint(
554  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
555  const size_t row_count,
556  const Catalog_Namespace::SessionInfo* session_info);
557  virtual void checkpoint();
558  virtual std::vector<Catalog_Namespace::TableEpochInfo> getTableEpochs() const;
559  virtual void setTableEpochs(
560  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
561 
562  void setAddingColumns(const bool adding_columns) { adding_columns_ = adding_columns; }
563  bool isAddingColumns() const { return adding_columns_; }
564  void dropColumns(const std::vector<int>& columns);
565  std::string getErrorMessage() { return error_msg_; };
566 
567  protected:
568  void init(const bool use_catalog_locks);
569 
570  virtual bool loadImpl(
571  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
572  size_t row_count,
573  bool checkpoint,
574  const Catalog_Namespace::SessionInfo* session_info);
575 
576  using OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>;
577  void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
578  std::vector<size_t>& all_shard_row_counts,
579  const OneShardBuffers& import_buffers,
580  const size_t row_count,
581  const size_t shard_count,
582  const Catalog_Namespace::SessionInfo* session_info);
583 
586  std::list<const ColumnDescriptor*> column_descs_;
589  std::map<int, StringDictionary*> dict_map_;
590 
591  private:
592  bool loadToShard(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
593  size_t row_count,
594  const TableDescriptor* shard_table,
595  bool checkpoint,
596  const Catalog_Namespace::SessionInfo* session_info);
598  std::vector<OneShardBuffers>& all_shard_import_buffers,
599  std::vector<size_t>& all_shard_row_counts,
600  const OneShardBuffers& import_buffers,
601  const size_t row_count,
602  const size_t shard_count,
603  const Catalog_Namespace::SessionInfo* session_info);
605  std::vector<OneShardBuffers>& all_shard_import_buffers,
606  std::vector<size_t>& all_shard_row_counts,
607  const OneShardBuffers& import_buffers,
608  const size_t row_count,
609  const size_t shard_count,
610  const Catalog_Namespace::SessionInfo* session_info);
611  void fillShardRow(const size_t row_index,
612  OneShardBuffers& shard_output_buffers,
613  const OneShardBuffers& import_buffers);
614 
615  bool adding_columns_ = false;
616  std::mutex loader_mutex_;
617  std::string error_msg_;
618 };
619 
620 struct ImportStatus {
621  std::chrono::steady_clock::time_point start;
622  std::chrono::steady_clock::time_point end;
626  std::chrono::duration<size_t, std::milli> elapsed;
627  bool load_failed = false;
628  std::string load_msg;
629  int thread_id; // to recall thread_id after thread exit
631  : start(std::chrono::steady_clock::now())
632  , rows_completed(0)
633  , rows_estimated(0)
634  , rows_rejected(0)
635  , elapsed(0)
636  , thread_id(0) {}
637 
641  if (is.load_failed) {
642  load_failed = true;
643  load_msg = is.load_msg;
644  }
645 
646  return *this;
647  }
648 };
649 
651  public:
653  DataStreamSink(const CopyParams& copy_params, const std::string file_path)
654  : copy_params(copy_params), file_path(file_path) {}
655  virtual ~DataStreamSink() {}
657  const std::string& file_path,
658  const bool decompressed,
659  const Catalog_Namespace::SessionInfo* session_info) = 0;
660 #ifdef ENABLE_IMPORT_PARQUET
661  virtual void import_parquet(std::vector<std::string>& file_paths,
662  const Catalog_Namespace::SessionInfo* session_info);
663  virtual void import_local_parquet(
664  const std::string& file_path,
665  const Catalog_Namespace::SessionInfo* session_info) = 0;
666 #endif
667  const CopyParams& get_copy_params() const { return copy_params; }
668  void import_compressed(std::vector<std::string>& file_paths,
669  const Catalog_Namespace::SessionInfo* session_info);
670 
671  protected:
673 
675  const std::string file_path;
676  FILE* p_file = nullptr;
679  size_t total_file_size{0};
680  std::vector<size_t> file_offsets;
681  std::mutex file_offsets_mutex;
682 };
683 
684 class Detector : public DataStreamSink {
685  public:
686  Detector(const boost::filesystem::path& fp, CopyParams& cp)
687  : DataStreamSink(cp, fp.string()), file_path(fp) {
688  read_file();
689  init();
690  };
691 #ifdef ENABLE_IMPORT_PARQUET
692  void import_local_parquet(const std::string& file_path,
693  const Catalog_Namespace::SessionInfo* session_info) override;
694 #endif
695  static SQLTypes detect_sqltype(const std::string& str);
696  std::vector<std::string> get_headers();
697  std::vector<std::vector<std::string>> raw_rows;
698  std::vector<std::vector<std::string>> get_sample_rows(size_t n);
699  std::vector<SQLTypes> best_sqltypes;
700  std::vector<EncodingType> best_encodings;
701  bool has_headers = false;
702 
703  private:
704  void init();
705  void read_file();
706  void detect_row_delimiter();
707  void split_raw_data();
708  std::vector<SQLTypes> detect_column_types(const std::vector<std::string>& row);
709  static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b);
710  void find_best_sqltypes();
711  std::vector<SQLTypes> find_best_sqltypes(
712  const std::vector<std::vector<std::string>>& raw_rows,
713  const CopyParams& copy_params);
714  std::vector<SQLTypes> find_best_sqltypes(
715  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
716  const std::vector<std::vector<std::string>>::const_iterator& row_end,
717  const CopyParams& copy_params);
718 
719  std::vector<EncodingType> find_best_encodings(
720  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
721  const std::vector<std::vector<std::string>>::const_iterator& row_end,
722  const std::vector<SQLTypes>& best_types);
723 
724  bool detect_headers(const std::vector<SQLTypes>& first_types,
725  const std::vector<SQLTypes>& rest_types);
728  const std::string& file_path,
729  const bool decompressed,
730  const Catalog_Namespace::SessionInfo* session_info) override;
731  std::string raw_data;
732  boost::filesystem::path file_path;
733  std::chrono::duration<double> timeout{1};
734  std::string line1;
735 };
736 
738  public:
739  static ArrayDatum composeNullArray(const SQLTypeInfo& ti);
740 };
741 
743  public:
744  RenderGroupAnalyzer() : _rtree(std::make_unique<RTree>()), _numRenderGroups(0) {}
746  const std::string& tableName,
747  const std::string& geoColumnBaseName);
748  int insertBoundsAndReturnRenderGroup(const std::vector<double>& bounds);
749 
750  private:
751  using Point = boost::geometry::model::point<double, 2, boost::geometry::cs::cartesian>;
752  using BoundingBox = boost::geometry::model::box<Point>;
753  using Node = std::pair<BoundingBox, int>;
754  using RTree =
755  boost::geometry::index::rtree<Node, boost::geometry::index::quadratic<16>>;
756  std::unique_ptr<RTree> _rtree;
757  std::mutex _rtreeMutex;
759 };
760 
761 class Importer : public DataStreamSink {
762  public:
764  const TableDescriptor* t,
765  const std::string& f,
766  const CopyParams& p);
767  Importer(Loader* providedLoader, const std::string& f, const CopyParams& p);
768  ~Importer() override;
769  ImportStatus import(const Catalog_Namespace::SessionInfo* session_info);
771  const std::string& file_path,
772  const bool decompressed,
773  const Catalog_Namespace::SessionInfo* session_info) override;
774  ImportStatus importGDAL(std::map<std::string, std::string> colname_to_src,
775  const Catalog_Namespace::SessionInfo* session_info);
776  const CopyParams& get_copy_params() const { return copy_params; }
777  const std::list<const ColumnDescriptor*>& get_column_descs() const {
778  return loader->get_column_descs();
779  }
780  void load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
781  size_t row_count,
782  const Catalog_Namespace::SessionInfo* session_info);
783  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>>& get_import_buffers_vec() {
784  return import_buffers_vec;
785  }
786  std::vector<std::unique_ptr<TypedImportBuffer>>& get_import_buffers(int i) {
787  return import_buffers_vec[i];
788  }
789  const bool* get_is_array() const { return is_array_a.get(); }
790 #ifdef ENABLE_IMPORT_PARQUET
791  void import_local_parquet(const std::string& file_path,
792  const Catalog_Namespace::SessionInfo* session_info) override;
793 #endif
794  static ImportStatus get_import_status(const std::string& id);
795  static void set_import_status(const std::string& id, const ImportStatus is);
796  static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
797  const std::string& fileName,
798  const std::string& geoColumnName,
799  const CopyParams& copy_params);
800  static void readMetadataSampleGDAL(
801  const std::string& fileName,
802  const std::string& geoColumnName,
803  std::map<std::string, std::vector<std::string>>& metadata,
804  int rowLimit,
805  const CopyParams& copy_params);
806  static bool gdalFileExists(const std::string& path, const CopyParams& copy_params);
807  static bool gdalFileOrDirectoryExists(const std::string& path,
808  const CopyParams& copy_params);
809  static std::vector<std::string> gdalGetAllFilesInArchive(
810  const std::string& archive_path,
811  const CopyParams& copy_params);
814  GeoFileLayerInfo(const std::string& name_, GeoFileLayerContents contents_)
815  : name(name_), contents(contents_) {}
816  std::string name;
818  };
819  static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
820  const std::string& file_name,
821  const CopyParams& copy_params);
822  Catalog_Namespace::Catalog& getCatalog() { return loader->getCatalog(); }
823  static void set_geo_physical_import_buffer(
824  const Catalog_Namespace::Catalog& catalog,
825  const ColumnDescriptor* cd,
826  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
827  size_t& col_idx,
828  std::vector<double>& coords,
829  std::vector<double>& bounds,
830  std::vector<int>& ring_sizes,
831  std::vector<int>& poly_rings,
832  int render_group);
834  const Catalog_Namespace::Catalog& catalog,
835  const ColumnDescriptor* cd,
836  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
837  size_t& col_idx,
838  std::vector<std::vector<double>>& coords_column,
839  std::vector<std::vector<double>>& bounds_column,
840  std::vector<std::vector<int>>& ring_sizes_column,
841  std::vector<std::vector<int>>& poly_rings_column,
842  std::vector<int>& render_groups_column);
843  void checkpoint(const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
844  auto getLoader() const { return loader.get(); }
845 
846  private:
847  static bool gdalStatInternal(const std::string& path,
848  const CopyParams& copy_params,
849  bool also_dir);
850  static OGRDataSource* openGDALDataset(const std::string& fileName,
851  const CopyParams& copy_params);
853  std::string import_id;
854  size_t file_size;
855  size_t max_threads;
856  char* buffer[2];
857  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>> import_buffers_vec;
858  std::unique_ptr<Loader> loader;
859  std::unique_ptr<bool[]> is_array_a;
860  static std::mutex init_gdal_mutex;
861 };
862 
863 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
864  const TableDescriptor* td,
865  Loader* loader);
866 
867 std::vector<std::unique_ptr<TypedImportBuffer>> fill_missing_columns(
869  Fragmenter_Namespace::InsertData& insert_data);
870 
871 } // namespace import_export
872 
873 #endif // _IMPORTER_H_
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
Definition: Importer.h:518
std::pair< size_t, size_t > ArraySliceRange
Definition: Importer.h:72
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:536
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:315
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:4906
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
Definition: Importer.cpp:4343
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:4114
std::mutex loader_mutex_
Definition: Importer.h:616
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:280
StringDictionary * getStringDictionary() const
Definition: Importer.h:284
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:2981
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
void addBigint(const int64_t v)
Definition: Importer.h:231
std::string cat(Ts &&...args)
void addSmallint(const int16_t v)
Definition: Importer.h:227
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
Definition: sqltypes.h:48
SQLTypes
Definition: sqltypes.h:37
void addDictEncodedStringArray(const std::vector< std::vector< std::string >> &string_array_vec)
Definition: Importer.h:254
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
Definition: Importer.h:85
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3848
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams &copy_params)
Definition: Importer.cpp:532
const TableDescriptor * getTableDesc() const
Definition: Importer.h:535
void dropColumns(const std::vector< int > &columns)
Definition: Importer.cpp:2947
std::vector< std::string > * string_buffer_
Definition: Importer.h:494
void addString(const std::string_view v)
Definition: Importer.h:237
std::vector< ArrayDatum > * array_buffer_
Definition: Importer.h:496
void find_best_sqltypes_and_headers()
Definition: Importer.cpp:3222
StringDictionary * string_dict_
Definition: Importer.h:506
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3369
std::atomic< int > nerrors
Definition: Importer.h:77
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:417
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
Definition: Importer.cpp:1559
void addDouble(const double v)
Definition: Importer.h:235
void addStringArray(const std::vector< std::string > &arr)
Definition: Importer.h:248
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5369
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:171
std::vector< int16_t > * smallint_buffer_
Definition: Importer.h:489
const bool * get_is_array() const
Definition: Importer.h:789
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
Definition: Importer.h:350
const TableDescriptor * table_desc_
Definition: Importer.h:585
virtual void checkpoint()
Definition: Importer.cpp:4335
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
Definition: Importer.cpp:5354
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
Definition: Importer.h:783
std::vector< SQLTypes > best_sqltypes
Definition: Importer.h:699
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
Definition: Importer.cpp:4839
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:626
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2764
const CopyParams & get_copy_params() const
Definition: Importer.h:667
std::vector< float > * float_buffer_
Definition: Importer.h:492
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:786
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4704
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
Definition: Importer.h:814
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4737
std::pair< BoundingBox, int > Node
Definition: Importer.h:753
std::vector< double > * double_buffer_
Definition: Importer.h:493
void addFloat(const float v)
Definition: Importer.h:233
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
std::vector< std::string > * getStringBuffer() const
Definition: Importer.h:340
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
Definition: Importer.cpp:981
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
Definition: Importer.h:653
ImportStatus & operator+=(const ImportStatus &is)
Definition: Importer.h:638
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2957
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
Definition: Importer.h:239
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
Definition: Importer.cpp:3110
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
Definition: Importer.cpp:3298
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
void setAddingColumns(const bool adding_columns)
Definition: Importer.h:562
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
Definition: Importer.h:576
std::vector< int32_t > * int_buffer_
Definition: Importer.h:490
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4354
std::vector< ArrayDatum > * string_array_dict_buffer_
Definition: Importer.h:503
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2829
CONSTEXPR DEVICE bool is_null(const T &value)
DEVICE Array(const int64_t size, const bool is_null=false)
Definition: OmniSciTypes.h:38
void addBoolean(const int8_t v)
Definition: Importer.h:223
auto getLoader() const
Definition: Importer.h:844
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:500
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4614
void addTinyint(const int8_t v)
Definition: Importer.h:225
std::shared_timed_mutex mapd_shared_mutex
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
Definition: Importer.cpp:4480
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:491
int8_t * getAsBytes() const
Definition: Importer.h:286
std::string error_msg_
Definition: Importer.h:617
void addInt(const int32_t v)
Definition: Importer.h:229
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
Definition: Importer.cpp:1459
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:822
std::vector< std::vector< std::string > > raw_rows
Definition: Importer.h:697
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2578
Specifies the in-memory content of a row in the column metadata table.
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2633
bool isAddingColumns() const
Definition: Importer.h:563
std::vector< EncodingType > best_encodings
Definition: Importer.h:700
std::vector< int8_t > * bool_buffer_
Definition: Importer.h:487
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857
bool g_enable_smem_group_by true
std::vector< std::vector< std::string > > * string_array_buffer_
Definition: Importer.h:497
boost::filesystem::path file_path
Definition: Importer.h:732
size_t getElementSize() const
Definition: Importer.h:313
int8_t * getStringDictBuffer() const
Definition: Importer.h:354
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4742
std::set< int64_t > rows
Definition: Importer.h:76
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:859
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Definition: Importer.h:482
bool checkpoint() noexcept
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
Definition: sqltypes.h:51
Definition: sqltypes.h:52
Detector(const boost::filesystem::path &fp, CopyParams &cp)
Definition: Importer.h:686
std::vector< std::string > & addStringArray()
Definition: Importer.h:243
boost::geometry::index::rtree< Node, boost::geometry::index::quadratic< 16 >> RTree
Definition: Importer.h:755
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2571
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
std::vector< int32_t > * string_dict_i32_buffer_
Definition: Importer.h:502
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
Definition: Importer.cpp:3200
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
void addArray(const ArrayDatum &v)
Definition: Importer.h:241
std::vector< std::vector< std::string > > * getStringArrayBuffer() const
Definition: Importer.h:346
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2890
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:534
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3415
std::string getErrorMessage()
Definition: Importer.h:565
std::chrono::duration< double > timeout
Definition: Importer.h:733
Definition: sqltypes.h:40
std::vector< std::string > * getGeoStringBuffer() const
Definition: Importer.h:342
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3349
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
Definition: Importer.cpp:3334
#define IS_STRING(T)
Definition: sqltypes.h:244
std::string import_id
Definition: Importer.h:853
const ColumnDescriptor * column_desc_
Definition: Importer.h:505
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:752
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
Definition: Importer.cpp:880
std::unique_ptr< RTree > _rtree
Definition: Importer.h:756
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3379
std::chrono::steady_clock::time_point start
Definition: Importer.h:621
std::vector< uint16_t > * string_dict_i16_buffer_
Definition: Importer.h:501
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2797
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
Definition: Importer.cpp:3192
std::vector< std::string > get_headers()
Definition: Importer.cpp:3357
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
const CopyParams & get_copy_params() const
Definition: Importer.h:776
bool g_enable_watchdog false
Definition: Execute.cpp:76
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:488
#define CHECK(condition)
Definition: Logger.h:203
std::string raw_data
Definition: Importer.h:731
static ImportStatus get_import_status(const std::string &id)
Definition: Importer.cpp:230
char * t
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
const ColumnDescriptor * getColumnDesc() const
Definition: Importer.h:282
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
Definition: Importer.h:540
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2705
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams &copy_params)
Definition: Importer.cpp:4814
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
char * f
static constexpr size_t MAX_STRLEN
Definition: sqltypes.h:44
SQLTypeInfo columnType
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4424
bool is_string() const
Definition: sqltypes.h:489
LoadCallbackType load_callback_
Definition: Importer.h:587
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2743
void seedFromExistingTableContents(Catalog_Namespace::Catalog &cat, const std::string &tableName, const std::string &geoColumnBaseName)
Definition: Importer.cpp:5171
static std::mutex init_gdal_mutex
Definition: Importer.h:860
std::vector< size_t > file_offsets
Definition: Importer.h:680
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589
std::vector< std::string > * geo_string_buffer_
Definition: Importer.h:495
std::chrono::steady_clock::time_point end
Definition: Importer.h:622
int insertBoundsAndReturnRenderGroup(const std::vector< double > &bounds)
Definition: Importer.cpp:5308
std::vector< ArrayDatum > * getArrayBuffer() const
Definition: Importer.h:344
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
Definition: Importer.h:751
std::unique_ptr< Loader > loader
Definition: Importer.h:858
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
Definition: Importer.h:513
void addDictEncodedString(const std::vector< std::string > &string_vec)
Definition: Importer.cpp:492
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:777
const std::string file_path
Definition: Importer.h:675
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:4348