OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Importer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.h
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Importer class for table import from file
21  */
22 #ifndef _IMPORTER_H_
23 #define _IMPORTER_H_
24 
25 #include "Shared/Logger.h"
26 #include "Shared/fixautotools.h"
27 
28 #include <gdal.h>
29 #include <ogrsf_frmts.h>
30 
31 #include <atomic>
32 #include <boost/filesystem.hpp>
33 #include <boost/noncopyable.hpp>
34 #include <boost/tokenizer.hpp>
35 #include <condition_variable>
36 #include <cstdio>
37 #include <cstdlib>
38 #include <iostream>
39 #include <list>
40 #include <map>
41 #include <memory>
42 #include <mutex>
43 #include <set>
44 #include <string>
45 #include <utility>
46 
47 #include "../Catalog/Catalog.h"
48 #include "../Catalog/TableDescriptor.h"
49 #include "../Chunk/Chunk.h"
50 #include "../Fragmenter/Fragmenter.h"
51 #include "../Shared/ThreadController.h"
52 #include "../Shared/checked_alloc.h"
53 #include "CopyParams.h"
54 
56 
57 // Some builds of boost::geometry require iostream, but don't explicitly include it.
58 // Placing in own section to ensure it's included after iostream.
59 #include <boost/geometry/index/rtree.hpp>
60 
// Forward declarations only. TColumn and TDatum appear solely as reference
// parameters of TypedImportBuffer::add_values / add_value, and arrow::Array
// only as a reference parameter of add_arrow_values. Their full definitions
// are therefore not needed in this header.
61 class TDatum;
62 class TColumn;
63 
64 namespace arrow {
65 
66 class Array;
67 
68 } // namespace arrow
69 
70 namespace Importer_NS {
71 
// Importer is forward-declared so that declarations ahead of its definition
// (at the end of this namespace) can refer to it.
72 class Importer;
73 
// Pair of size_t bounds that add_arrow_values receives as `slice_range`.
// Presumably [begin, end) offsets into the Arrow array — confirm in Importer.cpp.
74 using ArraySliceRange = std::pair<size_t, size_t>;
75 
// BadRowsTracker: the struct header line is not visible in this copy.
// A pointer to it is passed to add_arrow_values and del_values.
// - `rows` collects int64 row indices.
// - `nerrors` is an atomic counter.
// - `mutex` presumably guards `rows`, since std::set is not thread-safe.
// - file_name / row_group presumably identify the source file and row group
//   being imported (assumed parquet — confirm).
77  std::mutex mutex;
78  std::set<int64_t> rows;
79  std::atomic<int> nerrors;
80  std::string file_name;
81  int row_group;
83 };
84 
85 class TypedImportBuffer : boost::noncopyable {
86  public:
// Constructs an empty buffer for one column. It allocates exactly one member
// of the first pointer union, chosen by columnType.get_type():
// - NUMERIC/DECIMAL and DATE/TIME/TIMESTAMP share the int64 buffer.
// - Dictionary-encoded TEXT/VARCHAR/CHAR also get an id buffer in the second
//   union. Its element width follows columnType.get_size() (1, 2 or 4 bytes).
// - String-subtype arrays get both string_array_buffer_ and
//   string_array_dict_buffer_.
// Unsupported types or dictionary widths fail CHECK.
// Buffers are owned through raw `new`. The class derives from
// boost::noncopyable, so copies cannot double-free them.
// NOTE(review): one line inside the kARRAY branch is not visible in this copy.
87  TypedImportBuffer(const ColumnDescriptor* col_desc, StringDictionary* string_dict)
88  : column_desc_(col_desc), string_dict_(string_dict) {
89  switch (col_desc->columnType.get_type()) {
90  case kBOOLEAN:
91  bool_buffer_ = new std::vector<int8_t>();
92  break;
93  case kTINYINT:
94  tinyint_buffer_ = new std::vector<int8_t>();
95  break;
96  case kSMALLINT:
97  smallint_buffer_ = new std::vector<int16_t>();
98  break;
99  case kINT:
100  int_buffer_ = new std::vector<int32_t>();
101  break;
102  case kBIGINT:
103  case kNUMERIC:
104  case kDECIMAL:
105  bigint_buffer_ = new std::vector<int64_t>();
106  break;
107  case kFLOAT:
108  float_buffer_ = new std::vector<float>();
109  break;
110  case kDOUBLE:
111  double_buffer_ = new std::vector<double>();
112  break;
113  case kTEXT:
114  case kVARCHAR:
115  case kCHAR:
116  string_buffer_ = new std::vector<std::string>();
// Only dictionary-encoded strings need an id buffer; otherwise the second
// union is left unset.
117  if (col_desc->columnType.get_compression() == kENCODING_DICT) {
118  switch (col_desc->columnType.get_size()) {
119  case 1:
120  string_dict_i8_buffer_ = new std::vector<uint8_t>();
121  break;
122  case 2:
123  string_dict_i16_buffer_ = new std::vector<uint16_t>();
124  break;
125  case 4:
126  string_dict_i32_buffer_ = new std::vector<int32_t>();
127  break;
128  default:
129  CHECK(false);
130  }
131  }
132  break;
133  case kDATE:
134  case kTIME:
135  case kTIMESTAMP:
136  bigint_buffer_ = new std::vector<int64_t>();
137  break;
138  case kARRAY:
139  if (IS_STRING(col_desc->columnType.get_subtype())) {
141  string_array_buffer_ = new std::vector<std::vector<std::string>>();
142  string_array_dict_buffer_ = new std::vector<ArrayDatum>();
143  } else {
144  array_buffer_ = new std::vector<ArrayDatum>();
145  }
146  break;
147  case kPOINT:
148  case kLINESTRING:
149  case kPOLYGON:
150  case kMULTIPOLYGON:
151  geo_string_buffer_ = new std::vector<std::string>();
152  break;
153  default:
154  CHECK(false);
155  }
156  }
157 
// Presumably the destructor body; its signature line is not visible in this
// copy. It deletes the buffer(s) the constructor allocated. Because the
// pointer unions record no tag, it re-derives the live member from
// column_desc_->columnType.
// NOTE(review): as shown, the `case 2` / `case 4` arms and the string-array
// branch contain no delete for string_dict_i16_buffer_,
// string_dict_i32_buffer_ or string_array_dict_buffer_. If the real source
// matches, those buffers leak — confirm against the full file (several lines
// are missing from this copy, including the guarding `if` statements).
// NOTE(review): CHECK(false) inside a destructor. If CHECK throws instead of
// aborting, this calls std::terminate, because destructors are implicitly
// noexcept.
159  switch (column_desc_->columnType.get_type()) {
160  case kBOOLEAN:
161  delete bool_buffer_;
162  break;
163  case kTINYINT:
164  delete tinyint_buffer_;
165  break;
166  case kSMALLINT:
167  delete smallint_buffer_;
168  break;
169  case kINT:
170  delete int_buffer_;
171  break;
172  case kBIGINT:
173  case kNUMERIC:
174  case kDECIMAL:
175  delete bigint_buffer_;
176  break;
177  case kFLOAT:
178  delete float_buffer_;
179  break;
180  case kDOUBLE:
181  delete double_buffer_;
182  break;
183  case kTEXT:
184  case kVARCHAR:
185  case kCHAR:
186  delete string_buffer_;
188  switch (column_desc_->columnType.get_size()) {
189  case 1:
190  delete string_dict_i8_buffer_;
191  break;
192  case 2:
194  break;
195  case 4:
197  break;
198  }
199  }
200  break;
201  case kDATE:
202  case kTIME:
203  case kTIMESTAMP:
204  delete bigint_buffer_;
205  break;
206  case kARRAY:
208  delete string_array_buffer_;
210  } else {
211  delete array_buffer_;
212  }
213  break;
214  case kPOINT:
215  case kLINESTRING:
216  case kPOLYGON:
217  case kMULTIPOLYGON:
218  delete geo_string_buffer_;
219  break;
220  default:
221  CHECK(false);
222  }
223  }
224 
225  void addBoolean(const int8_t v) { bool_buffer_->push_back(v); }
226 
227  void addTinyint(const int8_t v) { tinyint_buffer_->push_back(v); }
228 
229  void addSmallint(const int16_t v) { smallint_buffer_->push_back(v); }
230 
231  void addInt(const int32_t v) { int_buffer_->push_back(v); }
232 
233  void addBigint(const int64_t v) { bigint_buffer_->push_back(v); }
234 
235  void addFloat(const float v) { float_buffer_->push_back(v); }
236 
237  void addDouble(const double v) { double_buffer_->push_back(v); }
238 
239  void addString(const std::string& v) { string_buffer_->push_back(v); }
240 
241  void addGeoString(const std::string& v) { geo_string_buffer_->push_back(v); }
242 
243  void addArray(const ArrayDatum& v) { array_buffer_->push_back(v); }
244 
245  std::vector<std::string>& addStringArray() {
246  string_array_buffer_->push_back(std::vector<std::string>());
247  return string_array_buffer_->back();
248  }
249 
250  void addStringArray(const std::vector<std::string>& arr) {
251  string_array_buffer_->push_back(arr);
252  }
253 
// Encodes a batch of strings through string_dict_ into the id buffer whose
// width matches the column size (1, 2 or 4 bytes).
// All lengths are checked against StringDictionary::MAX_STRLEN first. An
// over-long string therefore throws std::runtime_error before the dictionary
// or the id buffer is touched.
// The id buffer is resized to string_vec.size(), leaving one slot per input
// string. getOrAddBulk presumably fills those slots with the ids — confirm.
// Any other column size fails CHECK.
// NOTE(review): the line directly after the signature is not visible in this
// copy.
254  void addDictEncodedString(const std::vector<std::string>& string_vec) {
256  for (const auto& str : string_vec) {
257  if (str.size() > StringDictionary::MAX_STRLEN) {
258  throw std::runtime_error("String too long for dictionary encoding.");
259  }
260  }
261  switch (column_desc_->columnType.get_size()) {
262  case 1:
263  string_dict_i8_buffer_->resize(string_vec.size());
264  string_dict_->getOrAddBulk(string_vec, string_dict_i8_buffer_->data());
265  break;
266  case 2:
267  string_dict_i16_buffer_->resize(string_vec.size());
268  string_dict_->getOrAddBulk(string_vec, string_dict_i16_buffer_->data());
269  break;
270  case 4:
271  string_dict_i32_buffer_->resize(string_vec.size());
272  string_dict_->getOrAddBulk(string_vec, string_dict_i32_buffer_->data());
273  break;
274  default:
275  CHECK(false);
276  }
277  }
278 
// Tail of the dictionary-encoded string-array appender. Its signature line is
// not visible in this copy. The steps are:
// 1. Validate every string length against StringDictionary::MAX_STRLEN.
// 2. Bulk-encode the arrays into int32 id vectors.
// 3. Copy each id vector into a checked_malloc'd block wrapped in an
//    ArrayDatum. Ownership of that block presumably passes to the
//    ArrayDatum — confirm.
// A zero-length array is recorded with the ArrayDatum's third argument set to
// true (`len == 0`); see the TODO about empty vs NULL.
// NOTE(review): for an empty id vector, `&p[0]` indexes an empty
// std::vector, which is undefined behaviour. `p.data()` (or skipping the
// memcpy when len == 0) would be well-defined.
// NOTE(review): memcpy relies on <cstring> arriving transitively; it is not
// in this header's visible include list.
280  const std::vector<std::vector<std::string>>& string_array_vec) {
282 
283  // first check data is ok
284  for (auto& p : string_array_vec) {
285  for (const auto& str : p) {
286  if (str.size() > StringDictionary::MAX_STRLEN) {
287  throw std::runtime_error("String too long for dictionary encoding.");
288  }
289  }
290  }
291 
292  std::vector<std::vector<int32_t>> ids_array(0);
293  string_dict_->getOrAddBulkArray(string_array_vec, ids_array);
294 
295  for (auto& p : ids_array) {
296  size_t len = p.size() * sizeof(int32_t);
297  auto a = static_cast<int32_t*>(checked_malloc(len));
298  memcpy(a, &p[0], len);
299  // TODO: distinguish between empty and NULL
300  string_array_dict_buffer_->push_back(
301  ArrayDatum(len, reinterpret_cast<int8_t*>(a), len == 0));
302  }
303  }
304 
305  const SQLTypeInfo& getTypeInfo() const { return column_desc_->columnType; }
306 
307  const ColumnDescriptor* getColumnDesc() const { return column_desc_; }
308 
310 
311  int8_t* getAsBytes() const {
312  switch (column_desc_->columnType.get_type()) {
313  case kBOOLEAN:
314  return reinterpret_cast<int8_t*>(&((*bool_buffer_)[0]));
315  case kTINYINT:
316  return reinterpret_cast<int8_t*>(&((*tinyint_buffer_)[0]));
317  case kSMALLINT:
318  return reinterpret_cast<int8_t*>(&((*smallint_buffer_)[0]));
319  case kINT:
320  return reinterpret_cast<int8_t*>(&((*int_buffer_)[0]));
321  case kBIGINT:
322  case kNUMERIC:
323  case kDECIMAL:
324  return reinterpret_cast<int8_t*>(&((*bigint_buffer_)[0]));
325  case kFLOAT:
326  return reinterpret_cast<int8_t*>(&((*float_buffer_)[0]));
327  case kDOUBLE:
328  return reinterpret_cast<int8_t*>(&((*double_buffer_)[0]));
329  case kDATE:
330  case kTIME:
331  case kTIMESTAMP:
332  return reinterpret_cast<int8_t*>(&((*bigint_buffer_)[0]));
333  default:
334  abort();
335  }
336  }
337 
338  size_t getElementSize() const {
339  switch (column_desc_->columnType.get_type()) {
340  case kBOOLEAN:
341  return sizeof((*bool_buffer_)[0]);
342  case kTINYINT:
343  return sizeof((*tinyint_buffer_)[0]);
344  case kSMALLINT:
345  return sizeof((*smallint_buffer_)[0]);
346  case kINT:
347  return sizeof((*int_buffer_)[0]);
348  case kBIGINT:
349  case kNUMERIC:
350  case kDECIMAL:
351  return sizeof((*bigint_buffer_)[0]);
352  case kFLOAT:
353  return sizeof((*float_buffer_)[0]);
354  case kDOUBLE:
355  return sizeof((*double_buffer_)[0]);
356  case kDATE:
357  case kTIME:
358  case kTIMESTAMP:
359  return sizeof((*bigint_buffer_)[0]);
360  default:
361  abort();
362  }
363  }
364 
365  std::vector<std::string>* getStringBuffer() const { return string_buffer_; }
366 
367  std::vector<std::string>* getGeoStringBuffer() const { return geo_string_buffer_; }
368 
369  std::vector<ArrayDatum>* getArrayBuffer() const { return array_buffer_; }
370 
371  std::vector<std::vector<std::string>>* getStringArrayBuffer() const {
372  return string_array_buffer_;
373  }
374 
// Accessor for the dictionary-encoded string-array buffer (non-owning).
// NOTE(review): no return statement is visible in this copy. Flowing off the
// end of a non-void function is undefined behaviour. Confirm that the real
// source returns string_array_dict_buffer_.
375  std::vector<ArrayDatum>* getStringArrayDictBuffer() const {
377  }
378 
379  int8_t* getStringDictBuffer() const {
380  switch (column_desc_->columnType.get_size()) {
381  case 1:
382  return reinterpret_cast<int8_t*>(&((*string_dict_i8_buffer_)[0]));
383  case 2:
384  return reinterpret_cast<int8_t*>(&((*string_dict_i16_buffer_)[0]));
385  case 4:
386  return reinterpret_cast<int8_t*>(&((*string_dict_i32_buffer_)[0]));
387  default:
388  abort();
389  }
390  }
391 
// Body of the dictionary checkpoint method; its signature line is not visible
// in this copy. A buffer without a string dictionary reports success (true).
// Otherwise the result of string_dict_->checkpoint() is returned unchanged.
393  if (string_dict_ == nullptr) {
394  return true;
395  }
396  return string_dict_->checkpoint();
397  }
398 
// Empties the buffer(s) allocated for this column type without freeing them.
// std::vector::clear keeps capacity, so refilling reuses the existing
// allocation. Unknown types or dictionary widths fail CHECK.
// NOTE(review): the `if` lines that guard the dictionary-id switch and the
// string-array branch are not visible in this copy.
399  void clear() {
400  switch (column_desc_->columnType.get_type()) {
401  case kBOOLEAN: {
402  bool_buffer_->clear();
403  break;
404  }
405  case kTINYINT: {
406  tinyint_buffer_->clear();
407  break;
408  }
409  case kSMALLINT: {
410  smallint_buffer_->clear();
411  break;
412  }
413  case kINT: {
414  int_buffer_->clear();
415  break;
416  }
417  case kBIGINT:
418  case kNUMERIC:
419  case kDECIMAL: {
420  bigint_buffer_->clear();
421  break;
422  }
423  case kFLOAT: {
424  float_buffer_->clear();
425  break;
426  }
427  case kDOUBLE: {
428  double_buffer_->clear();
429  break;
430  }
431  case kTEXT:
432  case kVARCHAR:
433  case kCHAR: {
434  string_buffer_->clear();
436  switch (column_desc_->columnType.get_size()) {
437  case 1:
438  string_dict_i8_buffer_->clear();
439  break;
440  case 2:
441  string_dict_i16_buffer_->clear();
442  break;
443  case 4:
444  string_dict_i32_buffer_->clear();
445  break;
446  default:
447  CHECK(false);
448  }
449  }
450  break;
451  }
452  case kDATE:
453  case kTIME:
454  case kTIMESTAMP:
455  bigint_buffer_->clear();
456  break;
457  case kARRAY: {
459  string_array_buffer_->clear();
460  string_array_dict_buffer_->clear();
461  } else {
462  array_buffer_->clear();
463  }
464  break;
465  }
466  case kPOINT:
467  case kLINESTRING:
468  case kPOLYGON:
469  case kMULTIPOLYGON:
470  geo_string_buffer_->clear();
471  break;
472  default:
473  CHECK(false);
474  }
475  }
476 
// Value appenders; their implementations are outside this header.
// - add_values appends the contents of a TColumn.
// - add_arrow_values appends a slice of an Arrow array. Rejected rows are
//   presumably reported through bad_rows_tracker.
// - The add_value overloads append one value, parsed from a string (using
//   copy_params) or taken from a TDatum. They take an explicit is_null flag
//   and a replicate_count defaulting to 0.
// - pop_value presumably removes the most recently added value — confirm.
// The meaning of the size_t return values is not visible here.
477  size_t add_values(const ColumnDescriptor* cd, const TColumn& data);
478 
479  size_t add_arrow_values(const ColumnDescriptor* cd,
480  const arrow::Array& data,
481  const bool exact_type_match,
482  const ArraySliceRange& slice_range,
483  BadRowsTracker* bad_rows_tracker);
484 
485  void add_value(const ColumnDescriptor* cd,
486  const std::string& val,
487  const bool is_null,
488  const CopyParams& copy_params,
489  const int64_t replicate_count = 0);
490  void add_value(const ColumnDescriptor* cd,
491  const TDatum& val,
492  const bool is_null,
493  const int64_t replicate_count = 0);
494  void pop_value();
495 
496  int64_t get_replicate_count() const { return replicate_count_; }
497  void set_replicate_count(const int64_t replicate_count) {
498  replicate_count_ = replicate_count;
499  }
// Arrow conversion and row-deletion helpers, implemented outside this header.
// The line that names the first template is not visible in this copy.
// del_values has a typed-buffer overload and a by-SQLTypes overload; both
// receive the BadRowsTracker.
// NOTE(review): import_buffers and col_idx are public data members. The
// constructor's initializer list sets only column_desc_ and string_dict_, so
// these two hold indeterminate values until a caller assigns them.
500  template <typename DATA_TYPE>
502  const arrow::Array& array,
503  std::vector<DATA_TYPE>& buffer,
504  const ArraySliceRange& slice_range,
505  BadRowsTracker* const bad_rows_tracker);
506  template <typename DATA_TYPE>
507  auto del_values(std::vector<DATA_TYPE>& buffer, BadRowsTracker* const bad_rows_tracker);
508  auto del_values(const SQLTypes type, BadRowsTracker* const bad_rows_tracker);
509  std::vector<std::unique_ptr<TypedImportBuffer>>* import_buffers;
510  size_t col_idx;
511 
// Storage.
// - First union: exactly one pointer is live, chosen from the column type in
//   the constructor.
// - Second union: the dictionary-id buffer for dict-encoded strings, or the
//   encoded-array buffer for string arrays.
// The unions carry no tag. Methods therefore switch on
// column_desc_->columnType to pick the live member.
// NOTE(review): two member declarations (presumably column_desc_ and
// string_dict_) are not visible in this copy.
512  private:
513  union {
514  std::vector<int8_t>* bool_buffer_;
515  std::vector<int8_t>* tinyint_buffer_;
516  std::vector<int16_t>* smallint_buffer_;
517  std::vector<int32_t>* int_buffer_;
518  std::vector<int64_t>* bigint_buffer_;
519  std::vector<float>* float_buffer_;
520  std::vector<double>* double_buffer_;
521  std::vector<std::string>* string_buffer_;
522  std::vector<std::string>* geo_string_buffer_;
523  std::vector<ArrayDatum>* array_buffer_;
524  std::vector<std::vector<std::string>>* string_array_buffer_;
525  };
526  union {
527  std::vector<uint8_t>* string_dict_i8_buffer_;
528  std::vector<uint16_t>* string_dict_i16_buffer_;
529  std::vector<int32_t>* string_dict_i32_buffer_;
530  std::vector<ArrayDatum>* string_array_dict_buffer_;
531  };
534  size_t replicate_count_ = 0;
535 };
536 
// Loader: writes batches of TypedImportBuffer columns into one table.
// It holds:
// - a reference to the catalog,
// - the table descriptor,
// - the column descriptors returned by
//   getAllColumnMetadataForTable(t->tableId, false, false, true). The meaning
//   of those flags is defined in Catalog and is not visible here.
// The constructor ends by calling the non-virtual init(). Its first line is
// not visible in this copy.
// load/loadNoCheckpoint/checkpoint/getTableEpoch/setTableEpoch/loadImpl are
// virtual, so a subclass can replace them. Their implementations live
// outside this header.
537 class Loader {
538  public:
540  : catalog_(c)
541  , table_desc_(t)
542  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true)) {
543  init();
544  }
545 
546  virtual ~Loader() {}
547 
549  const TableDescriptor* getTableDesc() const { return table_desc_; }
550  const std::list<const ColumnDescriptor*>& get_column_descs() const {
551  return column_descs_;
552  }
553 
// getStringDict body; its signature line is not visible in this copy. It
// returns nullptr for columns that are neither string-subtype arrays nor
// strings meeting the (partially visible) second condition. Otherwise it uses
// std::map::at, which throws std::out_of_range if the column is absent from
// dict_map_.
555  if ((cd->columnType.get_type() != kARRAY ||
556  !IS_STRING(cd->columnType.get_subtype())) &&
557  (!cd->columnType.is_string() ||
559  return nullptr;
560  }
561  return dict_map_.at(cd->columnId);
562  }
563 
564  virtual bool load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
565  const size_t row_count);
566  virtual bool loadNoCheckpoint(
567  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
568  const size_t row_count);
569  virtual void checkpoint();
570  virtual int32_t getTableEpoch();
571  virtual void setTableEpoch(const int32_t new_epoch);
572 
573  void setReplicating(const bool replicating) { replicating_ = replicating; }
574  bool getReplicating() const { return replicating_; }
575 
576  protected:
577  void init();
578 
579  virtual bool loadImpl(
580  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
581  size_t row_count,
582  bool checkpoint);
583 
// Presumably splits one batch across shard_count shards, producing one buffer
// set and row count per shard (inferred from parameter names; the
// implementation is not visible).
584  using OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>;
585  void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
586  std::vector<size_t>& all_shard_row_counts,
587  const OneShardBuffers& import_buffers,
588  const size_t row_count,
589  const size_t shard_count);
590 
593  std::list<const ColumnDescriptor*> column_descs_;
595  std::map<int, StringDictionary*> dict_map_;
596 
597  private:
598  std::vector<DataBlockPtr> get_data_block_pointers(
599  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
600  bool loadToShard(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
601  size_t row_count,
602  const TableDescriptor* shard_table,
603  bool checkpoint);
604 
605  bool replicating_ = false;
// NOTE(review): the code that locks loader_mutex_ is not in this header.
606  std::mutex loader_mutex_;
607 };
608 
// ImportStatus: progress and result record for an import. Per the thread_id
// comment, one instance may also be kept per import thread.
// The constructor sets `start` to steady_clock::now() and zeroes the counters
// in its initializer list. `end` is not in that list, so its default
// constructor sets it to the clock's epoch.
// operator+= is only partially visible in this copy; it returns *this so
// calls can be chained.
// NOTE(review): this header uses std::chrono but does not #include <chrono>
// directly; it relies on a transitive include.
609 struct ImportStatus {
610  std::chrono::steady_clock::time_point start;
611  std::chrono::steady_clock::time_point end;
615  std::chrono::duration<size_t, std::milli> elapsed;
617  int thread_id; // to recall thread_id after thread exit
619  : start(std::chrono::steady_clock::now())
620  , rows_completed(0)
621  , rows_estimated(0)
622  , rows_rejected(0)
623  , elapsed(0)
624  , load_truncated(0)
625  , thread_id(0) {}
626 
630 
631  return *this;
632  }
633 };
634 
// DataStreamSink: the class header is not visible in this copy. It is the
// abstract base that Detector and Importer derive from in this header.
// Subclasses must implement importDelimited, and also import_local_parquet
// when ENABLE_IMPORT_PARQUET is defined.
// The constructor takes file_path as `const std::string` by value. The
// argument is therefore copied into the parameter and copied again into the
// const member (a const object cannot be moved from).
636  public:
638  DataStreamSink(const CopyParams& copy_params, const std::string file_path)
639  : copy_params(copy_params), file_path(file_path) {}
640  virtual ~DataStreamSink() {}
641  virtual ImportStatus importDelimited(const std::string& file_path,
642  const bool decompressed) = 0;
643 #ifdef ENABLE_IMPORT_PARQUET
644  virtual void import_parquet(std::vector<std::string>& file_paths);
645  virtual void import_local_parquet(const std::string& file_path) = 0;
646 #endif
647  const CopyParams& get_copy_params() const { return copy_params; }
648  void import_compressed(std::vector<std::string>& file_paths);
649 
650  protected:
652 
654  const std::string file_path;
// Raw FILE* that starts as nullptr. Which code opens and closes it is not
// visible here.
655  FILE* p_file = nullptr;
657  bool load_failed = false;
658  size_t total_file_size{0};
// Presumably guarded by file_offsets_mutex.
659  std::vector<size_t> file_offsets;
660  std::mutex file_offsets_mutex;
661 };
662 
// Detector: samples a delimited file and exposes guessed column SQL types,
// encodings and header presence (best_sqltypes, best_encodings, has_headers).
// The constructor calls read_file() and then init(), so all of that work
// happens during construction.
// NOTE(review): the `;` after the constructor body is an empty member
// declaration. It is harmless, but -Wextra-semi flags it.
// NOTE(review): the `file_path` member (boost::filesystem::path) hides the
// inherited DataStreamSink::file_path (std::string). The constructor
// initializes both from `fp`.
// NOTE(review): importDelimited overrides a public pure virtual but is
// declared under `private:` here. It can still be called through a
// DataStreamSink reference.
663 class Detector : public DataStreamSink {
664  public:
665  Detector(const boost::filesystem::path& fp, CopyParams& cp)
666  : DataStreamSink(cp, fp.string()), file_path(fp) {
667  read_file();
668  init();
669  };
670 #ifdef ENABLE_IMPORT_PARQUET
671  void import_local_parquet(const std::string& file_path) override;
672 #endif
673  static SQLTypes detect_sqltype(const std::string& str);
674  std::vector<std::string> get_headers();
675  std::vector<std::vector<std::string>> raw_rows;
676  std::vector<std::vector<std::string>> get_sample_rows(size_t n);
677  std::vector<SQLTypes> best_sqltypes;
678  std::vector<EncodingType> best_encodings;
679  bool has_headers = false;
680 
681  private:
682  void init();
683  void read_file();
684  void detect_row_delimiter();
685  void split_raw_data();
686  std::vector<SQLTypes> detect_column_types(const std::vector<std::string>& row);
687  static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b);
688  void find_best_sqltypes();
689  std::vector<SQLTypes> find_best_sqltypes(
690  const std::vector<std::vector<std::string>>& raw_rows,
691  const CopyParams& copy_params);
692  std::vector<SQLTypes> find_best_sqltypes(
693  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
694  const std::vector<std::vector<std::string>>::const_iterator& row_end,
695  const CopyParams& copy_params);
696 
697  std::vector<EncodingType> find_best_encodings(
698  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
699  const std::vector<std::vector<std::string>>::const_iterator& row_end,
700  const std::vector<SQLTypes>& best_types);
701 
702  bool detect_headers(const std::vector<SQLTypes>& first_types,
703  const std::vector<SQLTypes>& rest_types);
705  ImportStatus importDelimited(const std::string& file_path,
706  const bool decompressed) override;
707  std::string raw_data;
708  boost::filesystem::path file_path;
// duration<double> counts seconds by default, so this is 1 second. How it is
// used is not visible here.
709  std::chrono::duration<double> timeout{1};
710  std::string line1;
711 };
712 
// Static helper class; its header line is not visible in this copy.
// composeNullArray builds an ArrayDatum for the given type, presumably the
// NULL-array representation for that type — confirm in Importer.cpp.
714  public:
715  static ArrayDatum composeNullArray(const SQLTypeInfo& ti);
716 };
717 
// RenderGroupAnalyzer: the class header is not visible in this copy.
// It keeps a boost::geometry R-tree (quadratic<16> balancing) of 2-D
// cartesian bounding boxes, each paired with an int, presumably a render
// group id. The constructor allocates an empty tree and sets
// _numRenderGroups to 0.
// _rtreeMutex presumably serializes access to _rtree. The locking code is not
// in this header.
719  public:
720  RenderGroupAnalyzer() : _rtree(std::make_unique<RTree>()), _numRenderGroups(0) {}
721  void seedFromExistingTableContents(const std::unique_ptr<Loader>& loader,
722  const std::string& geoColumnBaseName);
723  int insertBoundsAndReturnRenderGroup(const std::vector<double>& bounds);
724 
725  private:
726  using Point = boost::geometry::model::point<double, 2, boost::geometry::cs::cartesian>;
727  using BoundingBox = boost::geometry::model::box<Point>;
728  using Node = std::pair<BoundingBox, int>;
729  using RTree =
730  boost::geometry::index::rtree<Node, boost::geometry::index::quadratic<16>>;
731  std::unique_ptr<RTree> _rtree;
732  std::mutex _rtreeMutex;
734 };
735 
// Importer: a DataStreamSink that loads a file into a table through the
// Loader it holds in `loader` (std::unique_ptr).
// - The first constructor's leading line is not visible in this copy.
// - The second constructor takes a Loader*. Ownership presumably passes to
//   `loader` — confirm in Importer.cpp.
// The static gdal* helpers inspect geo files via GDAL and need no instance.
736 class Importer : public DataStreamSink {
737  public:
739  const TableDescriptor* t,
740  const std::string& f,
741  const CopyParams& p);
742  Importer(Loader* providedLoader, const std::string& f, const CopyParams& p);
743  ~Importer() override;
744  ImportStatus import();
745  ImportStatus importDelimited(const std::string& file_path,
746  const bool decompressed) override;
747  ImportStatus importGDAL(std::map<std::string, std::string> colname_to_src);
748  static bool hasGDALLibKML();
// NOTE(review): this is identical to DataStreamSink::get_copy_params. The
// non-virtual redeclaration hides the base version without changing behaviour.
749  const CopyParams& get_copy_params() const { return copy_params; }
750  const std::list<const ColumnDescriptor*>& get_column_descs() const {
751  return loader->get_column_descs();
752  }
753  void load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
754  size_t row_count);
755  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>>& get_import_buffers_vec() {
756  return import_buffers_vec;
757  }
// Unchecked operator[]. The caller must pass 0 <= i < import_buffers_vec.size().
758  std::vector<std::unique_ptr<TypedImportBuffer>>& get_import_buffers(int i) {
759  return import_buffers_vec[i];
760  }
761  const bool* get_is_array() const { return is_array_a.get(); }
762 #ifdef ENABLE_IMPORT_PARQUET
763  void import_local_parquet(const std::string& file_path) override;
764 #endif
765  static ImportStatus get_import_status(const std::string& id);
// NOTE(review): `is` is taken by const value, so every call copies the
// ImportStatus.
766  static void set_import_status(const std::string& id, const ImportStatus is);
// NOTE(review): returning `const std::list` by value prevents callers from
// moving out of the returned temporary.
767  static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
768  const std::string& fileName,
769  const std::string& geoColumnName,
770  const CopyParams& copy_params);
771  static void readMetadataSampleGDAL(
772  const std::string& fileName,
773  const std::string& geoColumnName,
774  std::map<std::string, std::vector<std::string>>& metadata,
775  int rowLimit,
776  const CopyParams& copy_params);
777  static bool gdalFileExists(const std::string& path, const CopyParams& copy_params);
778  static bool gdalFileOrDirectoryExists(const std::string& path,
779  const CopyParams& copy_params);
780  static std::vector<std::string> gdalGetAllFilesInArchive(
781  const std::string& archive_path,
782  const CopyParams& copy_params);
// GeoFileLayerInfo: a nested layer name plus its GeoFileLayerContents. The
// struct header and one member line are not visible in this copy.
785  GeoFileLayerInfo(const std::string& name_, GeoFileLayerContents contents_)
786  : name(name_), contents(contents_) {}
787  std::string name;
789  };
790  static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
791  const std::string& file_name,
792  const CopyParams& copy_params);
793  static bool gdalSupportsNetworkFileAccess();
794  Catalog_Namespace::Catalog& getCatalog() { return loader->getCatalog(); }
795  static void set_geo_physical_import_buffer(
796  const Catalog_Namespace::Catalog& catalog,
797  const ColumnDescriptor* cd,
798  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
799  size_t& col_idx,
800  std::vector<double>& coords,
801  std::vector<double>& bounds,
802  std::vector<int>& ring_sizes,
803  std::vector<int>& poly_rings,
804  int render_group,
805  const int64_t replicate_count = 0);
807  const Catalog_Namespace::Catalog& catalog,
808  const ColumnDescriptor* cd,
809  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
810  size_t& col_idx,
811  std::vector<std::vector<double>>& coords_column,
812  std::vector<std::vector<double>>& bounds_column,
813  std::vector<std::vector<int>>& ring_sizes_column,
814  std::vector<std::vector<int>>& poly_rings_column,
815  int render_group,
816  const int64_t replicate_count = 0);
817  void checkpoint(const int32_t start_epoch);
818  auto getLoader() const { return loader.get(); }
819 
820  private:
821  static void initGDAL();
822  static bool gdalStatInternal(const std::string& path,
823  const CopyParams& copy_params,
824  bool also_dir);
825  static OGRDataSource* openGDALDataset(const std::string& fileName,
826  const CopyParams& copy_params);
828  std::string import_id;
829  size_t file_size;
830  size_t max_threads;
// NOTE(review): raw char pointers. Their allocation and release are not
// visible in this header.
831  char* buffer[2];
832  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>> import_buffers_vec;
833  std::unique_ptr<Loader> loader;
834  std::unique_ptr<bool[]> is_array_a;
// Presumably guards one-time GDAL initialization in initGDAL().
835  static std::mutex init_gdal_mutex;
836 };
837 
// ImportDriver: the class header and part of the constructor are not visible
// in this copy. It is constructed from a shared Catalog handle.
// importGeoTable imports the file at file_path into table_name. Judging by the
// parameter names, it can optionally create the table and explode
// collections. The implementation is not visible here.
839  public:
840  ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
843 
844  void importGeoTable(const std::string& file_path,
845  const std::string& table_name,
846  const bool compression,
847  const bool create_table,
848  const bool explode_collections);
849 };
850 
851 } // namespace Importer_NS
852 
853 #endif // _IMPORTER_H_
std::unique_ptr< Loader > loader
Definition: Importer.h:833
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
Definition: Importer.cpp:4559
void addSmallint(const int16_t v)
Definition: Importer.h:229
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4087
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2433
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
Definition: Importer.h:554
static ImportStatus get_import_status(const std::string &id)
Definition: Importer.cpp:208
std::vector< int16_t > * smallint_buffer_
Definition: Importer.h:516
std::vector< std::vector< std::string > > * getStringArrayBuffer() const
Definition: Importer.h:371
std::chrono::duration< double > timeout
Definition: Importer.h:709
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
Definition: Importer.cpp:3039
size_t getElementSize() const
Definition: Importer.h:338
std::vector< size_t > file_offsets
Definition: Importer.h:659
void import_compressed(std::vector< std::string > &file_paths)
Definition: Importer.cpp:3542
void importGeoTable(const std::string &file_path, const std::string &table_name, const bool compression, const bool create_table, const bool explode_collections)
Definition: Importer.cpp:5039
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
Definition: Importer.h:584
void addGeoString(const std::string &v)
Definition: Importer.h:241
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:518
std::vector< EncodingType > best_encodings
Definition: Importer.h:678
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
Definition: sqltypes.h:52
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
ImportStatus importDelimited(const std::string &file_path, const bool decompressed) override
Definition: Importer.cpp:2735
SQLTypes
Definition: sqltypes.h:41
void addTinyint(const int8_t v)
Definition: Importer.h:227
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
Definition: Importer.cpp:842
virtual void checkpoint()
Definition: Importer.cpp:4000
ExecutorDeviceType
std::vector< std::string > * getGeoStringBuffer() const
Definition: Importer.h:367
static SQLTypes detect_sqltype(const std::string &str)
Definition: Importer.cpp:2856
std::vector< float > * float_buffer_
Definition: Importer.h:519
std::mutex loader_mutex_
Definition: Importer.h:606
StringDictionary * getStringDictionary() const
Definition: Importer.h:309
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:548
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
int insertBoundsAndReturnRenderGroup(const std::vector< double > &bounds)
Definition: Importer.cpp:4988
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
Definition: Importer.h:87
const CopyParams & get_copy_params() const
Definition: Importer.h:749
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src)
Definition: Importer.cpp:4628
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:594
std::chrono::steady_clock::time_point end
Definition: Importer.h:611
~Importer() override
Definition: Importer.cpp:196
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:550
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
Definition: Importer.h:375
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
Definition: Importer.h:755
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:431
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:595
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
Definition: Importer.cpp:2379
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:834
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
int8_t * getAsBytes() const
Definition: Importer.h:311
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
Definition: Importer.h:726
Detector(const boost::filesystem::path &fp, CopyParams &cp)
Definition: Importer.h:665
void addInt(const int32_t v)
Definition: Importer.h:231
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:305
void addDictEncodedString(const std::vector< std::string > &string_vec)
Definition: Importer.h:254
std::vector< uint16_t > * string_dict_i16_buffer_
Definition: Importer.h:528
std::chrono::steady_clock::time_point start
Definition: Importer.h:610
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2581
std::string line1
Definition: Importer.h:710
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4462
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4334
void set_replicate_count(const int64_t replicate_count)
Definition: Importer.h:497
boost::geometry::index::rtree< Node, boost::geometry::index::quadratic< 16 >> RTree
Definition: Importer.h:730
void addBoolean(const int8_t v)
Definition: Importer.h:225
void setReplicating(const bool replicating)
Definition: Importer.h:573
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed)=0
virtual void setTableEpoch(const int32_t new_epoch)
Definition: Importer.cpp:4012
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:615
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:515
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:794
virtual ~Loader()
Definition: Importer.h:546
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3090
void addFloat(const float v)
Definition: Importer.h:235
void seedFromExistingTableContents(const std::unique_ptr< Loader > &loader, const std::string &geoColumnBaseName)
Definition: Importer.cpp:4864
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
CHECK(cgen_state)
std::vector< std::string > * string_buffer_
Definition: Importer.h:521
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:727
StringDictionary * string_dict_
Definition: Importer.h:533
static bool hasGDALLibKML()
Definition: Importer.cpp:4082
std::vector< int32_t > * string_dict_i32_buffer_
Definition: Importer.h:529
std::vector< std::vector< std::string > > * string_array_buffer_
Definition: Importer.h:524
static bool gdalSupportsNetworkFileAccess()
Definition: Importer.cpp:4620
int64_t get_replicate_count() const
Definition: Importer.h:496
std::vector< int32_t > * int_buffer_
Definition: Importer.h:517
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
std::string import_id
Definition: Importer.h:828
const CopyParams & get_copy_params() const
Definition: Importer.h:647
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
ImportStatus archivePlumber()
Definition: Importer.cpp:3146
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t)
Definition: Importer.h:539
auto getLoader() const
Definition: Importer.h:818
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
std::pair< BoundingBox, int > Node
Definition: Importer.h:728
std::string raw_data
Definition: Importer.h:707
ImportStatus & operator+=(const ImportStatus &is)
Definition: Importer.h:627
std::atomic< int > nerrors
Definition: Importer.h:79
specifies the content in-memory of a row in the column metadata table
bool g_enable_smem_group_by true
void addBigint(const int64_t v)
Definition: Importer.h:233
std::vector< std::string > get_headers()
Definition: Importer.cpp:3098
void add_value(const ColumnDescriptor *cd, const std::string &val, const bool is_null, const CopyParams &copy_params, const int64_t replicate_count=0)
Definition: Importer.cpp:506
boost::filesystem::path file_path
Definition: Importer.h:708
std::vector< int8_t > * bool_buffer_
Definition: Importer.h:514
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:593
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1541
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1457
virtual int32_t getTableEpoch()
Definition: Importer.cpp:4007
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4157
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Definition: Importer.h:509
void getOrAddBulkArray(const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
std::mutex file_offsets_mutex
Definition: Importer.h:660
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
Definition: Importer.cpp:2933
bool checkpoint() noexcept
const bool * get_is_array() const
Definition: Importer.h:761
Definition: sqltypes.h:55
Definition: sqltypes.h:56
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4457
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
Definition: Importer.cpp:2941
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:750
const TableDescriptor * table_desc_
Definition: Importer.h:592
bool is_null(const T &v, const SQLTypeInfo &t)
void addStringArray(const std::vector< std::string > &arr)
Definition: Importer.h:250
std::vector< std::string > * geo_string_buffer_
Definition: Importer.h:522
void addArray(const ArrayDatum &v)
Definition: Importer.h:243
void find_best_sqltypes_and_headers()
Definition: Importer.cpp:2963
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
Definition: Importer.h:638
std::vector< std::vector< std::string > > raw_rows
Definition: Importer.h:675
std::vector< ArrayDatum > * string_array_dict_buffer_
Definition: Importer.h:530
std::vector< double > * double_buffer_
Definition: Importer.h:520
bool getReplicating() const
Definition: Importer.h:574
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3117
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4424
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
Definition: Importer.cpp:4213
ImportStatus import_status
Definition: Importer.h:656
Definition: sqltypes.h:44
const ColumnDescriptor * column_desc_
Definition: Importer.h:532
const TableDescriptor * getTableDesc() const
Definition: Importer.h:549
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
#define IS_STRING(T)
Definition: sqltypes.h:166
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
Definition: Importer.cpp:3075
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2609
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count)
Definition: Importer.cpp:3110
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:327
void addString(const std::string &v)
Definition: Importer.h:239
std::vector< std::string > * getStringBuffer() const
Definition: Importer.h:365
bool is_string() const
Definition: sqltypes.h:477
bool g_enable_watchdog false
Definition: Execute.cpp:71
void addDictEncodedStringArray(const std::vector< std::vector< std::string >> &string_array_vec)
Definition: Importer.h:279
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:149
static void initGDAL()
Definition: Importer.cpp:4040
ImportDriver(std::shared_ptr< Catalog_Namespace::Catalog > cat, const Catalog_Namespace::UserMetadata &user, const ExecutorDeviceType dt=ExecutorDeviceType::GPU)
Definition: Importer.cpp:5034
std::vector< ArrayDatum > * getArrayBuffer() const
Definition: Importer.h:369
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:758
static constexpr size_t MAX_STRLEN
Definition: sqltypes.h:48
SQLTypeInfo columnType
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:527
std::set< int64_t > rows
Definition: Importer.h:78
static std::mutex init_gdal_mutex
Definition: Importer.h:835
specifies the content in-memory of a row in the table metadata table
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
Definition: Importer.cpp:942
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:213
const ColumnDescriptor * getColumnDesc() const
Definition: Importer.h:307
std::vector< std::string > & addStringArray()
Definition: Importer.h:245
std::pair< size_t, size_t > ArraySliceRange
Definition: Importer.h:74
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
Definition: Importer.h:785
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
Definition: Importer.cpp:2373
std::unique_ptr< RTree > _rtree
Definition: Importer.h:731
const std::string file_path
Definition: Importer.h:654
std::conditional_t< isCudaCC(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:122
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2670
std::vector< ArrayDatum > * array_buffer_
Definition: Importer.h:523
ImportStatus importDelimited(const std::string &file_path, const bool decompressed) override
Definition: Importer.cpp:3792
int8_t * getStringDictBuffer() const
Definition: Importer.h:379
std::vector< SQLTypes > best_sqltypes
Definition: Importer.h:677
void addDouble(const double v)
Definition: Importer.h:237
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams &copy_params)
Definition: Importer.cpp:4534