OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Importer:
+ Collaboration diagram for import_export::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
 

Private Member Functions

ImportStatus importGDALGeo (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGDALRaster (const Catalog_Namespace::SessionInfo *session_info)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static
Geospatial::GDAL::DataSourceUqPtr 
openGDALDataSource (const std::string &fileName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsGeo (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsRaster (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 778 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 831 of file Importer.h.

831 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
const std::string &  f,
const CopyParams & p
)

Definition at line 171 of file Importer.cpp.

175  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:171
constexpr double f
Definition: Utm.h:31
import_export::Importer::Importer ( Loader * providedLoader,
const std::string &  f,
const CopyParams & p
)

Definition at line 177 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

178  : DataStreamSink(p, f), loader(providedLoader) {
179  import_id = boost::filesystem::path(file_path).filename().string();
180  file_size = 0;
181  max_threads = 0;
182  p_file = nullptr;
183  buffer[0] = nullptr;
184  buffer[1] = nullptr;
185  // we may be overallocating a little more memory here due to dropping phy cols.
186  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
187  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
188  int i = 0;
189  bool has_array = false;
190  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
191  int skip_physical_cols = 0;
192  for (auto& p : loader->get_column_descs()) {
193  // phy geo columns can't be in input file
194  if (skip_physical_cols-- > 0) {
195  continue;
196  }
197  // neither are rowid or $deleted$
198  // note: columns can be added after rowid/$deleted$
199  if (p->isVirtualCol || p->isDeletedCol) {
200  continue;
201  }
202  skip_physical_cols = p->columnType.get_physical_cols();
203  if (p->columnType.get_type() == kARRAY) {
204  is_array.get()[i] = true;
205  has_array = true;
206  } else {
207  is_array.get()[i] = false;
208  }
209  ++i;
210  }
211  if (has_array) {
212  is_array_a = std::unique_ptr<bool[]>(is_array.release());
213  } else {
214  is_array_a = std::unique_ptr<bool[]>(nullptr);
215  }
216 }
constexpr double f
Definition: Utm.h:31
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:893
std::string import_id
Definition: Importer.h:887
std::unique_ptr< Loader > loader
Definition: Importer.h:892
const std::string file_path
Definition: Importer.h:712
import_export::Importer::~Importer ( )
override

Definition at line 218 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

218  {
219  if (p_file != nullptr) {
220  fclose(p_file);
221  }
222  if (buffer[0] != nullptr) {
223  free(buffer[0]);
224  }
225  if (buffer[1] != nullptr) {
226  free(buffer[1]);
227  }
228 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3602 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), importGDALGeo(), and importGDALRaster().

3603  {
3604  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3607  // rollback to starting epoch - undo all the added records
3608  loader->setTableEpochs(table_epochs);
3609  } else {
3610  loader->checkpoint();
3611  }
3612  }
3613 
3614  if (loader->getTableDesc()->persistenceLevel ==
3615  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3616  // tables
3617  auto ms = measure<>::execution([&]() {
3619  if (!import_status_.load_failed) {
3620  for (auto& p : import_buffers_vec[0]) {
3621  if (!p->stringDictCheckpoint()) {
3622  LOG(ERROR) << "Checkpointing Dictionary for Column "
3623  << p->getColumnDesc()->columnName << " failed.";
3624  import_status_.load_failed = true;
3625  import_status_.load_msg = "Dictionary checkpoint failed";
3626  break;
3627  }
3628  }
3629  }
3630  });
3631  if (DEBUG_TIMING) {
3632  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3633  << std::endl;
3634  }
3635  }
3636 }
std::lock_guard< T > lock_guard
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:283
heavyai::unique_lock< heavyai::shared_mutex > write_lock
#define DEBUG_TIMING
Definition: Importer.cpp:157
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
static constexpr char const * FOREIGN_TABLE
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams & copy_params
)
static

Definition at line 5153 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5153  {
5154  return gdalStatInternal(path, copy_params, false);
5155 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5118

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams & copy_params
)
static

Definition at line 5158 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5159  {
5160  return gdalStatInternal(path, copy_params, true);
5161 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5118

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams & copy_params
)
static

Definition at line 5230 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5232  {
5233  // lazy init GDAL
5240 
5241  // prepare to gather files
5242  std::vector<std::string> files;
5243 
5244  // gather the files recursively
5245  gdalGatherFilesInArchiveRecursive(archive_path, files);
5246 
5247  // convert to relative paths inside archive
5248  for (auto& file : files) {
5249  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5250  }
5251 
5252  // done
5253  return files;
5254 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:5163
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams & copy_params
)
static

Definition at line 5257 of file Importer.cpp.

References CHECK, EMPTY, GEO, Geospatial::GDAL::init(), NON_GEO, openGDALDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5259  {
5260  // lazy init GDAL
5267 
5268  // prepare to gather layer info
5269  std::vector<GeoFileLayerInfo> layer_info;
5270 
5271  // open the data set
5273  if (poDS == nullptr) {
5274  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5275  file_name);
5276  }
5277 
5278  // enumerate the layers
5279  for (auto&& poLayer : poDS->GetLayers()) {
5281  // prepare to read this layer
5282  poLayer->ResetReading();
5283  // skip layer if empty
5284  if (poLayer->GetFeatureCount() > 0) {
5285  // first read layer geo type
5286  auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5287  if (ogr_type == wkbUnknown) {
5288  // layer geo type unknown, so try reading from the first feature
5289  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5290  CHECK(first_feature);
5291  auto const* ogr_geometry = first_feature->GetGeometryRef();
5292  if (ogr_geometry) {
5293  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5294  } else {
5295  ogr_type = wkbNone;
5296  }
5297  }
5298  switch (ogr_type) {
5299  case wkbNone:
5300  // no geo
5301  contents = GeoFileLayerContents::NON_GEO;
5302  break;
5303  case wkbPoint:
5304  case wkbMultiPoint:
5305  case wkbLineString:
5306  case wkbMultiLineString:
5307  case wkbPolygon:
5308  case wkbMultiPolygon:
5309  // layer has supported geo
5310  contents = GeoFileLayerContents::GEO;
5311  break;
5312  default:
5313  // layer has unsupported geometry
5315  break;
5316  }
5317  }
5318  // store info for this layer
5319  layer_info.emplace_back(poLayer->GetName(), contents);
5320  }
5321 
5322  // done
5323  return layer_info;
5324 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4685
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:289
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams & copy_params,
bool  also_dir 
)
static private

Definition at line 5118 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

5120  {
5121  // lazy init GDAL
5128 
5129 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5130  // clear GDAL stat cache
5131  // without this, file existence will be cached, even if authentication changes
5132  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5133  VSICurlClearCache();
5134 #endif
5135 
5136  // stat path
5137  VSIStatBufL sb;
5138  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5139  if (result < 0) {
5140  return false;
5141  }
5142 
5143  // exists?
5144  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5145  return true;
5146  } else if (VSI_ISREG(sb.st_mode)) {
5147  return true;
5148  }
5149  return false;
5150 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const bool  is_raster,
const std::string &  geoColumnName,
const CopyParams & copy_params
)
static

Definition at line 4922 of file Importer.cpp.

References gdalToColumnDescriptorsGeo(), and gdalToColumnDescriptorsRaster().

Referenced by DBHandler::detect_column_types().

4926  {
4927  if (is_raster) {
4928  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4929  }
4930  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4931 }
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:5006
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4934

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsGeo ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams & copy_params
)
static private

Definition at line 5006 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, import_export::anonymous_namespace{Importer.cpp}::ogr_to_type(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

5009  {
5010  std::list<ColumnDescriptor> cds;
5011 
5013  if (poDS == nullptr) {
5014  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5015  file_name);
5016  }
5017  if (poDS->GetLayerCount() == 0) {
5018  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5019  " has no layers");
5020  }
5021 
5022  OGRLayer& layer =
5024 
5025  layer.ResetReading();
5026  // TODO(andrewseidl): support multiple features
5027  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5028  if (poFeature == nullptr) {
5029  throw std::runtime_error("No features found in " + file_name);
5030  }
5031  // get fields as regular columns
5032  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5033  CHECK(poFDefn);
5034  int iField;
5035  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5036  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5037  auto typePair = ogr_to_type(poFieldDefn->GetType());
5038  ColumnDescriptor cd;
5039  cd.columnName = poFieldDefn->GetNameRef();
5040  cd.sourceName = poFieldDefn->GetNameRef();
5041  SQLTypeInfo ti;
5042  if (typePair.second) {
5043  ti.set_type(kARRAY);
5044  ti.set_subtype(typePair.first);
5045  } else {
5046  ti.set_type(typePair.first);
5047  }
5048  if (typePair.first == kTEXT) {
5050  ti.set_comp_param(32);
5051  }
5052  ti.set_fixed_size();
5053  cd.columnType = ti;
5054  cds.push_back(cd);
5055  }
5056  // try getting the geo column type from the layer
5057  auto ogr_type = wkbFlatten(layer.GetGeomType());
5058  if (ogr_type == wkbUnknown) {
5059  // layer geo type unknown, so try reading from the first feature
5060  Geospatial::GDAL::FeatureUqPtr first_feature(layer.GetNextFeature());
5061  CHECK(first_feature);
5062  auto const* ogr_geometry = first_feature->GetGeometryRef();
5063  if (ogr_geometry) {
5064  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5065  } else {
5066  ogr_type = wkbNone;
5067  }
5068  }
5069  // do we have a geo column?
5070  if (ogr_type != wkbNone) {
5071  ColumnDescriptor cd;
5072  cd.columnName = geo_column_name;
5073  cd.sourceName = geo_column_name;
5074 
5075  // if exploding, override any collection type to child type
5077  if (ogr_type == wkbMultiPolygon) {
5078  ogr_type = wkbPolygon;
5079  } else if (ogr_type == wkbMultiLineString) {
5080  ogr_type = wkbLineString;
5081  } else if (ogr_type == wkbMultiPoint) {
5082  ogr_type = wkbPoint;
5083  }
5084  }
5085 
5086  // convert to internal type
5087  // this will throw if the type is unsupported
5088  SQLTypes geoType = ogr_to_type(ogr_type);
5089 
5090  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5092  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5093  }
5094 
5095  // build full internal type
5096  SQLTypeInfo ti;
5097  ti.set_type(geoType);
5103  cd.columnType = ti;
5104 
5105  cds.push_back(cd);
5106  }
5107 
5108  // metadata columns?
5109  auto metadata_column_infos =
5111  for (auto& mci : metadata_column_infos) {
5112  cds.push_back(std::move(mci.column_descriptor));
5113  }
5114 
5115  return cds;
5116 }
void set_compression(EncodingType c)
Definition: sqltypes.h:501
SQLTypes
Definition: sqltypes.h:53
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:491
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4704
std::string sourceName
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:94
void set_input_srid(int d)
Definition: sqltypes.h:494
void set_fixed_size()
Definition: sqltypes.h:499
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4823
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:163
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:496
void set_comp_param(int p)
Definition: sqltypes.h:502
std::string geo_layer_name
Definition: CopyParams.h:81
Definition: sqltypes.h:67
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4685
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:289
SQLTypeInfo columnType
std::string columnName
EncodingType geo_coords_encoding
Definition: CopyParams.h:76
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:490

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsRaster ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams & copy_params
)
static private

Definition at line 4934 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getPointNamesAndSQLTypes(), Geospatial::GDAL::init(), kENCODING_GEOINT, kGEOMETRY, kPOINT, import_export::parse_add_metadata_columns(), import_export::CopyParams::raster_import_bands, import_export::CopyParams::raster_import_dimensions, import_export::CopyParams::raster_point_compute_angle, import_export::CopyParams::raster_point_transform, import_export::CopyParams::raster_point_type, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Geospatial::GDAL::setAuthorizationTokens(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4937  {
4938  // lazy init GDAL
4945 
4946  // prepare for metadata column
4947  auto metadata_column_infos =
4949 
4950  // create a raster importer and do the detect
4951  RasterImporter raster_importer;
4952  raster_importer.detect(
4953  file_name,
4959  false,
4960  metadata_column_infos);
4961 
4962  // prepare to capture column descriptors
4963  std::list<ColumnDescriptor> cds;
4964 
4965  // get the point column info
4966  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4967 
4968  // create the columns for the point in the specified type
4969  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4970  ColumnDescriptor cd;
4971  cd.columnName = cd.sourceName = col_name;
4972  cd.columnType.set_type(sql_type);
4973  // hardwire other POINT attributes for now
4974  if (sql_type == kPOINT) {
4976  cd.columnType.set_input_srid(4326);
4977  cd.columnType.set_output_srid(4326);
4979  cd.columnType.set_comp_param(32);
4980  }
4981  cds.push_back(cd);
4982  }
4983 
4984  // get the names and types for the band column(s)
4985  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4986 
4987  // add column descriptors for each band
4988  for (auto const& [band_name, sql_type] : band_names_and_types) {
4989  ColumnDescriptor cd;
4990  cd.columnName = cd.sourceName = band_name;
4991  cd.columnType.set_type(sql_type);
4993  cds.push_back(cd);
4994  }
4995 
4996  // metadata columns?
4997  for (auto& mci : metadata_column_infos) {
4998  cds.push_back(std::move(mci.column_descriptor));
4999  }
5000 
5001  // return the results
5002  return cds;
5003 }
void set_compression(EncodingType c)
Definition: sqltypes.h:501
std::string s3_secret_key
Definition: CopyParams.h:62
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4881
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:491
static void init()
Definition: GDAL.cpp:67
std::string raster_import_dimensions
Definition: CopyParams.h:93
std::string sourceName
std::string add_metadata_columns
Definition: CopyParams.h:94
void set_input_srid(int d)
Definition: sqltypes.h:494
RasterPointType raster_point_type
Definition: CopyParams.h:88
void set_fixed_size()
Definition: sqltypes.h:499
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:496
void set_comp_param(int p)
Definition: sqltypes.h:502
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4903
std::string s3_session_token
Definition: CopyParams.h:63
std::string raster_import_bands
Definition: CopyParams.h:89
SQLTypeInfo columnType
std::string s3_access_key
Definition: CopyParams.h:61
std::string columnName
RasterPointTransform raster_point_transform
Definition: CopyParams.h:91
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:490

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 795 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

795  {
796  return loader->get_column_descs();
797  }
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 794 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

794 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 804 of file Importer.h.

References import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

804  {
805  return import_buffers_vec[i];
806  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 801 of file Importer.h.

References import_buffers_vec.

801  {
802  return import_buffers_vec;
803  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 230 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

230  {
232  auto it = import_status_map.find(import_id);
233  if (it == import_status_map.end()) {
234  throw std::runtime_error("Import status not found for id: " + import_id);
235  }
236  return it->second;
237 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:166
std::shared_lock< T > shared_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:165
std::string import_id
Definition: Importer.h:887

+ Here is the caller graph for this function:

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 807 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

807 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:893

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 841 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

841 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 864 of file Importer.h.

References loader.

864 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:892
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 4439 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4439  {
4440  return DataStreamSink::archivePlumber(session_info);
4441 }
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3638

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo *  session_info 
)
override virtual

Implements import_export::DataStreamSink.

Definition at line 4443 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), heavyai::fopen(), g_enable_assign_render_groups, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4446  {
4448  auto query_session = session_info ? session_info->get_session_id() : "";
4449 
4450  if (!p_file) {
4451  p_file = fopen(file_path.c_str(), "rb");
4452  }
4453  if (!p_file) {
4454  throw std::runtime_error("failed to open file '" + file_path +
4455  "': " + strerror(errno));
4456  }
4457 
4458  if (!decompressed) {
4459  (void)fseek(p_file, 0, SEEK_END);
4460  file_size = ftell(p_file);
4461  }
4462 
4464  VLOG(1) << "Delimited import # threads: " << max_threads;
4465 
4466  // deal with small files
4467  size_t alloc_size = copy_params.buffer_size;
4468  if (!decompressed && file_size < alloc_size) {
4469  alloc_size = file_size;
4470  }
4471 
4472  for (size_t i = 0; i < max_threads; i++) {
4473  import_buffers_vec.emplace_back();
4474  for (const auto cd : loader->get_column_descs()) {
4475  import_buffers_vec[i].emplace_back(
4476  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4477  }
4478  }
4479 
4480  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4481  size_t current_pos = 0;
4482  size_t end_pos;
4483  size_t begin_pos = 0;
4484 
4485  (void)fseek(p_file, current_pos, SEEK_SET);
4486  size_t size =
4487  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4488 
4489  // make render group analyzers for each poly column
4490  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4493  auto& cat = loader->getCatalog();
4494  auto* td = loader->getTableDesc();
4495  CHECK(td);
4496  auto column_descriptors =
4497  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4498  for (auto const& cd : column_descriptors) {
4499  if (IS_GEO_POLY(cd->columnType.get_type())) {
4500  auto rga = std::make_shared<RenderGroupAnalyzer>();
4501  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4502  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4503  }
4504  }
4505  } else {
4506  fclose(p_file);
4507  throw std::runtime_error(
4508  "Render Group Assignment requested in CopyParams but disabled in Server "
4509  "Config. Set enable_assign_render_groups=true in Server Config to override.");
4510  }
4511  }
4512 
4513  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4514  loader->getTableDesc()->tableId};
4515  auto table_epochs = loader->getTableEpochs();
4517  {
4518  std::list<std::future<ImportStatus>> threads;
4519 
4520  // use a stack to track thread_ids which must not overlap among threads
4521  // because thread_id is used to index import_buffers_vec[]
4522  std::stack<size_t> stack_thread_ids;
4523  for (size_t i = 0; i < max_threads; i++) {
4524  stack_thread_ids.push(i);
4525  }
4526  // added for true row index on error
4527  size_t first_row_index_this_buffer = 0;
4528 
4529  while (size > 0) {
4530  unsigned int num_rows_this_buffer = 0;
4531  CHECK(scratch_buffer);
4532  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4533  scratch_buffer,
4534  size,
4535  copy_params,
4536  first_row_index_this_buffer,
4537  num_rows_this_buffer,
4538  p_file);
4539 
4540  // unput residual
4541  int nresidual = size - end_pos;
4542  std::unique_ptr<char[]> unbuf;
4543  if (nresidual > 0) {
4544  unbuf = std::make_unique<char[]>(nresidual);
4545  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4546  }
4547 
4548  // get a thread_id not in use
4549  auto thread_id = stack_thread_ids.top();
4550  stack_thread_ids.pop();
4551  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4552 
4553  threads.push_back(std::async(std::launch::async,
4555  thread_id,
4556  this,
4557  std::move(scratch_buffer),
4558  begin_pos,
4559  end_pos,
4560  end_pos,
4561  columnIdToRenderGroupAnalyzerMap,
4562  first_row_index_this_buffer,
4563  session_info,
4564  executor));
4565 
4566  first_row_index_this_buffer += num_rows_this_buffer;
4567 
4568  current_pos += end_pos;
4569  scratch_buffer = std::make_unique<char[]>(alloc_size);
4570  CHECK(scratch_buffer);
4571  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4572  size = nresidual +
4573  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4574 
4575  begin_pos = 0;
4576  while (threads.size() > 0) {
4577  int nready = 0;
4578  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4579  it != threads.end();) {
4580  auto& p = *it;
4581  std::chrono::milliseconds span(0);
4582  if (p.wait_for(span) == std::future_status::ready) {
4583  auto ret_import_status = p.get();
4584  {
4586  import_status_ += ret_import_status;
4587  if (ret_import_status.load_failed) {
4589  }
4590  }
4591  // sum up current total file offsets
4592  size_t total_file_offset{0};
4593  if (decompressed) {
4594  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4595  for (const auto file_offset : file_offsets) {
4596  total_file_offset += file_offset;
4597  }
4598  }
4599  // estimate number of rows per current total file offset
4600  if (decompressed ? total_file_offset : current_pos) {
4602  (decompressed ? (float)total_file_size / total_file_offset
4603  : (float)file_size / current_pos) *
4604  import_status_.rows_completed;
4605  }
4606  VLOG(3) << "rows_completed " << import_status_.rows_completed
4607  << ", rows_estimated " << import_status_.rows_estimated
4608  << ", total_file_size " << total_file_size << ", total_file_offset "
4609  << total_file_offset;
4611  // recall thread_id for reuse
4612  stack_thread_ids.push(ret_import_status.thread_id);
4613  threads.erase(it++);
4614  ++nready;
4615  } else {
4616  ++it;
4617  }
4618  }
4619 
4620  if (nready == 0) {
4621  std::this_thread::yield();
4622  }
4623 
4624  // on eof, wait all threads to finish
4625  if (0 == size) {
4626  continue;
4627  }
4628 
4629  // keep reading if any free thread slot
4630  // this is one of the major difference from old threading model !!
4631  if (threads.size() < max_threads) {
4632  break;
4633  }
4636  break;
4637  }
4638  }
4641  import_status_.load_failed = true;
4642  // todo use better message
4643  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4644  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4645  break;
4646  }
4648  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4649  break;
4650  }
4651  }
4652 
4653  // join dangling threads in case of LOG(ERROR) above
4654  for (auto& p : threads) {
4655  p.wait();
4656  }
4657  }
4658 
4659  checkpoint(table_epochs);
4660 
4661  fclose(p_file);
4662  p_file = nullptr;
4663  return import_status_;
4664 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::string cat(Ts &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:2000
heavyai::shared_lock< heavyai::shared_mutex > read_lock
#define LOG(tag)
Definition: Logger.h:283
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
std::string get_session_id() const
Definition: SessionInfo.h:93
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3602
ThreadId thread_id()
Definition: Logger.cpp:870
#define CHECK(condition)
Definition: Logger.h:289
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:154
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
std::vector< size_t > file_offsets
Definition: Importer.h:717
#define VLOG(n)
Definition: Logger.h:383
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712
#define IS_GEO_POLY(T)
Definition: sqltypes.h:303

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importGDAL ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info,
const bool  is_raster 
)

Definition at line 5326 of file Importer.cpp.

References importGDALGeo(), and importGDALRaster().

Referenced by QueryRunner::ImportDriver::importGeoTable().

5329  {
5330  if (is_raster) {
5331  return importGDALRaster(session_info);
5332  }
5333  return importGDALGeo(columnNameToSourceNameMap, session_info);
5334 }
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5336
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5649

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALGeo ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 5336 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, g_enable_assign_render_groups, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by importGDAL().

5338  {
5339  // initial status
5342  if (poDS == nullptr) {
5343  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5344  file_path);
5345  }
5346 
5347  OGRLayer& layer =
5349 
5350  // get the number of features in this layer
5351  size_t numFeatures = layer.GetFeatureCount();
5352 
5353  // build map of metadata field (additional columns) name to index
5354  // use shared_ptr since we need to pass it to the worker
5355  FieldNameToIndexMapType fieldNameToIndexMap;
5356  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5357  CHECK(poFDefn);
5358  size_t numFields = poFDefn->GetFieldCount();
5359  for (size_t iField = 0; iField < numFields; iField++) {
5360  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5361  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5362  }
5363 
5364  // the geographic spatial reference we want to put everything in
5365  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5366  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5367 
5368 #if GDAL_VERSION_MAJOR >= 3
5369  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5370  // this results in X and Y being transposed for angle-based
5371  // coordinate systems. This restores the previous behavior.
5372  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5373 #endif
5374 
5375 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5376  // just one "thread"
5377  max_threads = 1;
5378 #else
5379  // how many threads to use
5381 #endif
5382  VLOG(1) << "GDAL import # threads: " << max_threads;
5383 
5384  // metadata columns?
5385  auto const metadata_column_infos =
5387 
5388  // import geo table is specifically handled in both DBHandler and QueryRunner
5389  // that is separate path against a normal SQL execution
5390  // so we here explicitly enroll the import session to allow interruption
5391  // while importing geo table
5392  auto query_session = session_info ? session_info->get_session_id() : "";
5393  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5395  auto is_session_already_registered = false;
5396  {
5398  executor->getSessionLock());
5399  is_session_already_registered =
5400  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5401  }
5402  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5403  !is_session_already_registered) {
5404  executor->enrollQuerySession(query_session,
5405  "IMPORT_GEO_TABLE",
5406  query_submitted_time,
5408  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5409  }
5410  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5411  // reset the runtime query interrupt status
5412  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5413  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5414  }
5415  };
5416 
5417  // make an import buffer for each thread
5418  CHECK_EQ(import_buffers_vec.size(), 0u);
5420  for (size_t i = 0; i < max_threads; i++) {
5421  for (const auto cd : loader->get_column_descs()) {
5422  import_buffers_vec[i].emplace_back(
5423  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5424  }
5425  }
5426 
5427  // make render group analyzers for each poly column
5428  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5431  auto& cat = loader->getCatalog();
5432  auto* td = loader->getTableDesc();
5433  CHECK(td);
5434  auto column_descriptors =
5435  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5436  for (auto const& cd : column_descriptors) {
5437  if (IS_GEO_POLY(cd->columnType.get_type())) {
5438  auto rga = std::make_shared<RenderGroupAnalyzer>();
5439  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5440  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5441  }
5442  }
5443  } else {
5444  throw std::runtime_error(
5445  "Render Group Assignment requested in CopyParams but disabled in Server "
5446  "Config. Set enable_assign_render_groups=true in Server Config to override.");
5447  }
5448  }
5449 
5450 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5451  // threads
5452  std::list<std::future<ImportStatus>> threads;
5453 
5454  // use a stack to track thread_ids which must not overlap among threads
5455  // because thread_id is used to index import_buffers_vec[]
5456  std::stack<size_t> stack_thread_ids;
5457  for (size_t i = 0; i < max_threads; i++) {
5458  stack_thread_ids.push(i);
5459  }
5460 #endif
5461 
5462  // checkpoint the table
5463  auto table_epochs = loader->getTableEpochs();
5464 
5465  // reset the layer
5466  layer.ResetReading();
5467 
5468  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5469 
5470  // make a features buffer for each thread
5471  std::vector<FeaturePtrVector> features(max_threads);
5472 
5473  // make one of these for each thread, based on the first feature's SR
5474  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5475  max_threads);
5476 
5477  // for each feature...
5478  size_t firstFeatureThisChunk = 0;
5479  while (firstFeatureThisChunk < numFeatures) {
5480  // how many features this chunk
5481  size_t numFeaturesThisChunk =
5482  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5483 
5484 // get a thread_id not in use
5485 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5486  size_t thread_id = 0;
5487 #else
5488  auto thread_id = stack_thread_ids.top();
5489  stack_thread_ids.pop();
5490  CHECK(thread_id < max_threads);
5491 #endif
5492 
5493  // fill features buffer for new thread
5494  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5495  features[thread_id].emplace_back(layer.GetNextFeature());
5496  }
5497 
5498  // construct a coordinate transformation for each thread, if needed
5499  // some features may not have geometry, so look for the first one that does
5500  if (coordinate_transformations[thread_id] == nullptr) {
5501  for (auto const& feature : features[thread_id]) {
5502  auto const* geometry = feature->GetGeometryRef();
5503  if (geometry) {
5504  auto const* geometry_sr = geometry->getSpatialReference();
5505  // if the SR is non-null and non-empty and different from what we want
5506  // we need to make a reusable CoordinateTransformation
5507  if (geometry_sr &&
5508 #if GDAL_VERSION_MAJOR >= 3
5509  !geometry_sr->IsEmpty() &&
5510 #endif
5511  !geometry_sr->IsSame(poGeographicSR.get())) {
5512  // validate the SR before trying to use it
5513  if (geometry_sr->Validate() != OGRERR_NONE) {
5514  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5515  }
5516  // create the OGRCoordinateTransformation that will be used for
5517  // all the features in this chunk
5518  coordinate_transformations[thread_id].reset(
5519  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5520  if (coordinate_transformations[thread_id] == nullptr) {
5521  throw std::runtime_error(
5522  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5523  }
5524  }
5525  // once we find at least one geometry with an SR, we're done
5526  break;
5527  }
5528  }
5529  }
5530 
5531 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5532  // call worker function directly
5533  auto ret_import_status =
5535  this,
5536  coordinate_transformations[thread_id].get(),
5537  std::move(features[thread_id]),
5538  firstFeatureThisChunk,
5539  numFeaturesThisChunk,
5540  fieldNameToIndexMap,
5541  columnNameToSourceNameMap,
5542  columnIdToRenderGroupAnalyzerMap,
5543  session_info,
5544  executor.get(),
5545  metadata_column_infos);
5546  import_status += ret_import_status;
5547  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5548  import_status.rows_completed;
5549  set_import_status(import_id, import_status);
5550 #else
5551  // fire up that thread to import this geometry
5552  threads.push_back(std::async(std::launch::async,
5554  thread_id,
5555  this,
5556  coordinate_transformations[thread_id].get(),
5557  std::move(features[thread_id]),
5558  firstFeatureThisChunk,
5559  numFeaturesThisChunk,
5560  fieldNameToIndexMap,
5561  columnNameToSourceNameMap,
5562  columnIdToRenderGroupAnalyzerMap,
5563  session_info,
5564  executor.get(),
5565  metadata_column_infos));
5566 
5567  // let the threads run
5568  while (threads.size() > 0) {
5569  int nready = 0;
5570  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5571  it != threads.end();) {
5572  auto& p = *it;
5573  std::chrono::milliseconds span(
5574  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5575  if (p.wait_for(span) == std::future_status::ready) {
5576  auto ret_import_status = p.get();
5577  {
5579  import_status_ += ret_import_status;
5581  ((float)firstFeatureThisChunk / (float)numFeatures) *
5585  break;
5586  }
5587  }
5588  // recall thread_id for reuse
5589  stack_thread_ids.push(ret_import_status.thread_id);
5590 
5591  threads.erase(it++);
5592  ++nready;
5593  } else {
5594  ++it;
5595  }
5596  }
5597 
5598  if (nready == 0) {
5599  std::this_thread::yield();
5600  }
5601 
5602  // keep reading if any free thread slot
5603  // this is one of the major difference from old threading model !!
5604  if (threads.size() < max_threads) {
5605  break;
5606  }
5607  }
5608 #endif
5609 
5610  // out of rows?
5611 
5614  import_status_.load_failed = true;
5615  // todo use better message
5616  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5617  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5618  break;
5619  }
5621  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5622  "more details";
5623  break;
5624  }
5625 
5626  firstFeatureThisChunk += numFeaturesThisChunk;
5627  }
5628 
5629 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5630  // wait for any remaining threads
5631  if (threads.size()) {
5632  for (auto& p : threads) {
5633  // wait for the thread
5634  p.wait();
5635  // get the result and update the final import status
5636  auto ret_import_status = p.get();
5637  import_status_ += ret_import_status;
5640  }
5641  }
5642 #endif
5643 
5644  checkpoint(table_epochs);
5645 
5646  return import_status_;
5647 }
std::lock_guard< T > lock_guard
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:151
#define CHECK_EQ(x, y)
Definition: Logger.h:297
std::string cat(Ts &&...args)
#define LOG(tag)
Definition: Logger.h:283
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4704
heavyai::unique_lock< heavyai::shared_mutex > write_lock
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
std::string add_metadata_columns
Definition: CopyParams.h:94
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
std::unique_lock< T > unique_lock
std::string toString(const ExecutorDeviceType &device_type)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRCoordinateTransformation *coordinate_transformation, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor, const MetadataColumnInfos &metadata_column_infos)
Definition: Importer.cpp:2378
std::string get_session_id() const
Definition: SessionInfo.h:93
std::string geo_layer_name
Definition: CopyParams.h:81
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4685
std::unique_ptr< OGRSpatialReference, SpatialReferenceDeleter > SpatialReferenceUqPtr
Definition: GDAL.h:59
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3602
ThreadId thread_id()
Definition: Logger.cpp:870
#define CHECK(condition)
Definition: Logger.h:289
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:154
#define VLOG(n)
Definition: Logger.h:383
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712
#define IS_GEO_POLY(T)
Definition: sqltypes.h:303

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALRaster ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 5649 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, anonymous_namespace{Importer.cpp}::check_session_interrupted(), checkpoint(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Geospatial::compress_coords(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), Executor::ERR_INTERRUPTED, anonymous_namespace{Utm.h}::f, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_output_srid(), Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getBandNullValue(), import_export::RasterImporter::getBandsHeight(), import_export::RasterImporter::getBandsWidth(), Executor::getExecutor(), import_export::RasterImporter::getNumBands(), import_export::RasterImporter::getPointNamesAndSQLTypes(), import_export::RasterImporter::getProjectedPixelCoords(), import_export::RasterImporter::getRawPixels(), import_export::RasterImporter::import(), import_buffers_vec, import_id, import_export::DataStreamSink::import_status_, logger::INFO, ColumnDescriptor::isGeoPhyCol, kARRAY, kBIGINT, kDOUBLE, kFLOAT, kINT, kMaxRasterScanlinesPerThread, kNULLT, kPOINT, kSMALLINT, kTINYINT, load(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, NULL_DOUBLE, NULL_FLOAT, NULL_INT, NULL_SMALLINT, import_export::num_import_threads(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, set_import_status(), logger::thread_id(), timer_start(), TIMER_STOP, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, and VLOG.

Referenced by importGDAL().

5650  {
5651  // initial status
5653 
5654  // metadata columns?
5655  auto const metadata_column_infos =
5657 
5658  // create a raster importer and do the detect
5659  RasterImporter raster_importer;
5660  raster_importer.detect(
5661  file_path,
5667  true,
5668  metadata_column_infos);
5669 
5670  // get the table columns and count actual columns
5671  auto const& column_descs = loader->get_column_descs();
5672  uint32_t num_table_cols{0u};
5673  for (auto const* cd : column_descs) {
5674  if (!cd->isGeoPhyCol) {
5675  num_table_cols++;
5676  }
5677  }
5678 
5679  // how many bands do we have?
5680  auto num_bands = raster_importer.getNumBands();
5681 
5682  // get point columns info
5683  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5684 
5685  // validate that the table column count matches
5686  auto num_expected_cols = num_bands;
5687  num_expected_cols += point_names_and_sql_types.size();
5688  num_expected_cols += metadata_column_infos.size();
5689  if (num_expected_cols != num_table_cols) {
5690  throw std::runtime_error(
5691  "Raster Import aborted. Band/Column count mismatch (file requires " +
5692  std::to_string(num_expected_cols) + ", table has " +
5693  std::to_string(num_table_cols) + ")");
5694  }
5695 
5696  // validate the point column names and types
5697  // if we're importing the coords as a POINT, then the first column
5698  // must be a POINT (two physical columns, POINT and TINYINT[])
5699  // if we're not, the first two columns must be the matching type
5700  // optionally followed by an angle column
5701  auto cd_itr = column_descs.begin();
5702  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5703  if (sql_type == kPOINT) {
5704  // POINT column
5705  {
5706  auto const* cd = *cd_itr++;
5707  if (cd->columnName != col_name) {
5708  throw std::runtime_error("Column '" + cd->columnName +
5709  "' overridden name invalid (must be '" + col_name +
5710  "')");
5711  }
5712  auto const cd_type = cd->columnType.get_type();
5713  if (cd_type != kPOINT) {
5714  throw std::runtime_error("Column '" + cd->columnName +
5715  "' overridden type invalid (must be POINT)");
5716  }
5717  if (cd->columnType.get_output_srid() != 4326) {
5718  throw std::runtime_error("Column '" + cd->columnName +
5719  "' overridden SRID invalid (must be 4326)");
5720  }
5721  }
5722  // TINYINT[] coords sub-column
5723  {
5724  // if the above is true, this must be true
5725  auto const* cd = *cd_itr++;
5726  CHECK(cd->columnType.get_type() == kARRAY);
5727  CHECK(cd->columnType.get_subtype() == kTINYINT);
5728  }
5729  } else {
5730  // column of the matching name and type
5731  auto const* cd = *cd_itr++;
5732  if (cd->columnName != col_name) {
5733  throw std::runtime_error("Column '" + cd->columnName +
5734  "' overridden name invalid (must be '" + col_name +
5735  "')");
5736  }
5737  auto const cd_type = cd->columnType.get_type();
5738  if (cd_type != sql_type) {
5739  throw std::runtime_error("Column '" + cd->columnName +
5740  "' overridden type invalid (must be " +
5741  to_string(sql_type) + ")");
5742  }
5743  }
5744  }
5745 
5746  // validate the band column types
5747  // any Immerse overriding to other types will currently be rejected
5748  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5749  if (band_names_and_types.size() != num_bands) {
5750  throw std::runtime_error("Column/Band count mismatch when validating types");
5751  }
5752  for (uint32_t i = 0; i < num_bands; i++) {
5753  auto const* cd = *cd_itr++;
5754  auto const cd_type = cd->columnType.get_type();
5755  auto const sql_type = band_names_and_types[i].second;
5756  if (cd_type != sql_type) {
5757  throw std::runtime_error("Band Column '" + cd->columnName +
5758  "' overridden type invalid (must be " +
5759  to_string(sql_type) + ")");
5760  }
5761  }
5762 
5763  // validate metadata column
5764  for (auto const& mci : metadata_column_infos) {
5765  auto const* cd = *cd_itr++;
5766  if (mci.column_descriptor.columnName != cd->columnName) {
5767  throw std::runtime_error("Metadata Column '" + cd->columnName +
5768  "' overridden name invalid (must be '" +
5769  mci.column_descriptor.columnName + "')");
5770  }
5771  auto const cd_type = cd->columnType.get_type();
5772  auto const md_type = mci.column_descriptor.columnType.get_type();
5773  if (cd_type != md_type) {
5774  throw std::runtime_error("Metadata Column '" + cd->columnName +
5775  "' overridden type invalid (must be " +
5776  to_string(md_type) + ")");
5777  }
5778  }
5779 
5780  // geo table import is handled specially in both DBHandler and QueryRunner,
5781  // on a path that is separate from normal SQL execution,
5782  // so we explicitly enroll the import session here to allow interruption
5783  // while importing a geo table
5784  auto query_session = session_info ? session_info->get_session_id() : "";
5785  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5787  auto is_session_already_registered = false;
5788  {
5790  executor->getSessionLock());
5791  is_session_already_registered =
5792  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5793  }
5794  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5795  !is_session_already_registered) {
5796  executor->enrollQuerySession(query_session,
5797  "IMPORT_GEO_TABLE",
5798  query_submitted_time,
5800  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5801  }
5802  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5803  // reset the runtime query interrupt status
5804  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5805  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5806  }
5807  };
5808 
5809  // how many threads are we gonna use?
5811  VLOG(1) << "GDAL import # threads: " << max_threads;
5812 
5814  throw std::runtime_error("Invalid CopyParams.raster_scanlines_per_thread! (" +
5816  ")");
5817  }
5818  const int max_scanlines_per_thread =
5823  VLOG(1) << "Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5824 
5825  // make an import buffer for each thread
5826  CHECK_EQ(import_buffers_vec.size(), 0u);
5828  for (size_t i = 0; i < max_threads; i++) {
5829  for (auto const& cd : loader->get_column_descs()) {
5830  import_buffers_vec[i].emplace_back(
5831  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5832  }
5833  }
5834 
5835  // status and times
5836  using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5837 
5838  // get the band dimensions
5839  auto const band_size_x = raster_importer.getBandsWidth();
5840  auto const band_size_y = raster_importer.getBandsHeight();
5841 
5842  // allocate raw pixel buffers per thread
5843  std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5844  for (size_t i = 0; i < max_threads; i++) {
5845  raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5846  sizeof(double));
5847  }
5848 
5849  // just the sql type of the first point column (if any)
5850  auto const point_sql_type = point_names_and_sql_types.size()
5851  ? point_names_and_sql_types.begin()->second
5852  : kNULLT;
5853 
5854  // lambda for importing to raw data buffers (threadable)
5855  auto import_rows =
5856  [&](const size_t thread_idx, const int y_start, const int y_end) -> ThreadReturn {
5857  // this thread's import buffers
5858  auto& import_buffers = import_buffers_vec[thread_idx];
5859 
5860  // this thread's raw pixel bytes
5861  auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5862 
5863  // clear the buffers
5864  for (auto& col_buffer : import_buffers) {
5865  col_buffer->clear();
5866  }
5867 
5868  // prepare to iterate columns
5869  auto col_itr = column_descs.begin();
5870  int col_idx{0};
5871 
5872  float proj_s{0.0f};
5873  if (point_sql_type != kNULLT) {
5874  // the first two columns (either lon/lat or POINT/coords)
5875  auto const* cd_col0 = *col_itr++;
5876  auto const* cd_col1 = *col_itr++;
5877  auto const* cd_angle =
5878  copy_params.raster_point_compute_angle ? *col_itr++ : nullptr;
5879 
5880  // compute and add x and y
5881  auto proj_timer = timer_start();
5882  for (int y = y_start; y < y_end; y++) {
5883  // get projected pixel coords for this scan-line
5884  auto const coords = raster_importer.getProjectedPixelCoords(thread_idx, y);
5885 
5886  // add to buffers
5887  for (int x = 0; x < band_size_x; x++) {
5888  // this point and angle
5889  auto const& [dx, dy, angle] = coords[x];
5890 
5891  // store the point
5892  switch (point_sql_type) {
5893  case kPOINT: {
5894  // add empty value to POINT buffer
5895  TDatum td_point;
5896  import_buffers[0]->add_value(cd_col0, td_point, false);
5897 
5898  // convert lon/lat to bytes (compressed or not) and add to POINT coords
5899  // buffer
5900  auto const compressed_coords =
5901  Geospatial::compress_coords({dx, dy}, cd_col0->columnType);
5902  std::vector<TDatum> td_coords_data;
5903  for (auto const& cc : compressed_coords) {
5904  TDatum td_byte;
5905  td_byte.val.int_val = cc;
5906  td_coords_data.push_back(td_byte);
5907  }
5908  TDatum td_coords;
5909  td_coords.val.arr_val = td_coords_data;
5910  td_coords.is_null = false;
5911  import_buffers[1]->add_value(cd_col1, td_coords, false);
5912  } break;
5913  case kFLOAT:
5914  case kDOUBLE: {
5915  TDatum td;
5916  td.is_null = false;
5917  td.val.real_val = dx;
5918  import_buffers[0]->add_value(cd_col0, td, false);
5919  td.val.real_val = dy;
5920  import_buffers[1]->add_value(cd_col1, td, false);
5921  } break;
5922  case kSMALLINT:
5923  case kINT: {
5924  TDatum td;
5925  td.is_null = false;
5926  td.val.int_val = static_cast<int64_t>(x);
5927  import_buffers[0]->add_value(cd_col0, td, false);
5928  td.val.int_val = static_cast<int64_t>(y);
5929  import_buffers[1]->add_value(cd_col1, td, false);
5930  } break;
5931  default:
5932  CHECK(false);
5933  }
5934 
5935  // angle?
5937  CHECK(cd_angle);
5938  TDatum td;
5939  td.is_null = false;
5940  td.val.real_val = static_cast<double>(angle);
5941  import_buffers[2]->add_value(cd_angle, td, false);
5942  }
5943  }
5944  }
5945  proj_s = TIMER_STOP(proj_timer);
5946  col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5947  }
5948 
5949  // prepare to accumulate read and conv times
5950  float read_s{0.0f};
5951  float conv_s{0.0f};
5952 
5953  // y_end is one past the actual end, so don't add 1
5954  auto const num_rows = y_end - y_start;
5955  auto const num_elems = band_size_x * num_rows;
5956 
5957  // for each band/column
5958  for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5959  // the corresponding column
5960  auto const* cd_band = *col_itr;
5961  CHECK(cd_band);
5962 
5963  // data type to read as
5964  auto const cd_type = cd_band->columnType.get_type();
5965 
5966  // read the scanlines (will do a data type conversion if necessary)
5967  auto read_timer = timer_start();
5968  raster_importer.getRawPixels(
5969  thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5970  read_s += TIMER_STOP(read_timer);
5971 
5972  // null value?
5973  auto const [null_value, null_value_valid] =
5974  raster_importer.getBandNullValue(band_idx);
5975 
5976  // copy to this thread's import buffers
5977  // convert any nulls we find
5978  auto conv_timer = timer_start();
5979  TDatum td;
5980  switch (cd_type) {
5981  case kSMALLINT: {
5982  const int16_t* values =
5983  reinterpret_cast<const int16_t*>(raw_pixel_bytes.data());
5984  for (int idx = 0; idx < num_elems; idx++) {
5985  auto const& value = values[idx];
5986  if (null_value_valid && value == static_cast<int16_t>(null_value)) {
5987  td.is_null = true;
5988  td.val.int_val = NULL_SMALLINT;
5989  } else {
5990  td.is_null = false;
5991  td.val.int_val = static_cast<int64_t>(value);
5992  }
5993  import_buffers[col_idx]->add_value(cd_band, td, false);
5994  }
5995  } break;
5996  case kINT: {
5997  const int32_t* values =
5998  reinterpret_cast<const int32_t*>(raw_pixel_bytes.data());
5999  for (int idx = 0; idx < num_elems; idx++) {
6000  auto const& value = values[idx];
6001  if (null_value_valid && value == static_cast<int32_t>(null_value)) {
6002  td.is_null = true;
6003  td.val.int_val = NULL_INT;
6004  } else {
6005  td.is_null = false;
6006  td.val.int_val = static_cast<int64_t>(value);
6007  }
6008  import_buffers[col_idx]->add_value(cd_band, td, false);
6009  }
6010  } break;
6011  case kBIGINT: {
6012  const uint32_t* values =
6013  reinterpret_cast<const uint32_t*>(raw_pixel_bytes.data());
6014  for (int idx = 0; idx < num_elems; idx++) {
6015  auto const& value = values[idx];
6016  if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
6017  td.is_null = true;
6018  td.val.int_val = NULL_INT;
6019  } else {
6020  td.is_null = false;
6021  td.val.int_val = static_cast<int64_t>(value);
6022  }
6023  import_buffers[col_idx]->add_value(cd_band, td, false);
6024  }
6025  } break;
6026  case kFLOAT: {
6027  const float* values = reinterpret_cast<const float*>(raw_pixel_bytes.data());
6028  for (int idx = 0; idx < num_elems; idx++) {
6029  auto const& value = values[idx];
6030  if (null_value_valid && value == static_cast<float>(null_value)) {
6031  td.is_null = true;
6032  td.val.real_val = NULL_FLOAT;
6033  } else {
6034  td.is_null = false;
6035  td.val.real_val = static_cast<double>(value);
6036  }
6037  import_buffers[col_idx]->add_value(cd_band, td, false);
6038  }
6039  } break;
6040  case kDOUBLE: {
6041  const double* values = reinterpret_cast<const double*>(raw_pixel_bytes.data());
6042  for (int idx = 0; idx < num_elems; idx++) {
6043  auto const& value = values[idx];
6044  if (null_value_valid && value == null_value) {
6045  td.is_null = true;
6046  td.val.real_val = NULL_DOUBLE;
6047  } else {
6048  td.is_null = false;
6049  td.val.real_val = value;
6050  }
6051  import_buffers[col_idx]->add_value(cd_band, td, false);
6052  }
6053  } break;
6054  default:
6055  CHECK(false);
6056  }
6057  conv_s += TIMER_STOP(conv_timer);
6058 
6059  // next column
6060  col_idx++;
6061  col_itr++;
6062  }
6063 
6064  // metadata columns?
6065  for (auto const& mci : metadata_column_infos) {
6066  auto const* cd_band = *col_itr++;
6067  CHECK(cd_band);
6068  for (int i = 0; i < num_elems; i++) {
6069  import_buffers[col_idx]->add_value(cd_band, mci.value, false, copy_params);
6070  }
6071  col_idx++;
6072  }
6073 
6074  // build status
6075  ImportStatus thread_import_status;
6076  thread_import_status.rows_estimated = num_elems;
6077  thread_import_status.rows_completed = num_elems;
6078 
6079  // done
6080  return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
6081  };
6082 
6083  // prepare to checkpoint the table
6084  auto table_epochs = loader->getTableEpochs();
6085 
6086  // start wall clock
6087  auto wall_timer = timer_start();
6088 
6089  // start the import
6090  raster_importer.import(max_threads);
6091 
6092  // time the phases
6093  float total_proj_s{0.0f};
6094  float total_read_s{0.0f};
6095  float total_conv_s{0.0f};
6096  float total_load_s{0.0f};
6097 
6098  const int min_scanlines_per_thread = 8;
6099  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;
6100  for (int block_y = 0; block_y < band_size_y;
6101  block_y += (max_threads * max_scanlines_per_thread)) {
6102  using Future = std::future<ThreadReturn>;
6103  std::vector<Future> futures;
6104  const int scanlines_in_block =
6105  std::min(band_size_y - block_y, max_scanlines_per_block);
6106  const int pixels_in_block = scanlines_in_block * band_size_x;
6107  const int block_max_scanlines_per_thread =
6108  std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6109  static_cast<int>(max_threads),
6110  min_scanlines_per_thread);
6111  VLOG(1) << "Raster Importer: scanlines_in_block: " << scanlines_in_block
6112  << ", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6113 
6114  std::vector<size_t> rows_per_thread;
6115  auto block_wall_timer = timer_start();
6116  // launch up to max_threads workers at once, each importing its own run of scanlines
6117  for (size_t thread_id = 0; thread_id < max_threads; thread_id++) {
6118  const int y_start = block_y + thread_id * block_max_scanlines_per_thread;
6119  if (y_start < band_size_y) {
6120  const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6121  if (y_start < y_end) {
6122  rows_per_thread.emplace_back((y_end - y_start) * band_size_x);
6123  futures.emplace_back(
6124  std::async(std::launch::async, import_rows, thread_id, y_start, y_end));
6125  }
6126  }
6127  }
6128 
6129  // wait for the threads to finish and
6130  // accumulate the results and times
6131  float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6132  size_t thread_idx = 0;
6133  size_t active_threads = 0;
6134  for (auto& future : futures) {
6135  auto const [import_status, times] = future.get();
6136  import_status_ += import_status;
6137  proj_s += times[0];
6138  read_s += times[1];
6139  conv_s += times[2];
6140  // We load the data in thread order so we can get deterministic row-major raster
6141  // ordering
6142  auto thread_load_timer = timer_start();
6143  // Todo: We should consider invoking the load on another thread in a ping-pong
6144  // fashion so we can simultaneously read the next batch of data
6145  load(import_buffers_vec[thread_idx], rows_per_thread[thread_idx], session_info);
6146  ++thread_idx;
6147  ++active_threads;
6148 
6149  load_s += TIMER_STOP(thread_load_timer);
6150  }
6151 
6152  // average times over all threads (except for load which is single-threaded)
6153  total_proj_s += (proj_s / float(active_threads));
6154  total_read_s += (read_s / float(active_threads));
6155  total_conv_s += (conv_s / float(active_threads));
6156  total_load_s += load_s;
6157 
6158  // update the status
6160 
6161  // more debug
6162  auto const block_wall_s = TIMER_STOP(block_wall_timer);
6163  auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6164  auto const rows_per_second = pixels_in_block / block_wall_s;
6165  LOG(INFO) << "Raster Importer: Loaded " << scanlines_in_block
6166  << " scanlines starting at " << block_y << " out of " << band_size_y
6167  << " in " << block_wall_s << "s at " << scanlines_per_second
6168  << " scanlines/s and " << rows_per_second << " rows/s";
6169 
6170  // check for interrupt
6171  if (UNLIKELY(check_session_interrupted(query_session, executor.get()))) {
6172  import_status_.load_failed = true;
6173  import_status_.load_msg = "Raster Import interrupted";
6175  }
6176  }
6177 
6178  // checkpoint
6179  auto checkpoint_timer = timer_start();
6180  checkpoint(table_epochs);
6181  auto const checkpoint_s = TIMER_STOP(checkpoint_timer);
6182 
6183  // stop wall clock
6184  auto const total_wall_s = TIMER_STOP(wall_timer);
6185 
6186  // report
6187  auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6188  auto const total_rows_per_second =
6189  float(band_size_x) * float(band_size_y) / total_wall_s;
6190  LOG(INFO) << "Raster Importer: Imported "
6191  << static_cast<uint64_t>(band_size_x) * static_cast<uint64_t>(band_size_y)
6192  << " rows";
6193  LOG(INFO) << "Raster Importer: Total Import Time " << total_wall_s << "s at "
6194  << total_scanlines_per_second << " scanlines/s and " << total_rows_per_second
6195  << " rows/s";
6196 
6197  // phase times (with proportions)
6198  auto proj_pct = float(int(total_proj_s / total_wall_s * 1000.0f) * 0.1f);
6199  auto read_pct = float(int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6200  auto conv_pct = float(int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6201  auto load_pct = float(int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6202  auto cpnt_pct = float(int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6203 
6204  VLOG(1) << "Raster Importer: Import timing breakdown:";
6205  VLOG(1) << " Project " << total_proj_s << "s (" << proj_pct << "%)";
6206  VLOG(1) << " Read " << total_read_s << "s (" << read_pct << "%)";
6207  VLOG(1) << " Convert " << total_conv_s << "s (" << conv_pct << "%)";
6208  VLOG(1) << " Load " << total_load_s << "s (" << load_pct << "%)";
6209  VLOG(1) << " Checkpoint " << checkpoint_s << "s (" << cpnt_pct << "%)";
6210 
6211  // all done
6212  return import_status_;
6213 }
#define CHECK_EQ(x, y)
Definition: Logger.h:297
int32_t raster_scanlines_per_thread
Definition: CopyParams.h:90
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: Importer.cpp:125
#define NULL_DOUBLE
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1444
#define NULL_FLOAT
#define LOG(tag)
Definition: Logger.h:283
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3592
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4881
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string raster_import_dimensions
Definition: CopyParams.h:93
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
#define NULL_INT
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
std::string add_metadata_columns
Definition: CopyParams.h:94
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
#define TIMER_STOP(t)
Definition: Importer.cpp:101
future< Result > async(Fn &&fn, Args &&...args)
RasterPointType raster_point_type
Definition: CopyParams.h:88
std::string toString(const ExecutorDeviceType &device_type)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
#define UNLIKELY(x)
Definition: likely.h:25
std::string get_session_id() const
Definition: SessionInfo.h:93
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4903
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3602
ThreadId thread_id()
Definition: Logger.cpp:870
std::string raster_import_bands
Definition: CopyParams.h:89
#define CHECK(condition)
Definition: Logger.h:289
#define NULL_SMALLINT
Definition: sqltypes.h:60
RasterPointTransform raster_point_transform
Definition: CopyParams.h:91
#define VLOG(n)
Definition: Logger.h:383
Type timer_start()
Definition: measure.h:42
static constexpr int kMaxRasterScanlinesPerThread
Definition: Importer.cpp:114
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3592 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), import_export::import_thread_shapefile(), and importGDALRaster().

3594  {
3595  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3597  import_status_.load_failed = true;
3598  import_status_.load_msg = loader->getErrorMessage();
3599  }
3600 }
std::lock_guard< T > lock_guard
heavyai::unique_lock< heavyai::shared_mutex > write_lock
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

Geospatial::GDAL::DataSourceUqPtr import_export::Importer::openGDALDataSource ( const std::string &  fileName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4685 of file Importer.cpp.

References Geospatial::GDAL::init(), import_export::kGeoFile, Geospatial::GDAL::openDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), import_export::CopyParams::source_type, and to_string().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptorsGeo(), importGDALGeo(), and readMetadataSampleGDAL().

4687  {
4695  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4696  std::to_string(static_cast<int>(copy_params.source_type)) +
4697  ")");
4698  }
4700 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::string to_string(char const *&&v)
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string s3_session_token
Definition: CopyParams.h:63
static DataSourceUqPtr openDataSource(const std::string &name, const import_export::SourceType source_type)
Definition: GDAL.cpp:181
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams copy_params 
)
static

Definition at line 4727 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), openGDALDataSource(), and import_export::parse_add_metadata_columns().

Referenced by DBHandler::detect_column_types().

4732  {
4734  openGDALDataSource(file_name, copy_params));
4735  if (datasource == nullptr) {
4736  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4737  file_name);
4738  }
4739 
4740  OGRLayer& layer =
4741  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4742 
4743  auto const* feature_defn = layer.GetLayerDefn();
4744  CHECK(feature_defn);
4745 
4746  // metadata columns?
4747  auto const metadata_column_infos =
4749 
4750  // get limited feature count
4751  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4752  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4753  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4754 
4755  // prepare sample data map
4756  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4757  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4758  CHECK(column_name);
4759  sample_data[column_name] = {};
4760  }
4761  sample_data[geo_column_name] = {};
4762  for (auto const& mci : metadata_column_infos) {
4763  sample_data[mci.column_descriptor.columnName] = {};
4764  }
4765 
4766  // prepare to read
4767  layer.ResetReading();
4768 
4769  // read features (up to limited count)
4770  uint64_t feature_index{0u};
4771  while (feature_index < num_features) {
4772  // get (and take ownership of) feature
4773  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4774  if (!feature) {
4775  break;
4776  }
4777 
4778  // get feature geometry
4779  auto const* geometry = feature->GetGeometryRef();
4780  if (geometry == nullptr) {
4781  break;
4782  }
4783 
4784  // validate geom type (again?)
4785  switch (wkbFlatten(geometry->getGeometryType())) {
4786  case wkbPoint:
4787  case wkbMultiPoint:
4788  case wkbLineString:
4789  case wkbMultiLineString:
4790  case wkbPolygon:
4791  case wkbMultiPolygon:
4792  break;
4793  default:
4794  throw std::runtime_error("Unsupported geometry type: " +
4795  std::string(geometry->getGeometryName()));
4796  }
4797 
4798  // populate sample data for regular field columns
4799  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4800  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4801  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4802  }
4803 
4804  // populate sample data for metadata columns?
4805  for (auto const& mci : metadata_column_infos) {
4806  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4807  }
4808 
4809  // populate sample data for geo column with WKT string
4810  char* wkts = nullptr;
4811  geometry->exportToWkt(&wkts);
4812  CHECK(wkts);
4813  sample_data[geo_column_name].push_back(wkts);
4814  CPLFree(wkts);
4815 
4816  // next feature
4817  feature_index++;
4818  }
4819 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4704
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:94
std::string geo_layer_name
Definition: CopyParams.h:81
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4685
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:289

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const bool  force_null = false 
)
static

Definition at line 1627 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), foreign_storage::TextFileBufferParser::processGeoColumn(), and foreign_storage::TextFileBufferParser::processInvalidGeoColumn().

1637  {
      // Adds a single geo value to the physical-column import buffers of geo column `cd`,
      // starting at import_buffers[col_idx]. col_idx is advanced past every buffer written.
      // The descriptor of each physical column is looked up by pre-incrementing a local
      // copy of cd->columnId, so the lookups use ids columnId+1, columnId+2, ... in the
      // order coords, ring_sizes, poly_rings, bounds, render_group (only those that apply
      // to the column type are looked up and written).
      // Note: `coords` may be modified (cleared and refilled) in the nullable-POINT case.
1638  const auto col_ti = cd->columnType;
1639  const auto col_type = col_ti.get_type();
1640  auto columnId = cd->columnId;
1641  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1642  bool is_null_geo = false;
1643  bool is_null_point = false;
      // Null detection only runs for nullable columns; for NOT NULL columns is_null_geo
      // stays false unless force_null is set below.
1644  if (!col_ti.get_notnull()) {
1645  // Check for NULL geo
      // A POINT counts as null when coords is empty or its first value is the
      // NULL_ARRAY_DOUBLE sentinel.
1646  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1647  is_null_point = true;
1648  coords.clear();
1649  }
1650  is_null_geo = coords.empty();
1651  if (is_null_point) {
      // Replace the caller's coords with the two-value null-point encoding.
1652  coords.push_back(NULL_ARRAY_DOUBLE);
1653  coords.push_back(NULL_DOUBLE);
1654  // Treating POINT coords as notnull, need to store actual encoding
1655  // [un]compressed+[not]null
1656  is_null_geo = false;
1657  }
1658  }
      // force_null overrides everything above, including the null-POINT encoding: the
      // coords datum is then flagged null and its array is left empty.
1659  if (force_null) {
1660  is_null_geo = true;
1661  }
1662  TDatum tdd_coords;
1663  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1664  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1665  // in a fixlen array, compressed and uncompressed.
1666  if (!is_null_geo) {
1667  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1668  tdd_coords.val.arr_val.reserve(compressed_coords.size());
      // Each byte returned by compress_coords becomes one int_val array element.
1669  for (auto cc : compressed_coords) {
1670  tdd_coords.val.arr_val.emplace_back();
1671  tdd_coords.val.arr_val.back().val.int_val = cc;
1672  }
1673  }
1674  tdd_coords.is_null = is_null_geo;
1675  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1676 
1677  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1678  // Create [linest]ring_sizes array value and add it to the physical column
1679  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1680  TDatum tdd_ring_sizes;
      // NOTE(review): reserve runs even when the geo is null and nothing is appended.
1681  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1682  if (!is_null_geo) {
1683  for (auto ring_size : ring_sizes) {
1684  tdd_ring_sizes.val.arr_val.emplace_back();
1685  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1686  }
1687  }
1688  tdd_ring_sizes.is_null = is_null_geo;
1689  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1690  }
1691 
1692  if (col_type == kMULTIPOLYGON) {
1693  // Create poly_rings array value and add it to the physical column
1694  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1695  TDatum tdd_poly_rings;
1696  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1697  if (!is_null_geo) {
1698  for (auto num_rings : poly_rings) {
1699  tdd_poly_rings.val.arr_val.emplace_back();
1700  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1701  }
1702  }
1703  tdd_poly_rings.is_null = is_null_geo;
1704  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1705  }
1706 
      // Bounds are written for every column type tested in this function except kPOINT.
1707  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1708  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1709  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1710  TDatum tdd_bounds;
1711  tdd_bounds.val.arr_val.reserve(bounds.size());
1712  if (!is_null_geo) {
      // Bounds values are stored as real_val (double) elements, unlike the int arrays above.
1713  for (auto b : bounds) {
1714  tdd_bounds.val.arr_val.emplace_back();
1715  tdd_bounds.val.arr_val.back().val.real_val = b;
1716  }
1717  }
1718  tdd_bounds.is_null = is_null_geo;
1719  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1720  }
1721 
1722  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1723  // Create render_group value and add it to the physical column
1724  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1725  TDatum td_render_group;
      // render_group is a scalar int; its null flag follows the geo's null state.
1726  td_render_group.val.int_val = render_group;
1727  td_render_group.is_null = is_null_geo;
1728  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1729  }
1730 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:380
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
std::vector< int > &  render_groups_column 
)
static

Definition at line 1732 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

1741  {
      // Columnar variant: each *_column argument holds one entry per row. Physical columns
      // are filled one at a time (all rows of coords, then all rows of ring_sizes, and so
      // on), and col_idx is incremented once after each physical column has been filled.
      // Nullness is decided independently for each physical column from that column's own
      // row data; there is no shared per-row null flag.
1742  const auto col_ti = cd->columnType;
1743  const auto col_type = col_ti.get_type();
1744  auto columnId = cd->columnId;
1745 
      // The coords row count is the reference size the other columns are checked against.
1746  auto coords_row_count = coords_column.size();
1747  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
      // Iterated by non-const reference: a null POINT row's coords are rewritten in place.
1748  for (auto& coords : coords_column) {
1749  bool is_null_geo = false;
1750  bool is_null_point = false;
1751  if (!col_ti.get_notnull()) {
1752  // Check for NULL geo
1753  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1754  is_null_point = true;
1755  coords.clear();
1756  }
1757  is_null_geo = coords.empty();
1758  if (is_null_point) {
1759  coords.push_back(NULL_ARRAY_DOUBLE);
1760  coords.push_back(NULL_DOUBLE);
1761  // Treating POINT coords as notnull, need to store actual encoding
1762  // [un]compressed+[not]null
1763  is_null_geo = false;
1764  }
1765  }
1766  std::vector<TDatum> td_coords_data;
1767  if (!is_null_geo) {
1768  std::vector<uint8_t> compressed_coords =
1769  Geospatial::compress_coords(coords, col_ti);
      // Each byte returned by compress_coords becomes one int_val datum.
1770  for (auto const& cc : compressed_coords) {
1771  TDatum td_byte;
1772  td_byte.val.int_val = cc;
1773  td_coords_data.push_back(td_byte);
1774  }
1775  }
1776  TDatum tdd_coords;
1777  tdd_coords.val.arr_val = td_coords_data;
1778  tdd_coords.is_null = is_null_geo;
1779  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1780  }
1781  col_idx++;
1782 
1783  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
      // A row-count mismatch with the coords column trips CHECK(false) with the message below.
1784  if (ring_sizes_column.size() != coords_row_count) {
1785  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1786  }
1787  // Create [linest]ring_sizes array value and add it to the physical column
1788  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1789  for (auto const& ring_sizes : ring_sizes_column) {
1790  bool is_null_geo = false;
1791  if (!col_ti.get_notnull()) {
1792  // Check for NULL geo
      // For nullable columns, an empty ring_sizes row is treated as a null row.
1793  is_null_geo = ring_sizes.empty();
1794  }
1795  std::vector<TDatum> td_ring_sizes;
1796  for (auto const& ring_size : ring_sizes) {
1797  TDatum td_ring_size;
1798  td_ring_size.val.int_val = ring_size;
1799  td_ring_sizes.push_back(td_ring_size);
1800  }
1801  TDatum tdd_ring_sizes;
1802  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1803  tdd_ring_sizes.is_null = is_null_geo;
1804  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1805  }
1806  col_idx++;
1807  }
1808 
1809  if (col_type == kMULTIPOLYGON) {
1810  if (poly_rings_column.size() != coords_row_count) {
1811  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1812  }
1813  // Create poly_rings array value and add it to the physical column
1814  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1815  for (auto const& poly_rings : poly_rings_column) {
1816  bool is_null_geo = false;
1817  if (!col_ti.get_notnull()) {
1818  // Check for NULL geo
      // For nullable columns, an empty poly_rings row is treated as a null row.
1819  is_null_geo = poly_rings.empty();
1820  }
1821  std::vector<TDatum> td_poly_rings;
1822  for (auto const& num_rings : poly_rings) {
1823  TDatum td_num_rings;
1824  td_num_rings.val.int_val = num_rings;
1825  td_poly_rings.push_back(td_num_rings);
1826  }
1827  TDatum tdd_poly_rings;
1828  tdd_poly_rings.val.arr_val = td_poly_rings;
1829  tdd_poly_rings.is_null = is_null_geo;
1830  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1831  }
1832  col_idx++;
1833  }
1834 
      // Bounds are written for every column type tested in this function except kPOINT.
1835  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1836  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1837  if (bounds_column.size() != coords_row_count) {
1838  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1839  }
1840  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1841  for (auto const& bounds : bounds_column) {
1842  bool is_null_geo = false;
1843  if (!col_ti.get_notnull()) {
1844  // Check for NULL geo
      // A bounds row is null when it is empty or starts with the NULL_ARRAY_DOUBLE sentinel.
1845  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1846  }
1847  std::vector<TDatum> td_bounds_data;
      // The bounds values are copied into the datum even when the row is flagged null.
1848  for (auto const& b : bounds) {
1849  TDatum td_double;
1850  td_double.val.real_val = b;
1851  td_bounds_data.push_back(td_double);
1852  }
1853  TDatum tdd_bounds;
1854  tdd_bounds.val.arr_val = td_bounds_data;
1855  tdd_bounds.is_null = is_null_geo;
1856  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1857  }
1858  col_idx++;
1859  }
1860 
1861  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1862  // Create render_group value and add it to the physical column
1863  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
      // NOTE(review): unlike the columns above, render_groups_column's size is not checked
      // against coords_row_count, and the value is never flagged null.
1864  for (auto const& render_group : render_groups_column) {
1865  TDatum td_render_group;
1866  td_render_group.val.int_val = render_group;
1867  td_render_group.is_null = false;
1868  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1869  }
1870  col_idx++;
1871  }
1872 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:380
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:289
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 239 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), importGDALGeo(), importGDALRaster(), import_export::ForeignDataImporter::importGeneralS3(), and anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers().

239  {
     // NOTE(review): listing lines 240 and 243 are absent from this extract. Going by the
     // References list for this function (status_mutex, import_status_map), they presumably
     // take the status lock and store `is` in the map -- confirm against Importer.cpp.
     // Stamp the end time, then derive the elapsed time in milliseconds from is.start.
241  is.end = std::chrono::steady_clock::now();
242  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
244 }
std::lock_guard< T > lock_guard
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:166
heavyai::unique_lock< heavyai::shared_mutex > write_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:165
std::string import_id
Definition: Importer.h:887

+ Here is the caller graph for this function:

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 890 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 888 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private
std::mutex import_export::Importer::init_gdal_mutex
static private

Definition at line 894 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 893 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private
size_t import_export::Importer::max_threads
private

Definition at line 889 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDALGeo(), and importGDALRaster().


The documentation for this class was generated from the following files: