OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Importer:
+ Collaboration diagram for import_export::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
 

Private Member Functions

ImportStatus importGDALGeo (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGDALRaster (const Catalog_Namespace::SessionInfo *session_info)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static
Geospatial::GDAL::DataSourceUqPtr 
openGDALDataSource (const std::string &fileName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsGeo (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsRaster (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 772 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 825 of file Importer.h.

825 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 177 of file Importer.cpp.

181  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:177
constexpr double f
Definition: Utm.h:31
import_export::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 183 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

184  : DataStreamSink(p, f), loader(providedLoader) {
185  import_id = boost::filesystem::path(file_path).filename().string();
186  file_size = 0;
187  max_threads = 0;
188  p_file = nullptr;
189  buffer[0] = nullptr;
190  buffer[1] = nullptr;
191  // we may be overallocating a little more memory here due to dropping phy cols.
192  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
193  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
194  int i = 0;
195  bool has_array = false;
196  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
197  int skip_physical_cols = 0;
198  for (auto& p : loader->get_column_descs()) {
199  // phy geo columns can't be in input file
200  if (skip_physical_cols-- > 0) {
201  continue;
202  }
203  // neither are rowid or $deleted$
204  // note: columns can be added after rowid/$deleted$
205  if (p->isVirtualCol || p->isDeletedCol) {
206  continue;
207  }
208  skip_physical_cols = p->columnType.get_physical_cols();
209  if (p->columnType.get_type() == kARRAY) {
210  is_array.get()[i] = true;
211  has_array = true;
212  } else {
213  is_array.get()[i] = false;
214  }
215  ++i;
216  }
217  if (has_array) {
218  is_array_a = std::unique_ptr<bool[]>(is_array.release());
219  } else {
220  is_array_a = std::unique_ptr<bool[]>(nullptr);
221  }
222 }
constexpr double f
Definition: Utm.h:31
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:887
std::string import_id
Definition: Importer.h:881
std::unique_ptr< Loader > loader
Definition: Importer.h:886
const std::string file_path
Definition: Importer.h:706
import_export::Importer::~Importer ( )
override

Definition at line 224 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

224  {
225  if (p_file != nullptr) {
226  fclose(p_file);
227  }
228  if (buffer[0] != nullptr) {
229  free(buffer[0]);
230  }
231  if (buffer[1] != nullptr) {
232  free(buffer[1]);
233  }
234 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3612 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), importGDALGeo(), and importGDALRaster().

3613  {
3614  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3617  // rollback to starting epoch - undo all the added records
3618  loader->setTableEpochs(table_epochs);
3619  } else {
3620  loader->checkpoint();
3621  }
3622  }
3623 
3624  if (loader->getTableDesc()->persistenceLevel ==
3625  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3626  // tables
3627  auto ms = measure<>::execution([&]() {
3629  if (!import_status_.load_failed) {
3630  for (auto& p : import_buffers_vec[0]) {
3631  if (!p->stringDictCheckpoint()) {
3632  LOG(ERROR) << "Checkpointing Dictionary for Column "
3633  << p->getColumnDesc()->columnName << " failed.";
3634  import_status_.load_failed = true;
3635  import_status_.load_msg = "Dictionary checkpoint failed";
3636  break;
3637  }
3638  }
3639  }
3640  });
3641  if (DEBUG_TIMING) {
3642  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3643  << std::endl;
3644  }
3645  }
3646 }
std::lock_guard< T > lock_guard
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:216
heavyai::unique_lock< heavyai::shared_mutex > write_lock
#define DEBUG_TIMING
Definition: Importer.cpp:166
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885
static constexpr char const * FOREIGN_TABLE
heavyai::shared_mutex import_mutex_
Definition: Importer.h:709
std::unique_ptr< Loader > loader
Definition: Importer.h:886

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5136 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5136  {
5137  return gdalStatInternal(path, copy_params, false);
5138 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5101

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams copy_params 
)
static

Definition at line 5141 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5142  {
5143  return gdalStatInternal(path, copy_params, true);
5144 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5101

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams copy_params 
)
static

Definition at line 5213 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5215  {
5216  // lazy init GDAL
5223 
5224  // prepare to gather files
5225  std::vector<std::string> files;
5226 
5227  // gather the files recursively
5228  gdalGatherFilesInArchiveRecursive(archive_path, files);
5229 
5230  // convert to relative paths inside archive
5231  for (auto& file : files) {
5232  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5233  }
5234 
5235  // done
5236  return files;
5237 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:5146
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 5240 of file Importer.cpp.

References CHECK, EMPTY, GEO, import_export::CopyParams::geo_explode_collections, Geospatial::GDAL::init(), NON_GEO, openGDALDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5242  {
5243  // lazy init GDAL
5250 
5251  // prepare to gather layer info
5252  std::vector<GeoFileLayerInfo> layer_info;
5253 
5254  // open the data set
5256  if (poDS == nullptr) {
5257  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5258  file_name);
5259  }
5260 
5261  // enumerate the layers
5262  for (auto&& poLayer : poDS->GetLayers()) {
5264  // prepare to read this layer
5265  poLayer->ResetReading();
5266  // skip layer if empty
5267  if (poLayer->GetFeatureCount() > 0) {
5268  // get first feature
5269  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5270  CHECK(first_feature);
5271  // check feature for geometry
5272  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5273  if (!geometry) {
5274  // layer has no geometry
5275  contents = GeoFileLayerContents::NON_GEO;
5276  } else {
5277  // check the geometry type
5278  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5279  switch (wkbFlatten(geometry_type)) {
5280  case wkbPoint:
5281  case wkbLineString:
5282  case wkbPolygon:
5283  case wkbMultiPolygon:
5284  // layer has supported geo
5285  contents = GeoFileLayerContents::GEO;
5286  break;
5287  case wkbMultiPoint:
5288  case wkbMultiLineString:
5289  // supported if geo_explode_collections is specified
5293  break;
5294  default:
5295  // layer has unsupported geometry
5297  break;
5298  }
5299  }
5300  }
5301  // store info for this layer
5302  layer_info.emplace_back(poLayer->GetName(), contents);
5303  }
5304 
5305  // done
5306  return layer_info;
5307 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4676
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:222
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
staticprivate

Definition at line 5101 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

5103  {
5104  // lazy init GDAL
5111 
5112 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5113  // clear GDAL stat cache
5114  // without this, file existence will be cached, even if authentication changes
5115  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5116  VSICurlClearCache();
5117 #endif
5118 
5119  // stat path
5120  VSIStatBufL sb;
5121  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5122  if (result < 0) {
5123  return false;
5124  }
5125 
5126  // exists?
5127  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5128  return true;
5129  } else if (VSI_ISREG(sb.st_mode)) {
5130  return true;
5131  }
5132  return false;
5133 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const bool  is_raster,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4915 of file Importer.cpp.

References gdalToColumnDescriptorsGeo(), and gdalToColumnDescriptorsRaster().

Referenced by DBHandler::detect_column_types().

4919  {
4920  if (is_raster) {
4921  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4922  }
4923  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4924 }
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4999
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4927

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsGeo ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4999 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, import_export::anonymous_namespace{Importer.cpp}::ogr_to_type(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

5002  {
5003  std::list<ColumnDescriptor> cds;
5004 
5006  if (poDS == nullptr) {
5007  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5008  file_name);
5009  }
5010  if (poDS->GetLayerCount() == 0) {
5011  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5012  " has no layers");
5013  }
5014 
5015  OGRLayer& layer =
5017 
5018  layer.ResetReading();
5019  // TODO(andrewseidl): support multiple features
5020  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5021  if (poFeature == nullptr) {
5022  throw std::runtime_error("No features found in " + file_name);
5023  }
5024  // get fields as regular columns
5025  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5026  CHECK(poFDefn);
5027  int iField;
5028  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5029  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5030  auto typePair = ogr_to_type(poFieldDefn->GetType());
5031  ColumnDescriptor cd;
5032  cd.columnName = poFieldDefn->GetNameRef();
5033  cd.sourceName = poFieldDefn->GetNameRef();
5034  SQLTypeInfo ti;
5035  if (typePair.second) {
5036  ti.set_type(kARRAY);
5037  ti.set_subtype(typePair.first);
5038  } else {
5039  ti.set_type(typePair.first);
5040  }
5041  if (typePair.first == kTEXT) {
5043  ti.set_comp_param(32);
5044  }
5045  ti.set_fixed_size();
5046  cd.columnType = ti;
5047  cds.push_back(cd);
5048  }
5049  // get geo column, if any
5050  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
5051  if (poGeometry) {
5052  ColumnDescriptor cd;
5053  cd.columnName = geo_column_name;
5054  cd.sourceName = geo_column_name;
5055 
5056  // get GDAL type
5057  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
5058 
5059  // if exploding, override any collection type to child type
5061  if (ogr_type == wkbMultiPolygon) {
5062  ogr_type = wkbPolygon;
5063  } else if (ogr_type == wkbMultiLineString) {
5064  ogr_type = wkbLineString;
5065  } else if (ogr_type == wkbMultiPoint) {
5066  ogr_type = wkbPoint;
5067  }
5068  }
5069 
5070  // convert to internal type
5071  SQLTypes geoType = ogr_to_type(ogr_type);
5072 
5073  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5075  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5076  }
5077 
5078  // build full internal type
5079  SQLTypeInfo ti;
5080  ti.set_type(geoType);
5086  cd.columnType = ti;
5087 
5088  cds.push_back(cd);
5089  }
5090 
5091  // metadata columns?
5092  auto metadata_column_infos =
5094  for (auto& mci : metadata_column_infos) {
5095  cds.push_back(std::move(mci.column_descriptor));
5096  }
5097 
5098  return cds;
5099 }
void set_compression(EncodingType c)
Definition: sqltypes.h:440
SQLTypes
Definition: sqltypes.h:38
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:430
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4695
std::string sourceName
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:93
void set_input_srid(int d)
Definition: sqltypes.h:433
void set_fixed_size()
Definition: sqltypes.h:438
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4820
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:172
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:435
void set_comp_param(int p)
Definition: sqltypes.h:441
std::string geo_layer_name
Definition: CopyParams.h:80
Definition: sqltypes.h:52
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4676
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:222
SQLTypeInfo columnType
std::string columnName
EncodingType geo_coords_encoding
Definition: CopyParams.h:75
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:429

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsRaster ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4927 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getPointNamesAndSQLTypes(), Geospatial::GDAL::init(), kENCODING_GEOINT, kGEOMETRY, kPOINT, import_export::parse_add_metadata_columns(), import_export::CopyParams::raster_import_bands, import_export::CopyParams::raster_import_dimensions, import_export::CopyParams::raster_point_compute_angle, import_export::CopyParams::raster_point_transform, import_export::CopyParams::raster_point_type, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Geospatial::GDAL::setAuthorizationTokens(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4930  {
4931  // lazy init GDAL
4938 
4939  // prepare for metadata column
4940  auto metadata_column_infos =
4942 
4943  // create a raster importer and do the detect
4944  RasterImporter raster_importer;
4945  raster_importer.detect(
4946  file_name,
4952  false,
4953  metadata_column_infos);
4954 
4955  // prepare to capture column descriptors
4956  std::list<ColumnDescriptor> cds;
4957 
4958  // get the point column info
4959  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4960 
4961  // create the columns for the point in the specified type
4962  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4963  ColumnDescriptor cd;
4964  cd.columnName = cd.sourceName = col_name;
4965  cd.columnType.set_type(sql_type);
4966  // hardwire other POINT attributes for now
4967  if (sql_type == kPOINT) {
4969  cd.columnType.set_input_srid(4326);
4970  cd.columnType.set_output_srid(4326);
4972  cd.columnType.set_comp_param(32);
4973  }
4974  cds.push_back(cd);
4975  }
4976 
4977  // get the names and types for the band column(s)
4978  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4979 
4980  // add column descriptors for each band
4981  for (auto const& [band_name, sql_type] : band_names_and_types) {
4982  ColumnDescriptor cd;
4983  cd.columnName = cd.sourceName = band_name;
4984  cd.columnType.set_type(sql_type);
4986  cds.push_back(cd);
4987  }
4988 
4989  // metadata columns?
4990  for (auto& mci : metadata_column_infos) {
4991  cds.push_back(std::move(mci.column_descriptor));
4992  }
4993 
4994  // return the results
4995  return cds;
4996 }
void set_compression(EncodingType c)
Definition: sqltypes.h:440
std::string s3_secret_key
Definition: CopyParams.h:62
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4874
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:430
static void init()
Definition: GDAL.cpp:67
std::string raster_import_dimensions
Definition: CopyParams.h:92
std::string sourceName
std::string add_metadata_columns
Definition: CopyParams.h:93
void set_input_srid(int d)
Definition: sqltypes.h:433
RasterPointType raster_point_type
Definition: CopyParams.h:87
void set_fixed_size()
Definition: sqltypes.h:438
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:435
void set_comp_param(int p)
Definition: sqltypes.h:441
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4896
std::string s3_session_token
Definition: CopyParams.h:63
std::string raster_import_bands
Definition: CopyParams.h:88
SQLTypeInfo columnType
std::string s3_access_key
Definition: CopyParams.h:61
std::string columnName
RasterPointTransform raster_point_transform
Definition: CopyParams.h:90
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:429

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 789 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

789  {
790  return loader->get_column_descs();
791  }
std::unique_ptr< Loader > loader
Definition: Importer.h:886

+ Here is the caller graph for this function:

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 788 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

788 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 798 of file Importer.h.

References import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

798  {
799  return import_buffers_vec[i];
800  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 795 of file Importer.h.

References import_buffers_vec.

795  {
796  return import_buffers_vec;
797  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 236 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

236  {
238  return import_status_map.at(import_id);
239 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:175
std::shared_lock< T > shared_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:174
std::string import_id
Definition: Importer.h:881

+ Here is the caller graph for this function:

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 801 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

801 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:887

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 835 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

835 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:886

+ Here is the caller graph for this function:

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 858 of file Importer.h.

References loader.

858 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:886
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo * session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 4437 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4437  {
4438  return DataStreamSink::archivePlumber(session_info);
4439 }
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3648

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo * session_info 
)
override virtual

Implements import_export::DataStreamSink.

Definition at line 4441 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), heavyai::fopen(), g_enable_assign_render_groups, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, anonymous_namespace{Importer.cpp}::num_import_threads(), import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4444  {
4446  auto query_session = session_info ? session_info->get_session_id() : "";
4447 
4448  if (!p_file) {
4449  p_file = fopen(file_path.c_str(), "rb");
4450  }
4451  if (!p_file) {
4452  throw std::runtime_error("failed to open file '" + file_path +
4453  "': " + strerror(errno));
4454  }
4455 
4456  if (!decompressed) {
4457  (void)fseek(p_file, 0, SEEK_END);
4458  file_size = ftell(p_file);
4459  }
4460 
4462  VLOG(1) << "Delimited import # threads: " << max_threads;
4463 
4464  // deal with small files
4465  size_t alloc_size = copy_params.buffer_size;
4466  if (!decompressed && file_size < alloc_size) {
4467  alloc_size = file_size;
4468  }
4469 
4470  for (size_t i = 0; i < max_threads; i++) {
4471  import_buffers_vec.emplace_back();
4472  for (const auto cd : loader->get_column_descs()) {
4473  import_buffers_vec[i].emplace_back(
4474  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4475  }
4476  }
4477 
4478  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4479  size_t current_pos = 0;
4480  size_t end_pos;
4481  size_t begin_pos = 0;
4482 
4483  (void)fseek(p_file, current_pos, SEEK_SET);
4484  size_t size =
4485  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4486 
4487  // make render group analyzers for each poly column
4488  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4490  auto& cat = loader->getCatalog();
4491  auto* td = loader->getTableDesc();
4492  CHECK(td);
4493  auto column_descriptors =
4494  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4495  for (auto const& cd : column_descriptors) {
4496  if (IS_GEO_POLY(cd->columnType.get_type())) {
4497  auto rga = std::make_shared<RenderGroupAnalyzer>();
4498  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4499  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4500  }
4501  }
4502  }
4503 
4504  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4505  loader->getTableDesc()->tableId};
4506  auto table_epochs = loader->getTableEpochs();
4508  {
4509  std::list<std::future<ImportStatus>> threads;
4510 
4511  // use a stack to track thread_ids which must not overlap among threads
4512  // because thread_id is used to index import_buffers_vec[]
4513  std::stack<size_t> stack_thread_ids;
4514  for (size_t i = 0; i < max_threads; i++) {
4515  stack_thread_ids.push(i);
4516  }
4517  // added for true row index on error
4518  size_t first_row_index_this_buffer = 0;
4519 
4520  while (size > 0) {
4521  unsigned int num_rows_this_buffer = 0;
4522  CHECK(scratch_buffer);
4523  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4524  scratch_buffer,
4525  size,
4526  copy_params,
4527  first_row_index_this_buffer,
4528  num_rows_this_buffer,
4529  p_file);
4530 
4531  // unput residual
4532  int nresidual = size - end_pos;
4533  std::unique_ptr<char[]> unbuf;
4534  if (nresidual > 0) {
4535  unbuf = std::make_unique<char[]>(nresidual);
4536  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4537  }
4538 
4539  // get a thread_id not in use
4540  auto thread_id = stack_thread_ids.top();
4541  stack_thread_ids.pop();
4542  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4543 
4544  threads.push_back(std::async(std::launch::async,
4546  thread_id,
4547  this,
4548  std::move(scratch_buffer),
4549  begin_pos,
4550  end_pos,
4551  end_pos,
4552  columnIdToRenderGroupAnalyzerMap,
4553  first_row_index_this_buffer,
4554  session_info,
4555  executor));
4556 
4557  first_row_index_this_buffer += num_rows_this_buffer;
4558 
4559  current_pos += end_pos;
4560  scratch_buffer = std::make_unique<char[]>(alloc_size);
4561  CHECK(scratch_buffer);
4562  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4563  size = nresidual +
4564  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4565 
4566  begin_pos = 0;
4567  while (threads.size() > 0) {
4568  int nready = 0;
4569  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4570  it != threads.end();) {
4571  auto& p = *it;
4572  std::chrono::milliseconds span(0);
4573  if (p.wait_for(span) == std::future_status::ready) {
4574  auto ret_import_status = p.get();
4575  {
4577  import_status_ += ret_import_status;
4578  if (ret_import_status.load_failed) {
4580  }
4581  }
4582  // sum up current total file offsets
4583  size_t total_file_offset{0};
4584  if (decompressed) {
4585  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4586  for (const auto file_offset : file_offsets) {
4587  total_file_offset += file_offset;
4588  }
4589  }
4590  // estimate number of rows per current total file offset
4591  if (decompressed ? total_file_offset : current_pos) {
4593  (decompressed ? (float)total_file_size / total_file_offset
4594  : (float)file_size / current_pos) *
4595  import_status_.rows_completed;
4596  }
4597  VLOG(3) << "rows_completed " << import_status_.rows_completed
4598  << ", rows_estimated " << import_status_.rows_estimated
4599  << ", total_file_size " << total_file_size << ", total_file_offset "
4600  << total_file_offset;
4602  // recall thread_id for reuse
4603  stack_thread_ids.push(ret_import_status.thread_id);
4604  threads.erase(it++);
4605  ++nready;
4606  } else {
4607  ++it;
4608  }
4609  }
4610 
4611  if (nready == 0) {
4612  std::this_thread::yield();
4613  }
4614 
4615  // on eof, wait for all threads to finish
4616  if (0 == size) {
4617  continue;
4618  }
4619 
4620  // keep reading if any free thread slot
4621  // this is one of the major differences from the old threading model !!
4622  if (threads.size() < max_threads) {
4623  break;
4624  }
4627  break;
4628  }
4629  }
4632  import_status_.load_failed = true;
4633  // todo use better message
4634  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4635  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4636  break;
4637  }
4639  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4640  break;
4641  }
4642  }
4643 
4644  // join dangling threads in case of LOG(ERROR) above
4645  for (auto& p : threads) {
4646  p.wait();
4647  }
4648  }
4649 
4650  checkpoint(table_epochs);
4651 
4652  fclose(p_file);
4653  p_file = nullptr;
4654  return import_status_;
4655 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::string cat(Ts &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:2025
heavyai::shared_lock< heavyai::shared_mutex > read_lock
#define LOG(tag)
Definition: Logger.h:216
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const size_t num_import_threads(const int copy_params_threads)
Definition: Importer.cpp:134
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:468
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885
std::string get_session_id() const
Definition: SessionInfo.h:76
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:241
std::string import_id
Definition: Importer.h:881
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3612
ThreadId thread_id()
Definition: Logger.cpp:820
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:163
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
std::vector< size_t > file_offsets
Definition: Importer.h:711
#define VLOG(n)
Definition: Logger.h:316
heavyai::shared_mutex import_mutex_
Definition: Importer.h:709
std::unique_ptr< Loader > loader
Definition: Importer.h:886
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:706
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importGDAL ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo * session_info,
const bool  is_raster 
)

Definition at line 5309 of file Importer.cpp.

References importGDALGeo(), and importGDALRaster().

Referenced by QueryRunner::ImportDriver::importGeoTable().

5312  {
5313  if (is_raster) {
5314  return importGDALRaster(session_info);
5315  }
5316  return importGDALGeo(columnNameToSourceNameMap, session_info);
5317 }
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5319
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5626

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALGeo ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo * session_info 
)
private

Definition at line 5319 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, g_enable_assign_render_groups, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, anonymous_namespace{Importer.cpp}::num_import_threads(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by importGDAL().

5321  {
5322  // initial status
5325  if (poDS == nullptr) {
5326  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5327  file_path);
5328  }
5329 
5330  OGRLayer& layer =
5332 
5333  // get the number of features in this layer
5334  size_t numFeatures = layer.GetFeatureCount();
5335 
5336  // build map of metadata field (additional columns) name to index
5337  // use shared_ptr since we need to pass it to the worker
5338  FieldNameToIndexMapType fieldNameToIndexMap;
5339  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5340  CHECK(poFDefn);
5341  size_t numFields = poFDefn->GetFieldCount();
5342  for (size_t iField = 0; iField < numFields; iField++) {
5343  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5344  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5345  }
5346 
5347  // the geographic spatial reference we want to put everything in
5348  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5349  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5350 
5351 #if GDAL_VERSION_MAJOR >= 3
5352  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5353  // this results in X and Y being transposed for angle-based
5354  // coordinate systems. This restores the previous behavior.
5355  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5356 #endif
5357 
5358 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5359  // just one "thread"
5360  max_threads = 1;
5361 #else
5362  // how many threads to use
5364 #endif
5365  VLOG(1) << "GDAL import # threads: " << max_threads;
5366 
5367  // metadata columns?
5368  auto const metadata_column_infos =
5370 
5371  // import geo table is specifically handled in both DBHandler and QueryRunner
5372  // that is separate path against a normal SQL execution
5373  // so we here explicitly enroll the import session to allow interruption
5374  // while importing geo table
5375  auto query_session = session_info ? session_info->get_session_id() : "";
5376  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5378  auto is_session_already_registered = false;
5379  {
5381  executor->getSessionLock());
5382  is_session_already_registered =
5383  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5384  }
5385  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5386  !is_session_already_registered) {
5387  executor->enrollQuerySession(query_session,
5388  "IMPORT_GEO_TABLE",
5389  query_submitted_time,
5391  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5392  }
5393  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5394  // reset the runtime query interrupt status
5395  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5396  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5397  }
5398  };
5399 
5400  // make an import buffer for each thread
5401  CHECK_EQ(import_buffers_vec.size(), 0u);
5403  for (size_t i = 0; i < max_threads; i++) {
5404  for (const auto cd : loader->get_column_descs()) {
5405  import_buffers_vec[i].emplace_back(
5406  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5407  }
5408  }
5409 
5410  // make render group analyzers for each poly column
5411  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5413  auto& cat = loader->getCatalog();
5414  auto* td = loader->getTableDesc();
5415  CHECK(td);
5416  auto column_descriptors =
5417  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5418  for (auto const& cd : column_descriptors) {
5419  if (IS_GEO_POLY(cd->columnType.get_type())) {
5420  auto rga = std::make_shared<RenderGroupAnalyzer>();
5421  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5422  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5423  }
5424  }
5425  }
5426 
5427 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5428  // threads
5429  std::list<std::future<ImportStatus>> threads;
5430 
5431  // use a stack to track thread_ids which must not overlap among threads
5432  // because thread_id is used to index import_buffers_vec[]
5433  std::stack<size_t> stack_thread_ids;
5434  for (size_t i = 0; i < max_threads; i++) {
5435  stack_thread_ids.push(i);
5436  }
5437 #endif
5438 
5439  // checkpoint the table
5440  auto table_epochs = loader->getTableEpochs();
5441 
5442  // reset the layer
5443  layer.ResetReading();
5444 
5445  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5446 
5447  // make a features buffer for each thread
5448  std::vector<FeaturePtrVector> features(max_threads);
5449 
5450  // make one of these for each thread, based on the first feature's SR
5451  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5452  max_threads);
5453 
5454  // for each feature...
5455  size_t firstFeatureThisChunk = 0;
5456  while (firstFeatureThisChunk < numFeatures) {
5457  // how many features this chunk
5458  size_t numFeaturesThisChunk =
5459  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5460 
5461 // get a thread_id not in use
5462 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5463  size_t thread_id = 0;
5464 #else
5465  auto thread_id = stack_thread_ids.top();
5466  stack_thread_ids.pop();
5467  CHECK(thread_id < max_threads);
5468 #endif
5469 
5470  // fill features buffer for new thread
5471  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5472  features[thread_id].emplace_back(layer.GetNextFeature());
5473  }
5474 
5475  // construct a coordinate transformation for each thread, if needed
5476  // some features may not have geometry, so look for the first one that does
5477  if (coordinate_transformations[thread_id] == nullptr) {
5478  for (auto const& feature : features[thread_id]) {
5479  auto const* geometry = feature->GetGeometryRef();
5480  if (geometry) {
5481  auto const* geometry_sr = geometry->getSpatialReference();
5482  // if the SR is non-null and non-empty and different from what we want
5483  // we need to make a reusable CoordinateTransformation
5484  if (geometry_sr &&
5485 #if GDAL_VERSION_MAJOR >= 3
5486  !geometry_sr->IsEmpty() &&
5487 #endif
5488  !geometry_sr->IsSame(poGeographicSR.get())) {
5489  // validate the SR before trying to use it
5490  if (geometry_sr->Validate() != OGRERR_NONE) {
5491  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5492  }
5493  // create the OGRCoordinateTransformation that will be used for
5494  // all the features in this chunk
5495  coordinate_transformations[thread_id].reset(
5496  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5497  if (coordinate_transformations[thread_id] == nullptr) {
5498  throw std::runtime_error(
5499  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5500  }
5501  }
5502  // once we find at least one geometry with an SR, we're done
5503  break;
5504  }
5505  }
5506  }
5507 
5508 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5509  // call worker function directly
5510  auto ret_import_status =
5512  this,
5513  coordinate_transformations[thread_id].get(),
5514  std::move(features[thread_id]),
5515  firstFeatureThisChunk,
5516  numFeaturesThisChunk,
5517  fieldNameToIndexMap,
5518  columnNameToSourceNameMap,
5519  columnIdToRenderGroupAnalyzerMap,
5520  session_info,
5521  executor.get(),
5522  metadata_column_infos);
5523  import_status += ret_import_status;
5524  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5525  import_status.rows_completed;
5526  set_import_status(import_id, import_status);
5527 #else
5528  // fire up that thread to import this geometry
5529  threads.push_back(std::async(std::launch::async,
5531  thread_id,
5532  this,
5533  coordinate_transformations[thread_id].get(),
5534  std::move(features[thread_id]),
5535  firstFeatureThisChunk,
5536  numFeaturesThisChunk,
5537  fieldNameToIndexMap,
5538  columnNameToSourceNameMap,
5539  columnIdToRenderGroupAnalyzerMap,
5540  session_info,
5541  executor.get(),
5542  metadata_column_infos));
5543 
5544  // let the threads run
5545  while (threads.size() > 0) {
5546  int nready = 0;
5547  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5548  it != threads.end();) {
5549  auto& p = *it;
5550  std::chrono::milliseconds span(
5551  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5552  if (p.wait_for(span) == std::future_status::ready) {
5553  auto ret_import_status = p.get();
5554  {
5556  import_status_ += ret_import_status;
5558  ((float)firstFeatureThisChunk / (float)numFeatures) *
5562  break;
5563  }
5564  }
5565  // recall thread_id for reuse
5566  stack_thread_ids.push(ret_import_status.thread_id);
5567 
5568  threads.erase(it++);
5569  ++nready;
5570  } else {
5571  ++it;
5572  }
5573  }
5574 
5575  if (nready == 0) {
5576  std::this_thread::yield();
5577  }
5578 
5579  // keep reading if any free thread slot
5580  // this is one of the major differences from the old threading model !!
5581  if (threads.size() < max_threads) {
5582  break;
5583  }
5584  }
5585 #endif
5586 
5587  // out of rows?
5588 
5591  import_status_.load_failed = true;
5592  // todo use better message
5593  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5594  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5595  break;
5596  }
5598  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5599  "more details";
5600  break;
5601  }
5602 
5603  firstFeatureThisChunk += numFeaturesThisChunk;
5604  }
5605 
5606 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5607  // wait for any remaining threads
5608  if (threads.size()) {
5609  for (auto& p : threads) {
5610  // wait for the thread
5611  p.wait();
5612  // get the result and update the final import status
5613  auto ret_import_status = p.get();
5614  import_status_ += ret_import_status;
5617  }
5618  }
5619 #endif
5620 
5621  checkpoint(table_epochs);
5622 
5623  return import_status_;
5624 }
std::lock_guard< T > lock_guard
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:160
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::string cat(Ts &&...args)
#define LOG(tag)
Definition: Logger.h:216
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4695
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const size_t num_import_threads(const int copy_params_threads)
Definition: Importer.cpp:134
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:468
std::string add_metadata_columns
Definition: CopyParams.h:93
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1448
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRCoordinateTransformation *coordinate_transformation, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor, const MetadataColumnInfos &metadata_column_infos)
Definition: Importer.cpp:2403
std::string get_session_id() const
Definition: SessionInfo.h:76
std::string geo_layer_name
Definition: CopyParams.h:80
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:241
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4676
std::unique_ptr< OGRSpatialReference, SpatialReferenceDeleter > SpatialReferenceUqPtr
Definition: GDAL.h:59
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string import_id
Definition: Importer.h:881
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3612
ThreadId thread_id()
Definition: Logger.cpp:820
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:163
#define VLOG(n)
Definition: Logger.h:316
heavyai::shared_mutex import_mutex_
Definition: Importer.h:709
std::unique_ptr< Loader > loader
Definition: Importer.h:886
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:706
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALRaster ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 5626 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, anonymous_namespace{Importer.cpp}::check_session_interrupted(), checkpoint(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Geospatial::compress_coords(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), Executor::ERR_INTERRUPTED, anonymous_namespace{Utm.h}::f, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_output_srid(), Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getBandNullValue(), import_export::RasterImporter::getBandsHeight(), import_export::RasterImporter::getBandsWidth(), Executor::getExecutor(), import_export::RasterImporter::getNumBands(), import_export::RasterImporter::getPointNamesAndSQLTypes(), import_export::RasterImporter::getProjectedPixelCoords(), import_export::RasterImporter::getRawPixels(), import_export::RasterImporter::import(), import_buffers_vec, import_id, import_export::DataStreamSink::import_status_, logger::INFO, ColumnDescriptor::isGeoPhyCol, kARRAY, kBIGINT, kDOUBLE, kFLOAT, kINT, kMaxRasterScanlinesPerThread, kNULLT, kPOINT, kSMALLINT, kTINYINT, load(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, NULL_DOUBLE, NULL_FLOAT, NULL_INT, NULL_SMALLINT, anonymous_namespace{Importer.cpp}::num_import_threads(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, set_import_status(), logger::thread_id(), timer_start(), TIMER_STOP, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, and VLOG.

Referenced by importGDAL().

5627  {
5628  // initial status
5630 
5631  // metadata columns?
5632  auto const metadata_column_infos =
5634 
5635  // create a raster importer and do the detect
5636  RasterImporter raster_importer;
5637  raster_importer.detect(
5638  file_path,
5644  true,
5645  metadata_column_infos);
5646 
5647  // get the table columns and count actual columns
5648  auto const& column_descs = loader->get_column_descs();
5649  uint32_t num_table_cols{0u};
5650  for (auto const* cd : column_descs) {
5651  if (!cd->isGeoPhyCol) {
5652  num_table_cols++;
5653  }
5654  }
5655 
5656  // how many bands do we have?
5657  auto num_bands = raster_importer.getNumBands();
5658 
5659  // get point columns info
5660  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5661 
5662  // validate that the table column count matches
5663  auto num_expected_cols = num_bands;
5664  num_expected_cols += point_names_and_sql_types.size();
5665  num_expected_cols += metadata_column_infos.size();
5666  if (num_expected_cols != num_table_cols) {
5667  throw std::runtime_error(
5668  "Raster Import aborted. Band/Column count mismatch (file requires " +
5669  std::to_string(num_expected_cols) + ", table has " +
5670  std::to_string(num_table_cols) + ")");
5671  }
5672 
5673  // validate the point column names and types
5674  // if we're importing the coords as a POINT, then the first column
5675  // must be a POINT (two physical columns, POINT and TINYINT[])
5676  // if we're not, the first two columns must be the matching type
5677  // optionally followed by an angle column
5678  auto cd_itr = column_descs.begin();
5679  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5680  if (sql_type == kPOINT) {
5681  // POINT column
5682  {
5683  auto const* cd = *cd_itr++;
5684  if (cd->columnName != col_name) {
5685  throw std::runtime_error("Column '" + cd->columnName +
5686  "' overridden name invalid (must be '" + col_name +
5687  "')");
5688  }
5689  auto const cd_type = cd->columnType.get_type();
5690  if (cd_type != kPOINT) {
5691  throw std::runtime_error("Column '" + cd->columnName +
5692  "' overridden type invalid (must be POINT)");
5693  }
5694  if (cd->columnType.get_output_srid() != 4326) {
5695  throw std::runtime_error("Column '" + cd->columnName +
5696  "' overridden SRID invalid (must be 4326)");
5697  }
5698  }
5699  // TINYINT[] coords sub-column
5700  {
5701  // if the above is true, this must be true
5702  auto const* cd = *cd_itr++;
5703  CHECK(cd->columnType.get_type() == kARRAY);
5704  CHECK(cd->columnType.get_subtype() == kTINYINT);
5705  }
5706  } else {
5707  // column of the matching name and type
5708  auto const* cd = *cd_itr++;
5709  if (cd->columnName != col_name) {
5710  throw std::runtime_error("Column '" + cd->columnName +
5711  "' overridden name invalid (must be '" + col_name +
5712  "')");
5713  }
5714  auto const cd_type = cd->columnType.get_type();
5715  if (cd_type != sql_type) {
5716  throw std::runtime_error("Column '" + cd->columnName +
5717  "' overridden type invalid (must be " +
5718  to_string(sql_type) + ")");
5719  }
5720  }
5721  }
5722 
5723  // validate the band column types
5724  // any Immerse overriding to other types will currently be rejected
5725  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5726  if (band_names_and_types.size() != num_bands) {
5727  throw std::runtime_error("Column/Band count mismatch when validating types");
5728  }
5729  for (uint32_t i = 0; i < num_bands; i++) {
5730  auto const* cd = *cd_itr++;
5731  auto const cd_type = cd->columnType.get_type();
5732  auto const sql_type = band_names_and_types[i].second;
5733  if (cd_type != sql_type) {
5734  throw std::runtime_error("Band Column '" + cd->columnName +
5735  "' overridden type invalid (must be " +
5736  to_string(sql_type) + ")");
5737  }
5738  }
5739 
5740  // validate the metadata columns (name and type must match)
5741  for (auto const& mci : metadata_column_infos) {
5742  auto const* cd = *cd_itr++;
5743  if (mci.column_descriptor.columnName != cd->columnName) {
5744  throw std::runtime_error("Metadata Column '" + cd->columnName +
5745  "' overridden name invalid (must be '" +
5746  mci.column_descriptor.columnName + "')");
5747  }
5748  auto const cd_type = cd->columnType.get_type();
5749  auto const md_type = mci.column_descriptor.columnType.get_type();
5750  if (cd_type != md_type) {
5751  throw std::runtime_error("Metadata Column '" + cd->columnName +
5752  "' overridden type invalid (must be " +
5753  to_string(md_type) + ")");
5754  }
5755  }
5756 
5757  // geo table import is handled specifically in both DBHandler and QueryRunner,
5758  // on a path that is separate from normal SQL execution,
5759  // so here we explicitly enroll the import session to allow interruption
5760  // while importing a geo table
5761  auto query_session = session_info ? session_info->get_session_id() : "";
5762  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5764  auto is_session_already_registered = false;
5765  {
5767  executor->getSessionLock());
5768  is_session_already_registered =
5769  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5770  }
5771  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5772  !is_session_already_registered) {
5773  executor->enrollQuerySession(query_session,
5774  "IMPORT_GEO_TABLE",
5775  query_submitted_time,
5777  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5778  }
5779  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5780  // reset the runtime query interrupt status
5781  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5782  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5783  }
5784  };
5785 
5786  // how many threads are we gonna use?
5788  VLOG(1) << "GDAL import # threads: " << max_threads;
5789 
5791  throw std::runtime_error("Invalid CopyParams.raster_scanlines_per_thread! (" +
5793  ")");
5794  }
5795  const int max_scanlines_per_thread =
5800  VLOG(1) << "Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5801 
5802  // make an import buffer for each thread
5803  CHECK_EQ(import_buffers_vec.size(), 0u);
5805  for (size_t i = 0; i < max_threads; i++) {
5806  for (auto const& cd : loader->get_column_descs()) {
5807  import_buffers_vec[i].emplace_back(
5808  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5809  }
5810  }
5811 
5812  // status and times
5813  using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5814 
5815  // get the band dimensions
5816  auto const band_size_x = raster_importer.getBandsWidth();
5817  auto const band_size_y = raster_importer.getBandsHeight();
5818 
5819  // allocate raw pixel buffers per thread
5820  std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5821  for (size_t i = 0; i < max_threads; i++) {
5822  raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5823  sizeof(double));
5824  }
5825 
5826  // just the sql type of the first point column (if any)
5827  auto const point_sql_type = point_names_and_sql_types.size()
5828  ? point_names_and_sql_types.begin()->second
5829  : kNULLT;
5830 
5831  // lambda for importing to raw data buffers (threadable)
5832  auto import_rows =
5833  [&](const size_t thread_idx, const int y_start, const int y_end) -> ThreadReturn {
5834  // this thread's import buffers
5835  auto& import_buffers = import_buffers_vec[thread_idx];
5836 
5837  // this thread's raw pixel bytes
5838  auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5839 
5840  // clear the buffers
5841  for (auto& col_buffer : import_buffers) {
5842  col_buffer->clear();
5843  }
5844 
5845  // prepare to iterate columns
5846  auto col_itr = column_descs.begin();
5847  int col_idx{0};
5848 
5849  float proj_s{0.0f};
5850  if (point_sql_type != kNULLT) {
5851  // the first two columns (either lon/lat or POINT/coords)
5852  auto const* cd_col0 = *col_itr++;
5853  auto const* cd_col1 = *col_itr++;
5854  auto const* cd_angle =
5855  copy_params.raster_point_compute_angle ? *col_itr++ : nullptr;
5856 
5857  // compute and add x and y
5858  auto proj_timer = timer_start();
5859  for (int y = y_start; y < y_end; y++) {
5860  // get projected pixel coords for this scan-line
5861  auto const coords = raster_importer.getProjectedPixelCoords(thread_idx, y);
5862 
5863  // add to buffers
5864  for (int x = 0; x < band_size_x; x++) {
5865  // this point and angle
5866  auto const& [dx, dy, angle] = coords[x];
5867 
5868  // store the point
5869  switch (point_sql_type) {
5870  case kPOINT: {
5871  // add empty value to POINT buffer
5872  TDatum td_point;
5873  import_buffers[0]->add_value(cd_col0, td_point, false);
5874 
5875  // convert lon/lat to bytes (compressed or not) and add to POINT coords
5876  // buffer
5877  auto const compressed_coords =
5878  Geospatial::compress_coords({dx, dy}, cd_col0->columnType);
5879  std::vector<TDatum> td_coords_data;
5880  for (auto const& cc : compressed_coords) {
5881  TDatum td_byte;
5882  td_byte.val.int_val = cc;
5883  td_coords_data.push_back(td_byte);
5884  }
5885  TDatum td_coords;
5886  td_coords.val.arr_val = td_coords_data;
5887  td_coords.is_null = false;
5888  import_buffers[1]->add_value(cd_col1, td_coords, false);
5889  } break;
5890  case kFLOAT:
5891  case kDOUBLE: {
5892  TDatum td;
5893  td.is_null = false;
5894  td.val.real_val = dx;
5895  import_buffers[0]->add_value(cd_col0, td, false);
5896  td.val.real_val = dy;
5897  import_buffers[1]->add_value(cd_col1, td, false);
5898  } break;
5899  case kSMALLINT:
5900  case kINT: {
5901  TDatum td;
5902  td.is_null = false;
5903  td.val.int_val = static_cast<int64_t>(x);
5904  import_buffers[0]->add_value(cd_col0, td, false);
5905  td.val.int_val = static_cast<int64_t>(y);
5906  import_buffers[1]->add_value(cd_col1, td, false);
5907  } break;
5908  default:
5909  CHECK(false);
5910  }
5911 
5912  // angle?
5914  CHECK(cd_angle);
5915  TDatum td;
5916  td.is_null = false;
5917  td.val.real_val = static_cast<double>(angle);
5918  import_buffers[2]->add_value(cd_angle, td, false);
5919  }
5920  }
5921  }
5922  proj_s = TIMER_STOP(proj_timer);
5923  col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5924  }
5925 
5926  // prepare to accumulate read and conv times
5927  float read_s{0.0f};
5928  float conv_s{0.0f};
5929 
5930  // y_end is one past the actual end, so don't add 1
5931  auto const num_rows = y_end - y_start;
5932  auto const num_elems = band_size_x * num_rows;
5933 
5934  // for each band/column
5935  for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5936  // the corresponding column
5937  auto const* cd_band = *col_itr;
5938  CHECK(cd_band);
5939 
5940  // data type to read as
5941  auto const cd_type = cd_band->columnType.get_type();
5942 
5943  // read the scanlines (will do a data type conversion if necessary)
5944  auto read_timer = timer_start();
5945  raster_importer.getRawPixels(
5946  thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5947  read_s += TIMER_STOP(read_timer);
5948 
5949  // null value?
5950  auto const [null_value, null_value_valid] =
5951  raster_importer.getBandNullValue(band_idx);
5952 
5953  // copy to this thread's import buffers
5954  // convert any nulls we find
5955  auto conv_timer = timer_start();
5956  TDatum td;
5957  switch (cd_type) {
5958  case kSMALLINT: {
5959  const int16_t* values =
5960  reinterpret_cast<const int16_t*>(raw_pixel_bytes.data());
5961  for (int idx = 0; idx < num_elems; idx++) {
5962  auto const& value = values[idx];
5963  if (null_value_valid && value == static_cast<int16_t>(null_value)) {
5964  td.is_null = true;
5965  td.val.int_val = NULL_SMALLINT;
5966  } else {
5967  td.is_null = false;
5968  td.val.int_val = static_cast<int64_t>(value);
5969  }
5970  import_buffers[col_idx]->add_value(cd_band, td, false);
5971  }
5972  } break;
5973  case kINT: {
5974  const int32_t* values =
5975  reinterpret_cast<const int32_t*>(raw_pixel_bytes.data());
5976  for (int idx = 0; idx < num_elems; idx++) {
5977  auto const& value = values[idx];
5978  if (null_value_valid && value == static_cast<int32_t>(null_value)) {
5979  td.is_null = true;
5980  td.val.int_val = NULL_INT;
5981  } else {
5982  td.is_null = false;
5983  td.val.int_val = static_cast<int64_t>(value);
5984  }
5985  import_buffers[col_idx]->add_value(cd_band, td, false);
5986  }
5987  } break;
5988  case kBIGINT: {
5989  const uint32_t* values =
5990  reinterpret_cast<const uint32_t*>(raw_pixel_bytes.data());
5991  for (int idx = 0; idx < num_elems; idx++) {
5992  auto const& value = values[idx];
5993  if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
5994  td.is_null = true;
5995  td.val.int_val = NULL_INT;
5996  } else {
5997  td.is_null = false;
5998  td.val.int_val = static_cast<int64_t>(value);
5999  }
6000  import_buffers[col_idx]->add_value(cd_band, td, false);
6001  }
6002  } break;
6003  case kFLOAT: {
6004  const float* values = reinterpret_cast<const float*>(raw_pixel_bytes.data());
6005  for (int idx = 0; idx < num_elems; idx++) {
6006  auto const& value = values[idx];
6007  if (null_value_valid && value == static_cast<float>(null_value)) {
6008  td.is_null = true;
6009  td.val.real_val = NULL_FLOAT;
6010  } else {
6011  td.is_null = false;
6012  td.val.real_val = static_cast<double>(value);
6013  }
6014  import_buffers[col_idx]->add_value(cd_band, td, false);
6015  }
6016  } break;
6017  case kDOUBLE: {
6018  const double* values = reinterpret_cast<const double*>(raw_pixel_bytes.data());
6019  for (int idx = 0; idx < num_elems; idx++) {
6020  auto const& value = values[idx];
6021  if (null_value_valid && value == null_value) {
6022  td.is_null = true;
6023  td.val.real_val = NULL_DOUBLE;
6024  } else {
6025  td.is_null = false;
6026  td.val.real_val = value;
6027  }
6028  import_buffers[col_idx]->add_value(cd_band, td, false);
6029  }
6030  } break;
6031  default:
6032  CHECK(false);
6033  }
6034  conv_s += TIMER_STOP(conv_timer);
6035 
6036  // next column
6037  col_idx++;
6038  col_itr++;
6039  }
6040 
6041  // metadata columns?
6042  for (auto const& mci : metadata_column_infos) {
6043  auto const* cd_band = *col_itr++;
6044  CHECK(cd_band);
6045  for (int i = 0; i < num_elems; i++) {
6046  import_buffers[col_idx]->add_value(cd_band, mci.value, false, copy_params);
6047  }
6048  col_idx++;
6049  }
6050 
6051  // build status
6052  ImportStatus thread_import_status;
6053  thread_import_status.rows_estimated = num_elems;
6054  thread_import_status.rows_completed = num_elems;
6055 
6056  // done
6057  return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
6058  };
6059 
6060  // prepare to checkpoint the table
6061  auto table_epochs = loader->getTableEpochs();
6062 
6063  // start wall clock
6064  auto wall_timer = timer_start();
6065 
6066  // start the import
6067  raster_importer.import(max_threads);
6068 
6069  // time the phases
6070  float total_proj_s{0.0f};
6071  float total_read_s{0.0f};
6072  float total_conv_s{0.0f};
6073  float total_load_s{0.0f};
6074 
6075  const int min_scanlines_per_thread = 8;
6076  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;
6077  for (int block_y = 0; block_y < band_size_y;
6078  block_y += (max_threads * max_scanlines_per_thread)) {
6079  using Future = std::future<ThreadReturn>;
6080  std::vector<Future> futures;
6081  const int scanlines_in_block =
6082  std::min(band_size_y - block_y, max_scanlines_per_block);
6083  const int pixels_in_block = scanlines_in_block * band_size_x;
6084  const int block_max_scanlines_per_thread =
6085  std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6086  static_cast<int>(max_threads),
6087  min_scanlines_per_thread);
6088  VLOG(1) << "Raster Importer: scanlines_in_block: " << scanlines_in_block
6089  << ", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6090 
6091  std::vector<size_t> rows_per_thread;
6092  auto block_wall_timer = timer_start();
6093  // run max_threads scanlines at once
6094  for (size_t thread_id = 0; thread_id < max_threads; thread_id++) {
6095  const int y_start = block_y + thread_id * block_max_scanlines_per_thread;
6096  if (y_start < band_size_y) {
6097  const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6098  if (y_start < y_end) {
6099  rows_per_thread.emplace_back((y_end - y_start) * band_size_x);
6100  futures.emplace_back(
6101  std::async(std::launch::async, import_rows, thread_id, y_start, y_end));
6102  }
6103  }
6104  }
6105 
6106  // wait for the threads to finish and
6107  // accumulate the results and times
6108  float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6109  size_t thread_idx = 0;
6110  size_t active_threads = 0;
6111  for (auto& future : futures) {
6112  auto const [import_status, times] = future.get();
6113  import_status_ += import_status;
6114  proj_s += times[0];
6115  read_s += times[1];
6116  conv_s += times[2];
6117  // We load the data in thread order so we can get deterministic row-major raster
6118  // ordering
6119  auto thread_load_timer = timer_start();
6120  // Todo: We should consider invoking the load on another thread in a ping-pong
6121  // fashion so we can simultaneously read the next batch of data
6122  load(import_buffers_vec[thread_idx], rows_per_thread[thread_idx], session_info);
6123  ++thread_idx;
6124  ++active_threads;
6125 
6126  load_s += TIMER_STOP(thread_load_timer);
6127  }
6128 
6129  // average times over all threads (except for load which is single-threaded)
6130  total_proj_s += (proj_s / float(active_threads));
6131  total_read_s += (read_s / float(active_threads));
6132  total_conv_s += (conv_s / float(active_threads));
6133  total_load_s += load_s;
6134 
6135  // update the status
6137 
6138  // more debug
6139  auto const block_wall_s = TIMER_STOP(block_wall_timer);
6140  auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6141  auto const rows_per_second = pixels_in_block / block_wall_s;
6142  LOG(INFO) << "Raster Importer: Loaded " << scanlines_in_block
6143  << " scanlines starting at " << block_y << " out of " << band_size_y
6144  << " in " << block_wall_s << "s at " << scanlines_per_second
6145  << " scanlines/s and " << rows_per_second << " rows/s";
6146 
6147  // check for interrupt
6148  if (UNLIKELY(check_session_interrupted(query_session, executor.get()))) {
6149  import_status_.load_failed = true;
6150  import_status_.load_msg = "Raster Import interrupted";
6152  }
6153  }
6154 
6155  // checkpoint
6156  auto checkpoint_timer = timer_start();
6157  checkpoint(table_epochs);
6158  auto const checkpoint_s = TIMER_STOP(checkpoint_timer);
6159 
6160  // stop wall clock
6161  auto const total_wall_s = TIMER_STOP(wall_timer);
6162 
6163  // report
6164  auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6165  auto const total_rows_per_second =
6166  float(band_size_x) * float(band_size_y) / total_wall_s;
6167  LOG(INFO) << "Raster Importer: Imported "
6168  << static_cast<uint64_t>(band_size_x) * static_cast<uint64_t>(band_size_y)
6169  << " rows";
6170  LOG(INFO) << "Raster Importer: Total Import Time " << total_wall_s << "s at "
6171  << total_scanlines_per_second << " scanlines/s and " << total_rows_per_second
6172  << " rows/s";
6173 
6174  // phase times (with proportions)
6175  auto proj_pct = float(int(total_proj_s / total_wall_s * 1000.0f) * 0.1f);
6176  auto read_pct = float(int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6177  auto conv_pct = float(int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6178  auto load_pct = float(int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6179  auto cpnt_pct = float(int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6180 
6181  VLOG(1) << "Raster Importer: Import timing breakdown:";
6182  VLOG(1) << " Project " << total_proj_s << "s (" << proj_pct << "%)";
6183  VLOG(1) << " Read " << total_read_s << "s (" << read_pct << "%)";
6184  VLOG(1) << " Convert " << total_conv_s << "s (" << conv_pct << "%)";
6185  VLOG(1) << " Load " << total_load_s << "s (" << load_pct << "%)";
6186  VLOG(1) << " Checkpoint " << checkpoint_s << "s (" << cpnt_pct << "%)";
6187 
6188  // all done
6189  return import_status_;
6190 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t raster_scanlines_per_thread
Definition: CopyParams.h:89
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: Importer.cpp:125
#define NULL_DOUBLE
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1346
#define NULL_FLOAT
#define LOG(tag)
Definition: Logger.h:216
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3602
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4874
const size_t num_import_threads(const int copy_params_threads)
Definition: Importer.cpp:134
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string raster_import_dimensions
Definition: CopyParams.h:92
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
#define NULL_INT
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:468
std::string add_metadata_columns
Definition: CopyParams.h:93
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
#define TIMER_STOP(t)
Definition: Importer.cpp:101
future< Result > async(Fn &&fn, Args &&...args)
RasterPointType raster_point_type
Definition: CopyParams.h:87
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:885
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1448
#define UNLIKELY(x)
Definition: likely.h:25
std::string get_session_id() const
Definition: SessionInfo.h:76
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:241
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4896
std::string import_id
Definition: Importer.h:881
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3612
ThreadId thread_id()
Definition: Logger.cpp:820
std::string raster_import_bands
Definition: CopyParams.h:88
#define CHECK(condition)
Definition: Logger.h:222
#define NULL_SMALLINT
Definition: sqltypes.h:45
RasterPointTransform raster_point_transform
Definition: CopyParams.h:90
#define VLOG(n)
Definition: Logger.h:316
Type timer_start()
Definition: measure.h:42
static constexpr int kMaxRasterScanlinesPerThread
Definition: Importer.cpp:114
std::unique_ptr< Loader > loader
Definition: Importer.h:886
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:706

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3602 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), import_export::import_thread_shapefile(), and importGDALRaster().

3604  {
3605  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3607  import_status_.load_failed = true;
3608  import_status_.load_msg = loader->getErrorMessage();
3609  }
3610 }
std::lock_guard< T > lock_guard
heavyai::unique_lock< heavyai::shared_mutex > write_lock
heavyai::shared_mutex import_mutex_
Definition: Importer.h:709
std::unique_ptr< Loader > loader
Definition: Importer.h:886

+ Here is the caller graph for this function:

Geospatial::GDAL::DataSourceUqPtr import_export::Importer::openGDALDataSource ( const std::string &  fileName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4676 of file Importer.cpp.

References Geospatial::GDAL::init(), import_export::kGeoFile, Geospatial::GDAL::openDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), import_export::CopyParams::source_type, and to_string().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptorsGeo(), importGDALGeo(), and readMetadataSampleGDAL().

4678  {
4686  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4687  std::to_string(static_cast<int>(copy_params.source_type)) +
4688  ")");
4689  }
4691 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::string to_string(char const *&&v)
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string s3_session_token
Definition: CopyParams.h:63
static DataSourceUqPtr openDataSource(const std::string &name, const import_export::SourceType source_type)
Definition: GDAL.cpp:181
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4718 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), openGDALDataSource(), and import_export::parse_add_metadata_columns().

Referenced by DBHandler::detect_column_types().

4723  {
4725  openGDALDataSource(file_name, copy_params));
4726  if (datasource == nullptr) {
4727  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4728  file_name);
4729  }
4730 
4731  OGRLayer& layer =
4732  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4733 
4734  auto const* feature_defn = layer.GetLayerDefn();
4735  CHECK(feature_defn);
4736 
4737  // metadata columns?
4738  auto const metadata_column_infos =
4740 
4741  // get limited feature count
4742  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4743  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4744  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4745 
4746  // prepare sample data map
4747  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4748  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4749  CHECK(column_name);
4750  sample_data[column_name] = {};
4751  }
4752  sample_data[geo_column_name] = {};
4753  for (auto const& mci : metadata_column_infos) {
4754  sample_data[mci.column_descriptor.columnName] = {};
4755  }
4756 
4757  // prepare to read
4758  layer.ResetReading();
4759 
4760  // read features (up to limited count)
4761  uint64_t feature_index{0u};
4762  while (feature_index < num_features) {
4763  // get (and take ownership of) feature
4764  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4765  if (!feature) {
4766  break;
4767  }
4768 
4769  // get feature geometry
4770  auto const* geometry = feature->GetGeometryRef();
4771  if (geometry == nullptr) {
4772  break;
4773  }
4774 
4775  // validate geom type (again?)
4776  switch (wkbFlatten(geometry->getGeometryType())) {
4777  case wkbPoint:
4778  case wkbLineString:
4779  case wkbPolygon:
4780  case wkbMultiPolygon:
4781  break;
4782  case wkbMultiPoint:
4783  case wkbMultiLineString:
4784  // supported if geo_explode_collections is specified
4786  throw std::runtime_error("Unsupported geometry type: " +
4787  std::string(geometry->getGeometryName()));
4788  }
4789  break;
4790  default:
4791  throw std::runtime_error("Unsupported geometry type: " +
4792  std::string(geometry->getGeometryName()));
4793  }
4794 
4795  // populate sample data for regular field columns
4796  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4797  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4798  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4799  }
4800 
4801  // populate sample data for metadata columns?
4802  for (auto const& mci : metadata_column_infos) {
4803  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4804  }
4805 
4806  // populate sample data for geo column with WKT string
4807  char* wkts = nullptr;
4808  geometry->exportToWkt(&wkts);
4809  CHECK(wkts);
4810  sample_data[geo_column_name].push_back(wkts);
4811  CPLFree(wkts);
4812 
4813  // next feature
4814  feature_index++;
4815  }
4816 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4695
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:93
std::string geo_layer_name
Definition: CopyParams.h:80
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4676
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const bool  force_null = false 
)
static

Definition at line 1654 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), foreign_storage::TextFileBufferParser::processGeoColumn(), and foreign_storage::TextFileBufferParser::processInvalidGeoColumn().

// Body of Importer::set_geo_physical_import_buffer (single-row variant).
// Appends one value to each physical import buffer backing the geo column `cd`,
// starting at import_buffers[col_idx]; col_idx is advanced once per buffer written.
// Physical column descriptors are looked up by successive column ids after cd->columnId.
// Columns written, in order:
//   coords       - always
//   ring_sizes   - kPOLYGON, kMULTIPOLYGON
//   poly_rings   - kMULTIPOLYGON
//   bounds       - kLINESTRING, kPOLYGON, kMULTIPOLYGON
//   render_group - kPOLYGON, kMULTIPOLYGON
// Side effect: `coords` is a non-const reference and may be cleared/rewritten for a NULL point.
1664  {
1665  const auto col_ti = cd->columnType;
1666  const auto col_type = col_ti.get_type();
1667  auto columnId = cd->columnId;
// Pre-increment: the coords physical column is fetched with id cd->columnId + 1.
1668  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1669  bool is_null_geo = false;
1670  bool is_null_point = false;
// Null detection is only attempted when the column type is nullable.
1671  if (!col_ti.get_notnull()) {
1672  // Check for NULL geo
// A POINT counts as NULL when it has no coords or its first coord is NULL_ARRAY_DOUBLE.
1673  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1674  is_null_point = true;
1675  coords.clear();
1676  }
// For every geo type, empty coords means a NULL geo.
1677  is_null_geo = coords.empty();
1678  if (is_null_point) {
// Replace the coords with the pair {NULL_ARRAY_DOUBLE, NULL_DOUBLE} so the NULL point
// still goes through compress_coords below.
1679  coords.push_back(NULL_ARRAY_DOUBLE);
1680  coords.push_back(NULL_DOUBLE);
1681  // Treating POINT coords as notnull, need to store actual encoding
1682  // [un]compressed+[not]null
1683  is_null_geo = false;
1684  }
1685  }
// force_null overrides whatever was detected above (including the NULL-point case).
1686  if (force_null) {
1687  is_null_geo = true;
1688  }
1689  TDatum tdd_coords;
1690  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1691  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1692  // in a fixlen array, compressed and uncompressed.
1693  if (!is_null_geo) {
1694  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1695  tdd_coords.val.arr_val.reserve(compressed_coords.size());
// Each compressed byte becomes one array element, stored in int_val.
1696  for (auto cc : compressed_coords) {
1697  tdd_coords.val.arr_val.emplace_back();
1698  tdd_coords.val.arr_val.back().val.int_val = cc;
1699  }
1700  }
1701  tdd_coords.is_null = is_null_geo;
1702  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1703 
1704  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1705  // Create ring_sizes array value and add it to the physical column
1706  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1707  TDatum tdd_ring_sizes;
// Capacity is reserved even for a NULL geo; elements are only added when not NULL.
1708  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1709  if (!is_null_geo) {
1710  for (auto ring_size : ring_sizes) {
1711  tdd_ring_sizes.val.arr_val.emplace_back();
1712  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1713  }
1714  }
1715  tdd_ring_sizes.is_null = is_null_geo;
1716  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1717  }
1718 
1719  if (col_type == kMULTIPOLYGON) {
1720  // Create poly_rings array value and add it to the physical column
1721  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1722  TDatum tdd_poly_rings;
1723  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1724  if (!is_null_geo) {
1725  for (auto num_rings : poly_rings) {
1726  tdd_poly_rings.val.arr_val.emplace_back();
1727  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1728  }
1729  }
1730  tdd_poly_rings.is_null = is_null_geo;
1731  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1732  }
1733 
1734  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
// Bounds array value: stored as doubles (real_val), unlike the integer arrays above.
1735  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1736  TDatum tdd_bounds;
1737  tdd_bounds.val.arr_val.reserve(bounds.size());
1738  if (!is_null_geo) {
1739  for (auto b : bounds) {
1740  tdd_bounds.val.arr_val.emplace_back();
1741  tdd_bounds.val.arr_val.back().val.real_val = b;
1742  }
1743  }
1744  tdd_bounds.is_null = is_null_geo;
1745  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1746  }
1747 
1748  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1749  // Create render_group value and add it to the physical column
1750  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1751  TDatum td_render_group;
// The render_group value is always assigned; is_null follows the geo's null state.
1752  td_render_group.val.int_val = render_group;
1753  td_render_group.is_null = is_null_geo;
1754  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1755  }
1756 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
std::vector< int > &  render_groups_column 
)
static

Definition at line 1758 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

// Body of Importer::set_geo_physical_import_buffer_columnar (multi-row variant).
// For the geo column `cd`, appends every row of each input column to the matching
// physical import buffer, one physical column at a time. col_idx is advanced once
// per physical column (after all of its rows are added), not once per row.
// Physical columns written, in order: coords (always); ring_sizes (kPOLYGON,
// kMULTIPOLYGON); poly_rings (kMULTIPOLYGON); bounds (kLINESTRING, kPOLYGON,
// kMULTIPOLYGON); render_group (kPOLYGON, kMULTIPOLYGON).
// Side effect: entries of coords_column may be cleared/rewritten for NULL points.
// NOTE(review): each physical column derives its own is_null flag from its own input
// row (emptiness / sentinel); the flag computed for coords is not reused.
1767  {
1768  const auto col_ti = cd->columnType;
1769  const auto col_type = col_ti.get_type();
1770  auto columnId = cd->columnId;
1771 
// The coords row count is the reference size the other input columns are checked against.
1772  auto coords_row_count = coords_column.size();
1773  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1774  for (auto& coords : coords_column) {
1775  bool is_null_geo = false;
1776  bool is_null_point = false;
1777  if (!col_ti.get_notnull()) {
1778  // Check for NULL geo
// A POINT counts as NULL when it has no coords or its first coord is NULL_ARRAY_DOUBLE.
1779  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1780  is_null_point = true;
1781  coords.clear();
1782  }
1783  is_null_geo = coords.empty();
1784  if (is_null_point) {
// Replace the row with {NULL_ARRAY_DOUBLE, NULL_DOUBLE} so it is still compressed below.
1785  coords.push_back(NULL_ARRAY_DOUBLE);
1786  coords.push_back(NULL_DOUBLE);
1787  // Treating POINT coords as notnull, need to store actual encoding
1788  // [un]compressed+[not]null
1789  is_null_geo = false;
1790  }
1791  }
1792  std::vector<TDatum> td_coords_data;
1793  if (!is_null_geo) {
1794  std::vector<uint8_t> compressed_coords =
1795  Geospatial::compress_coords(coords, col_ti);
// Each compressed byte becomes one TDatum with the byte in int_val.
1796  for (auto const& cc : compressed_coords) {
1797  TDatum td_byte;
1798  td_byte.val.int_val = cc;
1799  td_coords_data.push_back(td_byte);
1800  }
1801  }
1802  TDatum tdd_coords;
1803  tdd_coords.val.arr_val = td_coords_data;
1804  tdd_coords.is_null = is_null_geo;
1805  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1806  }
1807  col_idx++;
1808 
1809  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
// Row-count mismatch against coords trips CHECK(false) (presumably fatal -- see Logger.h).
1810  if (ring_sizes_column.size() != coords_row_count) {
1811  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1812  }
1813  // Create ring_sizes array value and add it to the physical column
1814  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1815  for (auto const& ring_sizes : ring_sizes_column) {
1816  bool is_null_geo = false;
1817  if (!col_ti.get_notnull()) {
1818  // Check for NULL geo
1819  is_null_geo = ring_sizes.empty();
1820  }
1821  std::vector<TDatum> td_ring_sizes;
1822  for (auto const& ring_size : ring_sizes) {
1823  TDatum td_ring_size;
1824  td_ring_size.val.int_val = ring_size;
1825  td_ring_sizes.push_back(td_ring_size);
1826  }
1827  TDatum tdd_ring_sizes;
1828  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1829  tdd_ring_sizes.is_null = is_null_geo;
1830  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1831  }
1832  col_idx++;
1833  }
1834 
1835  if (col_type == kMULTIPOLYGON) {
1836  if (poly_rings_column.size() != coords_row_count) {
1837  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1838  }
1839  // Create poly_rings array value and add it to the physical column
1840  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1841  for (auto const& poly_rings : poly_rings_column) {
1842  bool is_null_geo = false;
1843  if (!col_ti.get_notnull()) {
1844  // Check for NULL geo
1845  is_null_geo = poly_rings.empty();
1846  }
1847  std::vector<TDatum> td_poly_rings;
1848  for (auto const& num_rings : poly_rings) {
1849  TDatum td_num_rings;
1850  td_num_rings.val.int_val = num_rings;
1851  td_poly_rings.push_back(td_num_rings);
1852  }
1853  TDatum tdd_poly_rings;
1854  tdd_poly_rings.val.arr_val = td_poly_rings;
1855  tdd_poly_rings.is_null = is_null_geo;
1856  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1857  }
1858  col_idx++;
1859  }
1860 
1861  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1862  if (bounds_column.size() != coords_row_count) {
1863  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1864  }
1865  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1866  for (auto const& bounds : bounds_column) {
1867  bool is_null_geo = false;
1868  if (!col_ti.get_notnull()) {
1869  // Check for NULL geo
// Bounds are NULL when empty or when the first value is the NULL_ARRAY_DOUBLE sentinel.
1870  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1871  }
// Values are copied even when the row is flagged NULL (sentinel values included).
1872  std::vector<TDatum> td_bounds_data;
1873  for (auto const& b : bounds) {
1874  TDatum td_double;
1875  td_double.val.real_val = b;
1876  td_bounds_data.push_back(td_double);
1877  }
1878  TDatum tdd_bounds;
1879  tdd_bounds.val.arr_val = td_bounds_data;
1880  tdd_bounds.is_null = is_null_geo;
1881  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1882  }
1883  col_idx++;
1884  }
1885 
1886  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1887  // Create render_group value and add it to the physical column
// NOTE(review): unlike the ring_sizes/poly_rings/bounds columns, render_groups_column's
// size is not compared with coords_row_count here, and is_null is always false.
1888  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1889  for (auto const& render_group : render_groups_column) {
1890  TDatum td_render_group;
1891  td_render_group.val.int_val = render_group;
1892  td_render_group.is_null = false;
1893  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1894  }
1895  col_idx++;
1896  }
1897 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:222
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 241 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data(), importDelimited(), importGDALGeo(), importGDALRaster(), and import_export::ForeignDataImporter::importGeneralS3().

// Body of Importer::set_import_status as rendered in this listing.
// Stamps is.end with the current steady_clock time and sets is.elapsed to
// (is.end - is.start) converted to milliseconds.
// NOTE(review): source lines 242 and 245 are absent from this listing. The symbol
// cross-references that follow it (lock_guard, write_lock, status_mutex,
// import_status_map, import_id) suggest those lines take a lock on status_mutex and
// store the status in import_status_map under the import id -- confirm against
// Importer.cpp.
241  {
243  is.end = std::chrono::steady_clock::now();
244  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
246 }
std::lock_guard< T > lock_guard
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:175
heavyai::unique_lock< heavyai::shared_mutex > write_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:174
std::string import_id
Definition: Importer.h:881

+ Here is the caller graph for this function:

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 884 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 882 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private
std::mutex import_export::Importer::init_gdal_mutex
static private

Definition at line 888 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 887 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private
size_t import_export::Importer::max_threads
private

Definition at line 883 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDALGeo(), and importGDALRaster().


The documentation for this class was generated from the following files: