26 #include <ogrsf_frmts.h> 29 #include <boost/filesystem.hpp> 30 #include <boost/noncopyable.hpp> 31 #include <boost/tokenizer.hpp> 32 #include <condition_variable> 42 #include <string_view> 57 #include <boost/geometry/index/rtree.hpp> 86 : column_desc_(col_desc), string_dict_(string_dict) {
89 bool_buffer_ =
new std::vector<int8_t>();
92 tinyint_buffer_ =
new std::vector<int8_t>();
95 smallint_buffer_ =
new std::vector<int16_t>();
98 int_buffer_ =
new std::vector<int32_t>();
103 bigint_buffer_ =
new std::vector<int64_t>();
106 float_buffer_ =
new std::vector<float>();
109 double_buffer_ =
new std::vector<double>();
114 string_buffer_ =
new std::vector<std::string>();
118 string_dict_i8_buffer_ =
new std::vector<uint8_t>();
121 string_dict_i16_buffer_ =
new std::vector<uint16_t>();
124 string_dict_i32_buffer_ =
new std::vector<int32_t>();
134 bigint_buffer_ =
new std::vector<int64_t>();
139 string_array_buffer_ =
new std::vector<std::vector<std::string>>();
140 string_array_dict_buffer_ =
new std::vector<ArrayDatum>();
142 array_buffer_ =
new std::vector<ArrayDatum>();
149 geo_string_buffer_ =
new std::vector<std::string>();
157 switch (column_desc_->columnType.get_type()) {
162 delete tinyint_buffer_;
165 delete smallint_buffer_;
173 delete bigint_buffer_;
176 delete float_buffer_;
179 delete double_buffer_;
184 delete string_buffer_;
185 if (column_desc_->columnType.get_compression() ==
kENCODING_DICT) {
186 switch (column_desc_->columnType.get_size()) {
188 delete string_dict_i8_buffer_;
191 delete string_dict_i16_buffer_;
194 delete string_dict_i32_buffer_;
202 delete bigint_buffer_;
205 if (
IS_STRING(column_desc_->columnType.get_subtype())) {
206 delete string_array_buffer_;
207 delete string_array_dict_buffer_;
209 delete array_buffer_;
216 delete geo_string_buffer_;
223 void addBoolean(
const int8_t v) { bool_buffer_->push_back(v); }
225 void addTinyint(
const int8_t v) { tinyint_buffer_->push_back(v); }
227 void addSmallint(
const int16_t v) { smallint_buffer_->push_back(v); }
229 void addInt(
const int32_t v) { int_buffer_->push_back(v); }
231 void addBigint(
const int64_t v) { bigint_buffer_->push_back(v); }
233 void addFloat(
const float v) { float_buffer_->push_back(v); }
235 void addDouble(
const double v) { double_buffer_->push_back(v); }
237 void addString(
const std::string_view v) { string_buffer_->emplace_back(v); }
239 void addGeoString(
const std::string_view v) { geo_string_buffer_->emplace_back(v); }
244 string_array_buffer_->emplace_back();
245 return string_array_buffer_->back();
249 string_array_buffer_->push_back(arr);
252 void addDictEncodedString(
const std::vector<std::string>& string_vec);
255 const std::vector<std::vector<std::string>>& string_array_vec) {
259 for (
auto& p : string_array_vec) {
260 for (
const auto& str : p) {
262 throw std::runtime_error(
"String too long for dictionary encoding.");
267 std::vector<std::vector<int32_t>> ids_array(0);
268 string_dict_->getOrAddBulkArray(string_array_vec, ids_array);
270 for (
auto& p : ids_array) {
271 size_t len = p.size() *
sizeof(int32_t);
273 memcpy(a, &p[0], len);
275 string_array_dict_buffer_->push_back(
276 ArrayDatum(len, reinterpret_cast<int8_t*>(a), len == 0));
287 switch (column_desc_->columnType.get_type()) {
289 return reinterpret_cast<int8_t*
>(&((*bool_buffer_)[0]));
291 return reinterpret_cast<int8_t*
>(&((*tinyint_buffer_)[0]));
293 return reinterpret_cast<int8_t*
>(&((*smallint_buffer_)[0]));
295 return reinterpret_cast<int8_t*
>(&((*int_buffer_)[0]));
299 return reinterpret_cast<int8_t*
>(&((*bigint_buffer_)[0]));
301 return reinterpret_cast<int8_t*
>(&((*float_buffer_)[0]));
303 return reinterpret_cast<int8_t*
>(&((*double_buffer_)[0]));
307 return reinterpret_cast<int8_t*
>(&((*bigint_buffer_)[0]));
314 switch (column_desc_->columnType.get_type()) {
316 return sizeof((*bool_buffer_)[0]);
318 return sizeof((*tinyint_buffer_)[0]);
320 return sizeof((*smallint_buffer_)[0]);
322 return sizeof((*int_buffer_)[0]);
326 return sizeof((*bigint_buffer_)[0]);
328 return sizeof((*float_buffer_)[0]);
330 return sizeof((*double_buffer_)[0]);
334 return sizeof((*bigint_buffer_)[0]);
347 return string_array_buffer_;
351 return string_array_dict_buffer_;
355 switch (column_desc_->columnType.get_size()) {
357 return reinterpret_cast<int8_t*
>(&((*string_dict_i8_buffer_)[0]));
359 return reinterpret_cast<int8_t*
>(&((*string_dict_i16_buffer_)[0]));
361 return reinterpret_cast<int8_t*
>(&((*string_dict_i32_buffer_)[0]));
368 if (string_dict_ ==
nullptr) {
371 return string_dict_->checkpoint();
375 switch (column_desc_->columnType.get_type()) {
377 bool_buffer_->clear();
381 tinyint_buffer_->clear();
385 smallint_buffer_->clear();
389 int_buffer_->clear();
395 bigint_buffer_->clear();
399 float_buffer_->clear();
403 double_buffer_->clear();
409 string_buffer_->clear();
410 if (column_desc_->columnType.get_compression() ==
kENCODING_DICT) {
411 switch (column_desc_->columnType.get_size()) {
413 string_dict_i8_buffer_->clear();
416 string_dict_i16_buffer_->clear();
419 string_dict_i32_buffer_->clear();
430 bigint_buffer_->clear();
433 if (
IS_STRING(column_desc_->columnType.get_subtype())) {
434 string_array_buffer_->clear();
435 string_array_dict_buffer_->clear();
437 array_buffer_->clear();
445 geo_string_buffer_->clear();
455 const arrow::Array& data,
456 const bool exact_type_match,
461 const std::string_view val,
464 const int64_t replicate_count = 0);
469 const int64_t replicate_count = 0);
475 replicate_count_ = replicate_count;
477 template <
typename DATA_TYPE>
479 const arrow::Array& array,
480 std::vector<DATA_TYPE>& buffer,
483 template <
typename DATA_TYPE>
484 auto del_values(std::vector<DATA_TYPE>& buffer,
BadRowsTracker*
const bad_rows_tracker);
511 size_t replicate_count_ = 0;
516 std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
517 std::vector<DataBlockPtr>&,
526 bool use_catalog_locks =
true)
531 ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
532 : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
533 , load_callback_(load_callback) {
534 init(use_catalog_locks);
542 return column_descs_;
555 virtual bool load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
556 const size_t row_count);
557 virtual bool loadNoCheckpoint(
558 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
559 const size_t row_count);
560 virtual void checkpoint();
561 virtual std::vector<Catalog_Namespace::TableEpochInfo> getTableEpochs()
const;
562 virtual void setTableEpochs(
563 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
567 void dropColumns(
const std::vector<int>& columns);
570 void init(
const bool use_catalog_locks);
572 virtual bool loadImpl(
573 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
578 void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
579 std::vector<size_t>& all_shard_row_counts,
581 const size_t row_count,
582 const size_t shard_count);
592 std::vector<DataBlockPtr> get_data_block_pointers(
593 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
594 bool loadToShard(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
599 bool replicating_ =
false;
604 std::chrono::steady_clock::time_point
start;
605 std::chrono::steady_clock::time_point
end;
609 std::chrono::duration<size_t, std::milli>
elapsed;
613 : start(
std::chrono::steady_clock::now())
633 : copy_params(copy_params), file_path(file_path) {}
635 virtual ImportStatus importDelimited(
const std::string& file_path,
636 const bool decompressed) = 0;
637 #ifdef ENABLE_IMPORT_PARQUET 638 virtual void import_parquet(std::vector<std::string>& file_paths);
639 virtual void import_local_parquet(
const std::string& file_path) = 0;
642 void import_compressed(std::vector<std::string>& file_paths);
649 FILE* p_file =
nullptr;
651 bool load_failed =
false;
652 size_t total_file_size{0};
664 #ifdef ENABLE_IMPORT_PARQUET 665 void import_local_parquet(
const std::string& file_path)
override;
667 static SQLTypes detect_sqltype(
const std::string& str);
668 std::vector<std::string> get_headers();
670 std::vector<std::vector<std::string>> get_sample_rows(
size_t n);
673 bool has_headers =
false;
678 void detect_row_delimiter();
679 void split_raw_data();
680 std::vector<SQLTypes> detect_column_types(
const std::vector<std::string>& row);
682 void find_best_sqltypes();
683 std::vector<SQLTypes> find_best_sqltypes(
684 const std::vector<std::vector<std::string>>& raw_rows,
686 std::vector<SQLTypes> find_best_sqltypes(
687 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
688 const std::vector<std::vector<std::string>>::const_iterator& row_end,
691 std::vector<EncodingType> find_best_encodings(
692 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
693 const std::vector<std::vector<std::string>>::const_iterator& row_end,
694 const std::vector<SQLTypes>& best_types);
696 bool detect_headers(
const std::vector<SQLTypes>& first_types,
697 const std::vector<SQLTypes>& rest_types);
698 void find_best_sqltypes_and_headers();
699 ImportStatus importDelimited(
const std::string& file_path,
700 const bool decompressed)
override;
703 std::chrono::duration<double> timeout{1};
715 void seedFromExistingTableContents(
const std::unique_ptr<Loader>& loader,
716 const std::string& geoColumnBaseName);
717 int insertBoundsAndReturnRenderGroup(
const std::vector<double>& bounds);
720 using Point = boost::geometry::model::point<double, 2, boost::geometry::cs::cartesian>;
722 using Node = std::pair<BoundingBox, int>;
724 boost::geometry::index::rtree<Node, boost::geometry::index::quadratic<16>>;
734 const std::string& f,
739 ImportStatus importDelimited(
const std::string& file_path,
740 const bool decompressed)
override;
741 ImportStatus importGDAL(std::map<std::string, std::string> colname_to_src);
744 return loader->get_column_descs();
746 void load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
749 return import_buffers_vec;
752 return import_buffers_vec[i];
755 #ifdef ENABLE_IMPORT_PARQUET 756 void import_local_parquet(
const std::string& file_path)
override;
758 static ImportStatus get_import_status(
const std::string&
id);
759 static void set_import_status(
const std::string&
id,
const ImportStatus is);
760 static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
761 const std::string& fileName,
762 const std::string& geoColumnName,
764 static void readMetadataSampleGDAL(
765 const std::string& fileName,
766 const std::string& geoColumnName,
767 std::map<std::string, std::vector<std::string>>& metadata,
770 static bool gdalFileExists(
const std::string& path,
const CopyParams& copy_params);
771 static bool gdalFileOrDirectoryExists(
const std::string& path,
773 static std::vector<std::string> gdalGetAllFilesInArchive(
774 const std::string& archive_path,
779 :
name(name_), contents(contents_) {}
783 static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
784 const std::string& file_name,
787 static void set_geo_physical_import_buffer(
790 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
792 std::vector<double>& coords,
793 std::vector<double>& bounds,
794 std::vector<int>& ring_sizes,
795 std::vector<int>& poly_rings,
797 const int64_t replicate_count = 0);
798 static void set_geo_physical_import_buffer_columnar(
801 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
803 std::vector<std::vector<double>>& coords_column,
804 std::vector<std::vector<double>>& bounds_column,
805 std::vector<std::vector<int>>& ring_sizes_column,
806 std::vector<std::vector<int>>& poly_rings_column,
808 const int64_t replicate_count = 0);
809 void checkpoint(
const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
813 static bool gdalStatInternal(
const std::string& path,
816 static OGRDataSource* openGDALDataset(
const std::string& fileName,
818 static void setGDALAuthorizationTokens(
const CopyParams& copy_params);
834 #endif // _IMPORTER_H_ Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
std::pair< size_t, size_t > ArraySliceRange
StringDictionary * getStringDictionary() const
void addBigint(const int64_t v)
void addSmallint(const int16_t v)
class for a per-database catalog. also includes metadata for the current database and the current use...
const std::list< const ColumnDescriptor * > & get_column_descs() const
void addDictEncodedStringArray(const std::vector< std::vector< std::string >> &string_array_vec)
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
virtual ~DataStreamSink()
std::vector< std::string > * string_buffer_
void addString(const std::string_view v)
std::vector< ArrayDatum > * array_buffer_
const ColumnDescriptor * getColumnDesc() const
StringDictionary * string_dict_
std::atomic< int > nerrors
void set_replicate_count(const int64_t replicate_count)
void addDouble(const double v)
void addStringArray(const std::vector< std::string > &arr)
std::vector< int16_t > * smallint_buffer_
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
size_t getElementSize() const
const TableDescriptor * table_desc_
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
ImportStatus import_status
HOST DEVICE int get_size() const
const std::list< const ColumnDescriptor * > & get_column_descs() const
bool getReplicating() const
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer > > &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
std::vector< SQLTypes > best_sqltypes
std::chrono::duration< size_t, std::milli > elapsed
std::vector< std::string > * getGeoStringBuffer() const
int8_t * getAsBytes() const
std::vector< float > * float_buffer_
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
HOST DEVICE EncodingType get_compression() const
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
std::pair< BoundingBox, int > Node
std::vector< std::vector< std::string > > * getStringArrayBuffer() const
std::vector< double > * double_buffer_
void setReplicating(const bool replicating)
void addFloat(const float v)
Fragmenter_Namespace::InsertData insert_data_
DataStreamSink(const CopyParams ©_params, const std::string file_path)
ImportStatus & operator+=(const ImportStatus &is)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
GeoFileLayerContents contents
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
std::vector< ArrayDatum > * getArrayBuffer() const
std::vector< int32_t > * int_buffer_
std::vector< ArrayDatum > * string_array_dict_buffer_
CONSTEXPR DEVICE bool is_null(const T &value)
DEVICE Array(const int64_t size, const bool is_null=false)
int64_t get_replicate_count() const
void init(LogOptions const &log_opts)
void addBoolean(const int8_t v)
void * checked_malloc(const size_t size)
std::vector< uint8_t > * string_dict_i8_buffer_
void addTinyint(const int8_t v)
std::vector< int64_t > * bigint_buffer_
HOST DEVICE SQLTypes get_subtype() const
bool stringDictCheckpoint()
void addInt(const int32_t v)
const TableDescriptor * getTableDesc() const
Catalog_Namespace::Catalog & getCatalog()
std::vector< std::vector< std::string > > raw_rows
specifies the content in-memory of a row in the column metadata table
int8_t * getStringDictBuffer() const
std::vector< EncodingType > best_encodings
std::vector< int8_t > * bool_buffer_
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
std::mutex file_offsets_mutex
std::vector< std::vector< std::string > > * string_array_buffer_
boost::filesystem::path file_path
std::unique_ptr< bool[]> is_array_a
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Detector(const boost::filesystem::path &fp, CopyParams &cp)
std::vector< std::string > & addStringArray()
std::vector< int32_t > * string_dict_i32_buffer_
std::list< const ColumnDescriptor * > column_descs_
Catalog_Namespace::Catalog & getCatalog() const
void addArray(const ArrayDatum &v)
const CopyParams & get_copy_params() const
const ColumnDescriptor * column_desc_
boost::geometry::model::box< Point > BoundingBox
std::unique_ptr< RTree > _rtree
std::chrono::steady_clock::time_point start
std::vector< uint16_t > * string_dict_i16_buffer_
Catalog_Namespace::Catalog & catalog_
const CopyParams & get_copy_params() const
std::vector< int8_t > * tinyint_buffer_
HOST DEVICE SQLTypes get_type() const
std::vector< std::string > * getStringBuffer() const
The data to be inserted using the fragment manager.
boost::geometry::index::rtree< Node, boost::geometry::index::quadratic< 16 > > RTree
static constexpr size_t MAX_STRLEN
LoadCallbackType load_callback_
const SQLTypeInfo & getTypeInfo() const
std::vector< size_t > file_offsets
std::map< int, StringDictionary * > dict_map_
std::vector< std::string > * geo_string_buffer_
std::chrono::steady_clock::time_point end
std::vector< std::unique_ptr< TypedImportBuffer > > OneShardBuffers
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
std::unique_ptr< Loader > loader
const bool * get_is_array() const
const std::string file_path