26 #include <ogrsf_frmts.h>
29 #include <boost/filesystem.hpp>
30 #include <boost/noncopyable.hpp>
31 #include <boost/tokenizer.hpp>
32 #include <condition_variable>
42 #include <string_view>
57 #include <boost/geometry/index/rtree.hpp>
68 namespace import_export {
255 const std::vector<std::vector<std::string>>& string_array_vec) {
259 for (
auto& p : string_array_vec) {
260 for (
const auto& str : p) {
262 throw std::runtime_error(
"String too long for dictionary encoding.");
267 std::vector<std::vector<int32_t>> ids_array(0);
270 for (
auto& p : ids_array) {
271 size_t len = p.size() *
sizeof(int32_t);
273 memcpy(a, &p[0], len);
276 ArrayDatum(len, reinterpret_cast<int8_t*>(a), len == 0));
295 return reinterpret_cast<int8_t*
>(
int_buffer_->data());
316 return sizeof((*bool_buffer_)[0]);
318 return sizeof((*tinyint_buffer_)[0]);
320 return sizeof((*smallint_buffer_)[0]);
322 return sizeof((*int_buffer_)[0]);
326 return sizeof((*bigint_buffer_)[0]);
328 return sizeof((*float_buffer_)[0]);
330 return sizeof((*double_buffer_)[0]);
334 return sizeof((*bigint_buffer_)[0]);
455 const arrow::Array& data,
456 const bool exact_type_match,
461 const std::string_view val,
469 template <
typename DATA_TYPE>
471 const arrow::Array& array,
472 std::vector<DATA_TYPE>& buffer,
475 template <
typename DATA_TYPE>
480 const std::vector<std::unique_ptr<TypedImportBuffer>>&
import_buffers);
511 std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
512 std::vector<DataBlockPtr>&,
521 bool use_catalog_locks =
true)
527 : c.getAllColumnMetadataForTableUnlocked(t->tableId,
false,
false,
true))
529 init(use_catalog_locks);
550 virtual bool load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
551 const size_t row_count,
554 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
555 const size_t row_count,
558 virtual std::vector<Catalog_Namespace::TableEpochInfo>
getTableEpochs()
const;
560 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
568 void init(
const bool use_catalog_locks);
571 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
578 std::vector<size_t>& all_shard_row_counts,
580 const size_t row_count,
581 const size_t shard_count,
592 bool loadToShard(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
598 std::vector<OneShardBuffers>& all_shard_import_buffers,
599 std::vector<size_t>& all_shard_row_counts,
601 const size_t row_count,
602 const size_t shard_count,
605 std::vector<OneShardBuffers>& all_shard_import_buffers,
606 std::vector<size_t>& all_shard_row_counts,
608 const size_t row_count,
609 const size_t shard_count,
621 std::chrono::steady_clock::time_point
start;
622 std::chrono::steady_clock::time_point
end;
626 std::chrono::duration<size_t, std::milli>
elapsed;
631 :
start(std::chrono::steady_clock::now())
654 : copy_params(copy_params), file_path(file_path) {}
658 const bool decompressed,
660 #ifdef ENABLE_IMPORT_PARQUET
661 virtual void import_parquet(std::vector<std::string>& file_paths,
663 virtual void import_local_parquet(
691 #ifdef ENABLE_IMPORT_PARQUET
692 void import_local_parquet(
const std::string&
file_path,
712 const std::vector<std::vector<std::string>>&
raw_rows,
715 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
716 const std::vector<std::vector<std::string>>::const_iterator& row_end,
720 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
721 const std::vector<std::vector<std::string>>::const_iterator& row_end,
722 const std::vector<SQLTypes>& best_types);
725 const std::vector<SQLTypes>& rest_types);
729 const bool decompressed,
746 const std::string& geoColumnBaseName);
750 using Point = boost::geometry::model::point<double, 2, boost::geometry::cs::cartesian>;
752 using Node = std::pair<BoundingBox, int>;
754 boost::geometry::index::rtree<Node, boost::geometry::index::quadratic<16>>;
764 const std::string&
f,
771 const bool decompressed,
777 return loader->get_column_descs();
779 void load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
789 #ifdef ENABLE_IMPORT_PARQUET
790 void import_local_parquet(
const std::string& file_path,
796 const std::string& fileName,
797 const std::string& geoColumnName,
800 const std::string& fileName,
801 const std::string& geoColumnName,
802 std::map<std::string, std::vector<std::string>>& metadata,
809 const std::string& archive_path,
819 const std::string& file_name,
825 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
827 std::vector<double>& coords,
828 std::vector<double>& bounds,
829 std::vector<int>& ring_sizes,
830 std::vector<int>& poly_rings,
835 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
837 std::vector<std::vector<double>>& coords_column,
838 std::vector<std::vector<double>>& bounds_column,
839 std::vector<std::vector<int>>& ring_sizes_column,
840 std::vector<std::vector<int>>& poly_rings_column,
841 std::vector<int>& render_groups_column);
842 void checkpoint(
const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
873 #endif // _IMPORTER_H_
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
std::pair< size_t, size_t > ArraySliceRange
const std::list< const ColumnDescriptor * > & get_column_descs() const
HOST DEVICE SQLTypes get_subtype() const
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
const SQLTypeInfo & getTypeInfo() const
StringDictionary * getStringDictionary() const
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
HOST DEVICE int get_size() const
void addBigint(const int64_t v)
void addSmallint(const int16_t v)
class for a per-database catalog. also includes metadata for the current database and the current use...
void addDictEncodedStringArray(const std::vector< std::vector< std::string >> &string_array_vec)
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
void detect_row_delimiter()
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams &copy_params)
virtual ~DataStreamSink()
const TableDescriptor * getTableDesc() const
void dropColumns(const std::vector< int > &columns)
std::vector< std::string > * string_buffer_
void addString(const std::string_view v)
std::vector< ArrayDatum > * array_buffer_
void find_best_sqltypes_and_headers()
StringDictionary * string_dict_
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
std::atomic< int > nerrors
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
void addDouble(const double v)
void addStringArray(const std::vector< std::string > &arr)
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
std::vector< int16_t > * smallint_buffer_
const bool * get_is_array() const
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
const TableDescriptor * table_desc_
virtual void checkpoint()
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
std::vector< SQLTypes > best_sqltypes
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
std::chrono::duration< size_t, std::milli > elapsed
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
const CopyParams & get_copy_params() const
std::vector< float > * float_buffer_
HOST DEVICE SQLTypes get_type() const
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
ImportStatus import_status_
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
std::pair< BoundingBox, int > Node
std::vector< double > * double_buffer_
void addFloat(const float v)
Fragmenter_Namespace::InsertData insert_data_
std::vector< std::string > * getStringBuffer() const
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
mapd_shared_mutex import_mutex_
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
ImportStatus & operator+=(const ImportStatus &is)
void init(const bool use_catalog_locks)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
GeoFileLayerContents contents
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
void setAddingColumns(const bool adding_columns)
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
std::vector< int32_t > * int_buffer_
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
std::vector< ArrayDatum > * string_array_dict_buffer_
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
CONSTEXPR DEVICE bool is_null(const T &value)
DEVICE Array(const int64_t size, const bool is_null=false)
void addBoolean(const int8_t v)
void * checked_malloc(const size_t size)
std::vector< uint8_t > * string_dict_i8_buffer_
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
void addTinyint(const int8_t v)
std::shared_timed_mutex mapd_shared_mutex
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
std::vector< int64_t > * bigint_buffer_
int8_t * getAsBytes() const
bool stringDictCheckpoint()
void addInt(const int32_t v)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Catalog_Namespace::Catalog & getCatalog()
void find_best_sqltypes()
std::vector< std::vector< std::string > > raw_rows
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Specifies the in-memory content of a row in the column metadata table.
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
bool isAddingColumns() const
std::vector< EncodingType > best_encodings
std::vector< int8_t > * bool_buffer_
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
std::mutex file_offsets_mutex
bool g_enable_smem_group_by true
std::vector< std::vector< std::string > > * string_array_buffer_
boost::filesystem::path file_path
size_t getElementSize() const
int8_t * getStringDictBuffer() const
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
std::unique_ptr< bool[]> is_array_a
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
bool checkpoint() noexcept
static void set_import_status(const std::string &id, const ImportStatus is)
Detector(const boost::filesystem::path &fp, CopyParams &cp)
std::vector< std::string > & addStringArray()
boost::geometry::index::rtree< Node, boost::geometry::index::quadratic< 16 >> RTree
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
HOST DEVICE EncodingType get_compression() const
std::vector< int32_t > * string_dict_i32_buffer_
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
std::list< const ColumnDescriptor * > column_descs_
void addArray(const ArrayDatum &v)
std::vector< std::vector< std::string > > * getStringArrayBuffer() const
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Catalog_Namespace::Catalog & getCatalog() const
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
std::string getErrorMessage()
std::chrono::duration< double > timeout
std::vector< std::string > * getGeoStringBuffer() const
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
const ColumnDescriptor * column_desc_
boost::geometry::model::box< Point > BoundingBox
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
std::unique_ptr< RTree > _rtree
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
std::chrono::steady_clock::time_point start
std::vector< uint16_t > * string_dict_i16_buffer_
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
std::vector< std::string > get_headers()
Catalog_Namespace::Catalog & catalog_
const CopyParams & get_copy_params() const
bool g_enable_watchdog false
std::vector< int8_t > * tinyint_buffer_
static ImportStatus get_import_status(const std::string &id)
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
const ColumnDescriptor * getColumnDesc() const
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams &copy_params)
The data to be inserted using the fragment manager.
static constexpr size_t MAX_STRLEN
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
LoadCallbackType load_callback_
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
static std::mutex init_gdal_mutex
std::vector< size_t > file_offsets
std::map< int, StringDictionary * > dict_map_
std::vector< std::string > * geo_string_buffer_
std::chrono::steady_clock::time_point end
int insertBoundsAndReturnRenderGroup(const std::vector< double > &bounds)
void seedFromExistingTableContents(const std::unique_ptr< Loader > &loader, const std::string &geoColumnBaseName)
std::vector< ArrayDatum > * getArrayBuffer() const
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
std::unique_ptr< Loader > loader
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
void addDictEncodedString(const std::vector< std::string > &string_vec)
const std::list< const ColumnDescriptor * > & get_column_descs() const
const std::string file_path
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)