OmniSciDB  f632821e96
Importer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
/**
 * @file Importer.h
 * @author Wei Hong <wei@mapd.com>
 * @brief Importer class for table import from file
 */
22 #ifndef _IMPORTER_H_
23 #define _IMPORTER_H_
24 
25 #include <gdal.h>
26 #include <ogrsf_frmts.h>
27 
28 #include <atomic>
29 #include <boost/filesystem.hpp>
30 #include <boost/noncopyable.hpp>
31 #include <boost/tokenizer.hpp>
32 #include <condition_variable>
33 #include <cstdio>
34 #include <cstdlib>
35 #include <iostream>
36 #include <list>
37 #include <map>
38 #include <memory>
39 #include <mutex>
40 #include <set>
41 #include <string>
42 #include <string_view>
43 #include <utility>
44 
45 #include "Catalog/Catalog.h"
47 #include "DataMgr/Chunk/Chunk.h"
48 #include "Fragmenter/Fragmenter.h"
50 #include "Logger/Logger.h"
52 #include "Shared/checked_alloc.h"
53 #include "Shared/fixautotools.h"
54 
55 // Some builds of boost::geometry require iostream, but don't explicitly include it.
56 // Placing in own section to ensure it's included after iostream.
57 #include <boost/geometry/index/rtree.hpp>
58 
59 class TDatum;
60 class TColumn;
61 
62 namespace arrow {
63 
64 class Array;
65 
66 } // namespace arrow
67 
68 namespace import_export {
69 
70 class Importer;
71 
72 using ArraySliceRange = std::pair<size_t, size_t>;
73 
75  std::mutex mutex;
76  std::set<int64_t> rows;
77  std::atomic<int> nerrors;
78  std::string file_name;
79  int row_group;
81 };
82 
83 class TypedImportBuffer : boost::noncopyable {
84  public:
85  TypedImportBuffer(const ColumnDescriptor* col_desc, StringDictionary* string_dict)
86  : column_desc_(col_desc), string_dict_(string_dict) {
87  switch (col_desc->columnType.get_type()) {
88  case kBOOLEAN:
89  bool_buffer_ = new std::vector<int8_t>();
90  break;
91  case kTINYINT:
92  tinyint_buffer_ = new std::vector<int8_t>();
93  break;
94  case kSMALLINT:
95  smallint_buffer_ = new std::vector<int16_t>();
96  break;
97  case kINT:
98  int_buffer_ = new std::vector<int32_t>();
99  break;
100  case kBIGINT:
101  case kNUMERIC:
102  case kDECIMAL:
103  bigint_buffer_ = new std::vector<int64_t>();
104  break;
105  case kFLOAT:
106  float_buffer_ = new std::vector<float>();
107  break;
108  case kDOUBLE:
109  double_buffer_ = new std::vector<double>();
110  break;
111  case kTEXT:
112  case kVARCHAR:
113  case kCHAR:
114  string_buffer_ = new std::vector<std::string>();
115  if (col_desc->columnType.get_compression() == kENCODING_DICT) {
116  switch (col_desc->columnType.get_size()) {
117  case 1:
118  string_dict_i8_buffer_ = new std::vector<uint8_t>();
119  break;
120  case 2:
121  string_dict_i16_buffer_ = new std::vector<uint16_t>();
122  break;
123  case 4:
124  string_dict_i32_buffer_ = new std::vector<int32_t>();
125  break;
126  default:
127  CHECK(false);
128  }
129  }
130  break;
131  case kDATE:
132  case kTIME:
133  case kTIMESTAMP:
134  bigint_buffer_ = new std::vector<int64_t>();
135  break;
136  case kARRAY:
137  if (IS_STRING(col_desc->columnType.get_subtype())) {
139  string_array_buffer_ = new std::vector<std::vector<std::string>>();
140  string_array_dict_buffer_ = new std::vector<ArrayDatum>();
141  } else {
142  array_buffer_ = new std::vector<ArrayDatum>();
143  }
144  break;
145  case kPOINT:
146  case kLINESTRING:
147  case kPOLYGON:
148  case kMULTIPOLYGON:
149  geo_string_buffer_ = new std::vector<std::string>();
150  break;
151  default:
152  CHECK(false);
153  }
154  }
155 
157  switch (column_desc_->columnType.get_type()) {
158  case kBOOLEAN:
159  delete bool_buffer_;
160  break;
161  case kTINYINT:
162  delete tinyint_buffer_;
163  break;
164  case kSMALLINT:
165  delete smallint_buffer_;
166  break;
167  case kINT:
168  delete int_buffer_;
169  break;
170  case kBIGINT:
171  case kNUMERIC:
172  case kDECIMAL:
173  delete bigint_buffer_;
174  break;
175  case kFLOAT:
176  delete float_buffer_;
177  break;
178  case kDOUBLE:
179  delete double_buffer_;
180  break;
181  case kTEXT:
182  case kVARCHAR:
183  case kCHAR:
184  delete string_buffer_;
185  if (column_desc_->columnType.get_compression() == kENCODING_DICT) {
186  switch (column_desc_->columnType.get_size()) {
187  case 1:
188  delete string_dict_i8_buffer_;
189  break;
190  case 2:
191  delete string_dict_i16_buffer_;
192  break;
193  case 4:
194  delete string_dict_i32_buffer_;
195  break;
196  }
197  }
198  break;
199  case kDATE:
200  case kTIME:
201  case kTIMESTAMP:
202  delete bigint_buffer_;
203  break;
204  case kARRAY:
205  if (IS_STRING(column_desc_->columnType.get_subtype())) {
206  delete string_array_buffer_;
207  delete string_array_dict_buffer_;
208  } else {
209  delete array_buffer_;
210  }
211  break;
212  case kPOINT:
213  case kLINESTRING:
214  case kPOLYGON:
215  case kMULTIPOLYGON:
216  delete geo_string_buffer_;
217  break;
218  default:
219  CHECK(false);
220  }
221  }
222 
223  void addBoolean(const int8_t v) { bool_buffer_->push_back(v); }
224 
225  void addTinyint(const int8_t v) { tinyint_buffer_->push_back(v); }
226 
227  void addSmallint(const int16_t v) { smallint_buffer_->push_back(v); }
228 
229  void addInt(const int32_t v) { int_buffer_->push_back(v); }
230 
231  void addBigint(const int64_t v) { bigint_buffer_->push_back(v); }
232 
233  void addFloat(const float v) { float_buffer_->push_back(v); }
234 
235  void addDouble(const double v) { double_buffer_->push_back(v); }
236 
237  void addString(const std::string_view v) { string_buffer_->emplace_back(v); }
238 
239  void addGeoString(const std::string_view v) { geo_string_buffer_->emplace_back(v); }
240 
241  void addArray(const ArrayDatum& v) { array_buffer_->push_back(v); }
242 
243  std::vector<std::string>& addStringArray() {
244  string_array_buffer_->emplace_back();
245  return string_array_buffer_->back();
246  }
247 
248  void addStringArray(const std::vector<std::string>& arr) {
249  string_array_buffer_->push_back(arr);
250  }
251 
252  void addDictEncodedString(const std::vector<std::string>& string_vec);
253 
255  const std::vector<std::vector<std::string>>& string_array_vec) {
256  CHECK(string_dict_);
257 
258  // first check data is ok
259  for (auto& p : string_array_vec) {
260  for (const auto& str : p) {
261  if (str.size() > StringDictionary::MAX_STRLEN) {
262  throw std::runtime_error("String too long for dictionary encoding.");
263  }
264  }
265  }
266 
267  std::vector<std::vector<int32_t>> ids_array(0);
268  string_dict_->getOrAddBulkArray(string_array_vec, ids_array);
269 
270  for (auto& p : ids_array) {
271  size_t len = p.size() * sizeof(int32_t);
272  auto a = static_cast<int32_t*>(checked_malloc(len));
273  memcpy(a, &p[0], len);
274  // TODO: distinguish between empty and NULL
275  string_array_dict_buffer_->push_back(
276  ArrayDatum(len, reinterpret_cast<int8_t*>(a), len == 0));
277  }
278  }
279 
280  const SQLTypeInfo& getTypeInfo() const { return column_desc_->columnType; }
281 
282  const ColumnDescriptor* getColumnDesc() const { return column_desc_; }
283 
284  StringDictionary* getStringDictionary() const { return string_dict_; }
285 
286  int8_t* getAsBytes() const {
287  switch (column_desc_->columnType.get_type()) {
288  case kBOOLEAN:
289  return reinterpret_cast<int8_t*>(&((*bool_buffer_)[0]));
290  case kTINYINT:
291  return reinterpret_cast<int8_t*>(&((*tinyint_buffer_)[0]));
292  case kSMALLINT:
293  return reinterpret_cast<int8_t*>(&((*smallint_buffer_)[0]));
294  case kINT:
295  return reinterpret_cast<int8_t*>(&((*int_buffer_)[0]));
296  case kBIGINT:
297  case kNUMERIC:
298  case kDECIMAL:
299  return reinterpret_cast<int8_t*>(&((*bigint_buffer_)[0]));
300  case kFLOAT:
301  return reinterpret_cast<int8_t*>(&((*float_buffer_)[0]));
302  case kDOUBLE:
303  return reinterpret_cast<int8_t*>(&((*double_buffer_)[0]));
304  case kDATE:
305  case kTIME:
306  case kTIMESTAMP:
307  return reinterpret_cast<int8_t*>(&((*bigint_buffer_)[0]));
308  default:
309  abort();
310  }
311  }
312 
313  size_t getElementSize() const {
314  switch (column_desc_->columnType.get_type()) {
315  case kBOOLEAN:
316  return sizeof((*bool_buffer_)[0]);
317  case kTINYINT:
318  return sizeof((*tinyint_buffer_)[0]);
319  case kSMALLINT:
320  return sizeof((*smallint_buffer_)[0]);
321  case kINT:
322  return sizeof((*int_buffer_)[0]);
323  case kBIGINT:
324  case kNUMERIC:
325  case kDECIMAL:
326  return sizeof((*bigint_buffer_)[0]);
327  case kFLOAT:
328  return sizeof((*float_buffer_)[0]);
329  case kDOUBLE:
330  return sizeof((*double_buffer_)[0]);
331  case kDATE:
332  case kTIME:
333  case kTIMESTAMP:
334  return sizeof((*bigint_buffer_)[0]);
335  default:
336  abort();
337  }
338  }
339 
340  std::vector<std::string>* getStringBuffer() const { return string_buffer_; }
341 
342  std::vector<std::string>* getGeoStringBuffer() const { return geo_string_buffer_; }
343 
344  std::vector<ArrayDatum>* getArrayBuffer() const { return array_buffer_; }
345 
346  std::vector<std::vector<std::string>>* getStringArrayBuffer() const {
347  return string_array_buffer_;
348  }
349 
350  std::vector<ArrayDatum>* getStringArrayDictBuffer() const {
351  return string_array_dict_buffer_;
352  }
353 
354  int8_t* getStringDictBuffer() const {
355  switch (column_desc_->columnType.get_size()) {
356  case 1:
357  return reinterpret_cast<int8_t*>(&((*string_dict_i8_buffer_)[0]));
358  case 2:
359  return reinterpret_cast<int8_t*>(&((*string_dict_i16_buffer_)[0]));
360  case 4:
361  return reinterpret_cast<int8_t*>(&((*string_dict_i32_buffer_)[0]));
362  default:
363  abort();
364  }
365  }
366 
368  if (string_dict_ == nullptr) {
369  return true;
370  }
371  return string_dict_->checkpoint();
372  }
373 
374  void clear() {
375  switch (column_desc_->columnType.get_type()) {
376  case kBOOLEAN: {
377  bool_buffer_->clear();
378  break;
379  }
380  case kTINYINT: {
381  tinyint_buffer_->clear();
382  break;
383  }
384  case kSMALLINT: {
385  smallint_buffer_->clear();
386  break;
387  }
388  case kINT: {
389  int_buffer_->clear();
390  break;
391  }
392  case kBIGINT:
393  case kNUMERIC:
394  case kDECIMAL: {
395  bigint_buffer_->clear();
396  break;
397  }
398  case kFLOAT: {
399  float_buffer_->clear();
400  break;
401  }
402  case kDOUBLE: {
403  double_buffer_->clear();
404  break;
405  }
406  case kTEXT:
407  case kVARCHAR:
408  case kCHAR: {
409  string_buffer_->clear();
410  if (column_desc_->columnType.get_compression() == kENCODING_DICT) {
411  switch (column_desc_->columnType.get_size()) {
412  case 1:
413  string_dict_i8_buffer_->clear();
414  break;
415  case 2:
416  string_dict_i16_buffer_->clear();
417  break;
418  case 4:
419  string_dict_i32_buffer_->clear();
420  break;
421  default:
422  CHECK(false);
423  }
424  }
425  break;
426  }
427  case kDATE:
428  case kTIME:
429  case kTIMESTAMP:
430  bigint_buffer_->clear();
431  break;
432  case kARRAY: {
433  if (IS_STRING(column_desc_->columnType.get_subtype())) {
434  string_array_buffer_->clear();
435  string_array_dict_buffer_->clear();
436  } else {
437  array_buffer_->clear();
438  }
439  break;
440  }
441  case kPOINT:
442  case kLINESTRING:
443  case kPOLYGON:
444  case kMULTIPOLYGON:
445  geo_string_buffer_->clear();
446  break;
447  default:
448  CHECK(false);
449  }
450  }
451 
452  size_t add_values(const ColumnDescriptor* cd, const TColumn& data);
453 
454  size_t add_arrow_values(const ColumnDescriptor* cd,
455  const arrow::Array& data,
456  const bool exact_type_match,
457  const ArraySliceRange& slice_range,
458  BadRowsTracker* bad_rows_tracker);
459 
460  void add_value(const ColumnDescriptor* cd,
461  const std::string_view val,
462  const bool is_null,
463  const CopyParams& copy_params,
464  const int64_t replicate_count = 0);
465 
466  void add_value(const ColumnDescriptor* cd,
467  const TDatum& val,
468  const bool is_null,
469  const int64_t replicate_count = 0);
470 
471  void pop_value();
472 
473  int64_t get_replicate_count() const { return replicate_count_; }
474  void set_replicate_count(const int64_t replicate_count) {
475  replicate_count_ = replicate_count;
476  }
477  template <typename DATA_TYPE>
478  size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor* cd,
479  const arrow::Array& array,
480  std::vector<DATA_TYPE>& buffer,
481  const ArraySliceRange& slice_range,
482  BadRowsTracker* const bad_rows_tracker);
483  template <typename DATA_TYPE>
484  auto del_values(std::vector<DATA_TYPE>& buffer, BadRowsTracker* const bad_rows_tracker);
485  auto del_values(const SQLTypes type, BadRowsTracker* const bad_rows_tracker);
486  std::vector<std::unique_ptr<TypedImportBuffer>>* import_buffers;
487  size_t col_idx;
488 
489  private:
490  union {
491  std::vector<int8_t>* bool_buffer_;
492  std::vector<int8_t>* tinyint_buffer_;
493  std::vector<int16_t>* smallint_buffer_;
494  std::vector<int32_t>* int_buffer_;
495  std::vector<int64_t>* bigint_buffer_;
496  std::vector<float>* float_buffer_;
497  std::vector<double>* double_buffer_;
498  std::vector<std::string>* string_buffer_;
499  std::vector<std::string>* geo_string_buffer_;
500  std::vector<ArrayDatum>* array_buffer_;
501  std::vector<std::vector<std::string>>* string_array_buffer_;
502  };
503  union {
504  std::vector<uint8_t>* string_dict_i8_buffer_;
505  std::vector<uint16_t>* string_dict_i16_buffer_;
506  std::vector<int32_t>* string_dict_i32_buffer_;
507  std::vector<ArrayDatum>* string_array_dict_buffer_;
508  };
511  size_t replicate_count_ = 0;
512 };
513 
514 class Loader {
515  using LoadCallbackType =
516  std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
517  std::vector<DataBlockPtr>&,
518  size_t)>;
519 
520  public:
521  // TODO: Remove the `use_catalog_locks` parameter once Loader is refactored out of
522  // ParquetDataWrapper
524  const TableDescriptor* t,
525  LoadCallbackType load_callback = nullptr,
526  bool use_catalog_locks = true)
527  : catalog_(c)
528  , table_desc_(t)
529  , column_descs_(
530  use_catalog_locks
531  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
532  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
533  , load_callback_(load_callback) {
534  init(use_catalog_locks);
535  }
536 
537  virtual ~Loader() {}
538 
539  Catalog_Namespace::Catalog& getCatalog() const { return catalog_; }
540  const TableDescriptor* getTableDesc() const { return table_desc_; }
541  const std::list<const ColumnDescriptor*>& get_column_descs() const {
542  return column_descs_;
543  }
544 
546  if ((cd->columnType.get_type() != kARRAY ||
547  !IS_STRING(cd->columnType.get_subtype())) &&
548  (!cd->columnType.is_string() ||
550  return nullptr;
551  }
552  return dict_map_.at(cd->columnId);
553  }
554 
555  virtual bool load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
556  const size_t row_count);
557  virtual bool loadNoCheckpoint(
558  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
559  const size_t row_count);
560  virtual void checkpoint();
561  virtual std::vector<Catalog_Namespace::TableEpochInfo> getTableEpochs() const;
562  virtual void setTableEpochs(
563  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
564 
565  void setReplicating(const bool replicating) { replicating_ = replicating; }
566  bool getReplicating() const { return replicating_; }
567  void dropColumns(const std::vector<int>& columns);
568 
569  protected:
570  void init(const bool use_catalog_locks);
571 
572  virtual bool loadImpl(
573  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
574  size_t row_count,
575  bool checkpoint);
576 
577  using OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>;
578  void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
579  std::vector<size_t>& all_shard_row_counts,
580  const OneShardBuffers& import_buffers,
581  const size_t row_count,
582  const size_t shard_count);
583 
586  std::list<const ColumnDescriptor*> column_descs_;
589  std::map<int, StringDictionary*> dict_map_;
590 
591  private:
592  std::vector<DataBlockPtr> get_data_block_pointers(
593  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
594  bool loadToShard(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
595  size_t row_count,
596  const TableDescriptor* shard_table,
597  bool checkpoint);
598 
599  bool replicating_ = false;
600  std::mutex loader_mutex_;
601 };
602 
603 struct ImportStatus {
604  std::chrono::steady_clock::time_point start;
605  std::chrono::steady_clock::time_point end;
609  std::chrono::duration<size_t, std::milli> elapsed;
611  int thread_id; // to recall thread_id after thread exit
613  : start(std::chrono::steady_clock::now())
614  , rows_completed(0)
615  , rows_estimated(0)
616  , rows_rejected(0)
617  , elapsed(0)
618  , load_truncated(0)
619  , thread_id(0) {}
620 
622  rows_completed += is.rows_completed;
623  rows_rejected += is.rows_rejected;
624 
625  return *this;
626  }
627 };
628 
630  public:
632  DataStreamSink(const CopyParams& copy_params, const std::string file_path)
633  : copy_params(copy_params), file_path(file_path) {}
634  virtual ~DataStreamSink() {}
635  virtual ImportStatus importDelimited(const std::string& file_path,
636  const bool decompressed) = 0;
637 #ifdef ENABLE_IMPORT_PARQUET
638  virtual void import_parquet(std::vector<std::string>& file_paths);
639  virtual void import_local_parquet(const std::string& file_path) = 0;
640 #endif
641  const CopyParams& get_copy_params() const { return copy_params; }
642  void import_compressed(std::vector<std::string>& file_paths);
643 
644  protected:
645  ImportStatus archivePlumber();
646 
648  const std::string file_path;
649  FILE* p_file = nullptr;
651  bool load_failed = false;
652  size_t total_file_size{0};
653  std::vector<size_t> file_offsets;
654  std::mutex file_offsets_mutex;
655 };
656 
657 class Detector : public DataStreamSink {
658  public:
659  Detector(const boost::filesystem::path& fp, CopyParams& cp)
660  : DataStreamSink(cp, fp.string()), file_path(fp) {
661  read_file();
662  init();
663  };
664 #ifdef ENABLE_IMPORT_PARQUET
665  void import_local_parquet(const std::string& file_path) override;
666 #endif
667  static SQLTypes detect_sqltype(const std::string& str);
668  std::vector<std::string> get_headers();
669  std::vector<std::vector<std::string>> raw_rows;
670  std::vector<std::vector<std::string>> get_sample_rows(size_t n);
671  std::vector<SQLTypes> best_sqltypes;
672  std::vector<EncodingType> best_encodings;
673  bool has_headers = false;
674 
675  private:
676  void init();
677  void read_file();
678  void detect_row_delimiter();
679  void split_raw_data();
680  std::vector<SQLTypes> detect_column_types(const std::vector<std::string>& row);
681  static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b);
682  void find_best_sqltypes();
683  std::vector<SQLTypes> find_best_sqltypes(
684  const std::vector<std::vector<std::string>>& raw_rows,
685  const CopyParams& copy_params);
686  std::vector<SQLTypes> find_best_sqltypes(
687  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
688  const std::vector<std::vector<std::string>>::const_iterator& row_end,
689  const CopyParams& copy_params);
690 
691  std::vector<EncodingType> find_best_encodings(
692  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
693  const std::vector<std::vector<std::string>>::const_iterator& row_end,
694  const std::vector<SQLTypes>& best_types);
695 
696  bool detect_headers(const std::vector<SQLTypes>& first_types,
697  const std::vector<SQLTypes>& rest_types);
698  void find_best_sqltypes_and_headers();
699  ImportStatus importDelimited(const std::string& file_path,
700  const bool decompressed) override;
701  std::string raw_data;
702  boost::filesystem::path file_path;
703  std::chrono::duration<double> timeout{1};
704  std::string line1;
705 };
706 
708  public:
709  static ArrayDatum composeNullArray(const SQLTypeInfo& ti);
710 };
711 
713  public:
714  RenderGroupAnalyzer() : _rtree(std::make_unique<RTree>()), _numRenderGroups(0) {}
715  void seedFromExistingTableContents(const std::unique_ptr<Loader>& loader,
716  const std::string& geoColumnBaseName);
717  int insertBoundsAndReturnRenderGroup(const std::vector<double>& bounds);
718 
719  private:
720  using Point = boost::geometry::model::point<double, 2, boost::geometry::cs::cartesian>;
721  using BoundingBox = boost::geometry::model::box<Point>;
722  using Node = std::pair<BoundingBox, int>;
723  using RTree =
724  boost::geometry::index::rtree<Node, boost::geometry::index::quadratic<16>>;
725  std::unique_ptr<RTree> _rtree;
726  std::mutex _rtreeMutex;
728 };
729 
730 class Importer : public DataStreamSink {
731  public:
733  const TableDescriptor* t,
734  const std::string& f,
735  const CopyParams& p);
736  Importer(Loader* providedLoader, const std::string& f, const CopyParams& p);
737  ~Importer() override;
738  ImportStatus import();
739  ImportStatus importDelimited(const std::string& file_path,
740  const bool decompressed) override;
741  ImportStatus importGDAL(std::map<std::string, std::string> colname_to_src);
742  const CopyParams& get_copy_params() const { return copy_params; }
743  const std::list<const ColumnDescriptor*>& get_column_descs() const {
744  return loader->get_column_descs();
745  }
746  void load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
747  size_t row_count);
748  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>>& get_import_buffers_vec() {
749  return import_buffers_vec;
750  }
751  std::vector<std::unique_ptr<TypedImportBuffer>>& get_import_buffers(int i) {
752  return import_buffers_vec[i];
753  }
754  const bool* get_is_array() const { return is_array_a.get(); }
755 #ifdef ENABLE_IMPORT_PARQUET
756  void import_local_parquet(const std::string& file_path) override;
757 #endif
758  static ImportStatus get_import_status(const std::string& id);
759  static void set_import_status(const std::string& id, const ImportStatus is);
760  static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
761  const std::string& fileName,
762  const std::string& geoColumnName,
763  const CopyParams& copy_params);
764  static void readMetadataSampleGDAL(
765  const std::string& fileName,
766  const std::string& geoColumnName,
767  std::map<std::string, std::vector<std::string>>& metadata,
768  int rowLimit,
769  const CopyParams& copy_params);
770  static bool gdalFileExists(const std::string& path, const CopyParams& copy_params);
771  static bool gdalFileOrDirectoryExists(const std::string& path,
772  const CopyParams& copy_params);
773  static std::vector<std::string> gdalGetAllFilesInArchive(
774  const std::string& archive_path,
775  const CopyParams& copy_params);
776  enum class GeoFileLayerContents { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };
778  GeoFileLayerInfo(const std::string& name_, GeoFileLayerContents contents_)
779  : name(name_), contents(contents_) {}
780  std::string name;
782  };
783  static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
784  const std::string& file_name,
785  const CopyParams& copy_params);
786  Catalog_Namespace::Catalog& getCatalog() { return loader->getCatalog(); }
787  static void set_geo_physical_import_buffer(
788  const Catalog_Namespace::Catalog& catalog,
789  const ColumnDescriptor* cd,
790  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
791  size_t& col_idx,
792  std::vector<double>& coords,
793  std::vector<double>& bounds,
794  std::vector<int>& ring_sizes,
795  std::vector<int>& poly_rings,
796  int render_group,
797  const int64_t replicate_count = 0);
798  static void set_geo_physical_import_buffer_columnar(
799  const Catalog_Namespace::Catalog& catalog,
800  const ColumnDescriptor* cd,
801  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
802  size_t& col_idx,
803  std::vector<std::vector<double>>& coords_column,
804  std::vector<std::vector<double>>& bounds_column,
805  std::vector<std::vector<int>>& ring_sizes_column,
806  std::vector<std::vector<int>>& poly_rings_column,
807  int render_group,
808  const int64_t replicate_count = 0);
809  void checkpoint(const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
810  auto getLoader() const { return loader.get(); }
811 
812  private:
813  static bool gdalStatInternal(const std::string& path,
814  const CopyParams& copy_params,
815  bool also_dir);
816  static OGRDataSource* openGDALDataset(const std::string& fileName,
817  const CopyParams& copy_params);
818  static void setGDALAuthorizationTokens(const CopyParams& copy_params);
819  std::string import_id;
820  size_t file_size;
821  size_t max_threads;
822  char* buffer[2];
823  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>> import_buffers_vec;
824  std::unique_ptr<Loader> loader;
825  std::unique_ptr<bool[]> is_array_a;
826 };
827 
828 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
829  const TableDescriptor* td,
830  Loader* loader);
831 
832 } // namespace import_export
833 
834 #endif // _IMPORTER_H_
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
Definition: Importer.h:523
std::pair< size_t, size_t > ArraySliceRange
Definition: Importer.h:72
std::mutex loader_mutex_
Definition: Importer.h:600
StringDictionary * getStringDictionary() const
Definition: Importer.h:284
void addBigint(const int64_t v)
Definition: Importer.h:231
bool is_string() const
Definition: sqltypes.h:417
void addSmallint(const int16_t v)
Definition: Importer.h:227
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
Definition: sqltypes.h:51
SQLTypes
Definition: sqltypes.h:40
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:541
void addDictEncodedStringArray(const std::vector< std::vector< std::string >> &string_array_vec)
Definition: Importer.h:254
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
Definition: Importer.h:85
std::vector< std::string > * string_buffer_
Definition: Importer.h:498
void addString(const std::string_view v)
Definition: Importer.h:237
std::vector< ArrayDatum > * array_buffer_
Definition: Importer.h:500
const ColumnDescriptor * getColumnDesc() const
Definition: Importer.h:282
StringDictionary * string_dict_
Definition: Importer.h:510
std::atomic< int > nerrors
Definition: Importer.h:77
void set_replicate_count(const int64_t replicate_count)
Definition: Importer.h:474
void addDouble(const double v)
Definition: Importer.h:235
void addStringArray(const std::vector< std::string > &arr)
Definition: Importer.h:248
std::vector< int16_t > * smallint_buffer_
Definition: Importer.h:493
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
Definition: Importer.h:545
const TableDescriptor * table_desc_
Definition: Importer.h:585
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
Definition: Importer.cpp:5149
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
Definition: Importer.h:350
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
Definition: Importer.h:748
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:743
bool getReplicating() const
Definition: Importer.h:566
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer > > &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
Definition: Importer.h:518
std::vector< SQLTypes > best_sqltypes
Definition: Importer.h:671
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:609
std::vector< std::string > * getGeoStringBuffer() const
Definition: Importer.h:342
int8_t * getAsBytes() const
Definition: Importer.h:286
std::vector< float > * float_buffer_
Definition: Importer.h:496
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:751
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
Definition: Importer.h:778
std::pair< BoundingBox, int > Node
Definition: Importer.h:722
std::vector< std::vector< std::string > > * getStringArrayBuffer() const
Definition: Importer.h:346
std::vector< double > * double_buffer_
Definition: Importer.h:497
void setReplicating(const bool replicating)
Definition: Importer.h:565
void addFloat(const float v)
Definition: Importer.h:233
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
Definition: Importer.h:632
ImportStatus & operator+=(const ImportStatus &is)
Definition: Importer.h:621
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:131
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
Definition: Importer.h:239
std::vector< ArrayDatum > * getArrayBuffer() const
Definition: Importer.h:344
std::vector< int32_t > * int_buffer_
Definition: Importer.h:494
std::vector< ArrayDatum > * string_array_dict_buffer_
Definition: Importer.h:507
DEVICE Array(const int64_t size, const bool is_null=false)
Definition: OmniSciTypes.h:36
int64_t get_replicate_count() const
Definition: Importer.h:473
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
void addBoolean(const int8_t v)
Definition: Importer.h:223
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:504
void addTinyint(const int8_t v)
Definition: Importer.h:225
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:495
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:260
void addInt(const int32_t v)
Definition: Importer.h:229
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:786
std::vector< std::vector< std::string > > raw_rows
Definition: Importer.h:669
specifies the content in-memory of a row in the column metadata table
int8_t * getStringDictBuffer() const
Definition: Importer.h:354
std::vector< EncodingType > best_encodings
Definition: Importer.h:672
std::vector< int8_t > * bool_buffer_
Definition: Importer.h:491
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:823
std::vector< std::vector< std::string > > * string_array_buffer_
Definition: Importer.h:501
boost::filesystem::path file_path
Definition: Importer.h:702
auto getLoader() const
Definition: Importer.h:810
std::set< int64_t > rows
Definition: Importer.h:76
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:825
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Definition: Importer.h:486
Definition: sqltypes.h:54
Definition: sqltypes.h:55
Definition: Importer.h:62
Detector(const boost::filesystem::path &fp, CopyParams &cp)
Definition: Importer.h:659
std::vector< std::string > & addStringArray()
Definition: Importer.h:243
std::vector< int32_t > * string_dict_i32_buffer_
Definition: Importer.h:506
bool is_null(const T &v, const SQLTypeInfo &t)
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539
void addArray(const ArrayDatum &v)
Definition: Importer.h:241
const CopyParams & get_copy_params() const
Definition: Importer.h:641
Definition: sqltypes.h:43
#define IS_STRING(T)
Definition: sqltypes.h:173
std::string import_id
Definition: Importer.h:819
const ColumnDescriptor * column_desc_
Definition: Importer.h:509
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:721
std::unique_ptr< RTree > _rtree
Definition: Importer.h:725
std::chrono::steady_clock::time_point start
Definition: Importer.h:604
std::vector< uint16_t > * string_dict_i16_buffer_
Definition: Importer.h:505
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
const CopyParams & get_copy_params() const
Definition: Importer.h:742
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:492
#define CHECK(condition)
Definition: Logger.h:197
std::string raw_data
Definition: Importer.h:701
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
std::vector< std::string > * getStringBuffer() const
Definition: Importer.h:340
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
boost::geometry::index::rtree< Node, boost::geometry::index::quadratic< 16 > > RTree
Definition: Importer.h:724
static constexpr size_t MAX_STRLEN
Definition: sqltypes.h:47
SQLTypeInfo columnType
specifies the content in-memory of a row in the table metadata table
LoadCallbackType load_callback_
Definition: Importer.h:587
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:280
std::vector< size_t > file_offsets
Definition: Importer.h:653
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589
std::vector< std::string > * geo_string_buffer_
Definition: Importer.h:499
std::chrono::steady_clock::time_point end
Definition: Importer.h:605
std::vector< std::unique_ptr< TypedImportBuffer > > OneShardBuffers
Definition: Importer.h:577
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
Definition: Importer.h:720
std::unique_ptr< Loader > loader
Definition: Importer.h:824
const bool * get_is_array() const
Definition: Importer.h:754
const std::string file_path
Definition: Importer.h:648