OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Importer:
+ Collaboration diagram for import_export::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
 

Private Member Functions

ImportStatus importGDALGeo (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGDALRaster (const Catalog_Namespace::SessionInfo *session_info)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static
Geospatial::GDAL::DataSourceUqPtr 
openGDALDataSource (const std::string &fileName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsGeo (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptorsRaster (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 778 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 831 of file Importer.h.

831 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 171 of file Importer.cpp.

175  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:171
constexpr double f
Definition: Utm.h:31
import_export::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 177 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

178  : DataStreamSink(p, f), loader(providedLoader) {
179  import_id = boost::filesystem::path(file_path).filename().string();
180  file_size = 0;
181  max_threads = 0;
182  p_file = nullptr;
183  buffer[0] = nullptr;
184  buffer[1] = nullptr;
185  // we may be overallocating a little more memory here due to dropping phy cols.
186  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
187  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
188  int i = 0;
189  bool has_array = false;
190  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
191  int skip_physical_cols = 0;
192  for (auto& p : loader->get_column_descs()) {
193  // phy geo columns can't be in input file
194  if (skip_physical_cols-- > 0) {
195  continue;
196  }
197  // neither are rowid or $deleted$
198  // note: columns can be added after rowid/$deleted$
199  if (p->isVirtualCol || p->isDeletedCol) {
200  continue;
201  }
202  skip_physical_cols = p->columnType.get_physical_cols();
203  if (p->columnType.get_type() == kARRAY) {
204  is_array.get()[i] = true;
205  has_array = true;
206  } else {
207  is_array.get()[i] = false;
208  }
209  ++i;
210  }
211  if (has_array) {
212  is_array_a = std::unique_ptr<bool[]>(is_array.release());
213  } else {
214  is_array_a = std::unique_ptr<bool[]>(nullptr);
215  }
216 }
constexpr double f
Definition: Utm.h:31
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:893
std::string import_id
Definition: Importer.h:887
std::unique_ptr< Loader > loader
Definition: Importer.h:892
const std::string file_path
Definition: Importer.h:712
import_export::Importer::~Importer ( )
override

Definition at line 218 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

218  {
219  if (p_file != nullptr) {
220  fclose(p_file);
221  }
222  if (buffer[0] != nullptr) {
223  free(buffer[0]);
224  }
225  if (buffer[1] != nullptr) {
226  free(buffer[1]);
227  }
228 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3641 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), importGDALGeo(), and importGDALRaster().

3642  {
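3643  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3644  heavyai::lock_guard<heavyai::shared_mutex> read_lock(import_mutex_);
3645  if (import_status_.load_failed) {
3646  // rollback to starting epoch - undo all the added records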
3647  loader->setTableEpochs(table_epochs);
3648  } else {
3649  loader->checkpoint();
3650  }
3651  }
3652 
3653  if (loader->getTableDesc()->persistenceLevel ==
3654  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3655  // tables
3656  auto ms = measure<>::execution([&]() {
3657  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3658  if (!import_status_.load_failed) {
3659  for (auto& p : import_buffers_vec[0]) {
3660  if (!p->stringDictCheckpoint()) {
3661  LOG(ERROR) << "Checkpointing Dictionary for Column "
3662  << p->getColumnDesc()->columnName << " failed.";
3663  import_status_.load_failed = true;
3664  import_status_.load_msg = "Dictionary checkpoint failed";
3665  break;
3666  }
3667  }
3668  }
3669  });
3670  if (DEBUG_TIMING) {
3671  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3672  << std::endl;
3673  }
3674  }
3675 }
std::lock_guard< T > lock_guard
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:216
heavyai::unique_lock< heavyai::shared_mutex > write_lock
#define DEBUG_TIMING
Definition: Importer.cpp:157
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
static constexpr char const * FOREIGN_TABLE
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5182 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5182  {
5183  return gdalStatInternal(path, copy_params, false);
5184 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5147

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5187 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5188  {
5189  return gdalStatInternal(path, copy_params, true);
5190 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5147

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 5259 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5261  {
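5262  // lazy init GDAL
5263  Geospatial::GDAL::init();
5264  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5265  copy_params.s3_endpoint,
5266  copy_params.s3_access_key,
5267  copy_params.s3_secret_key,
5268  copy_params.s3_session_token);
5269 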
5270  // prepare to gather files
5271  std::vector<std::string> files;
5272 
5273  // gather the files recursively
5274  gdalGatherFilesInArchiveRecursive(archive_path, files);
5275 
5276  // convert to relative paths inside archive
5277  for (auto& file : files) {
5278  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5279  }
5280 
5281  // done
5282  return files;
5283 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:5192
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 5286 of file Importer.cpp.

References CHECK, EMPTY, GEO, import_export::CopyParams::geo_explode_collections, Geospatial::GDAL::init(), NON_GEO, openGDALDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5288  {
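5289  // lazy init GDAL
5290  Geospatial::GDAL::init();
5291  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5292  copy_params.s3_endpoint,
5293  copy_params.s3_access_key,
5294  copy_params.s3_secret_key,
5295  copy_params.s3_session_token);
5296 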
5297  // prepare to gather layer info
5298  std::vector<GeoFileLayerInfo> layer_info;
5299 
5300  // open the data set
5301  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5302  if (poDS == nullptr) {
5303  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5304  file_name);
5305  }
5306 
5307  // enumerate the layers
5308  for (auto&& poLayer : poDS->GetLayers()) {
5310  // prepare to read this layer
5311  poLayer->ResetReading();
5312  // skip layer if empty
5313  if (poLayer->GetFeatureCount() > 0) {
5314  // get first feature
5315  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5316  CHECK(first_feature);
5317  // check feature for geometry
5318  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5319  if (!geometry) {
5320  // layer has no geometry
5321  contents = GeoFileLayerContents::NON_GEO;
5322  } else {
5323  // check the geometry type
5324  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5325  switch (wkbFlatten(geometry_type)) {
5326  case wkbPoint:
5327  case wkbLineString:
5328  case wkbMultiLineString:
5329  case wkbPolygon:
5330  case wkbMultiPolygon:
5331  // layer has supported geo
5332  contents = GeoFileLayerContents::GEO;
5333  break;
5334  case wkbMultiPoint:
5335  // supported if geo_explode_collections is specified
5339  break;
5340  default:
5341  // layer has unsupported geometry
5343  break;
5344  }
5345  }
5346  }
5347  // store info for this layer
5348  layer_info.emplace_back(poLayer->GetName(), contents);
5349  }
5350 
5351  // done
5352  return layer_info;
5353 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4724
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:222
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
static private

Definition at line 5147 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

5149  {
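5150  // lazy init GDAL
5151  Geospatial::GDAL::init();
5152  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5153  copy_params.s3_endpoint,
5154  copy_params.s3_access_key,
5155  copy_params.s3_secret_key,
5156  copy_params.s3_session_token);
5157 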
5158 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5159  // clear GDAL stat cache
5160  // without this, file existence will be cached, even if authentication changes
5161  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5162  VSICurlClearCache();
5163 #endif
5164 
5165  // stat path
5166  VSIStatBufL sb;
5167  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5168  if (result < 0) {
5169  return false;
5170  }
5171 
5172  // exists?
5173  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5174  return true;
5175  } else if (VSI_ISREG(sb.st_mode)) {
5176  return true;
5177  }
5178  return false;
5179 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
std::string s3_session_token
Definition: CopyParams.h:63
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const bool  is_raster,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4961 of file Importer.cpp.

References gdalToColumnDescriptorsGeo(), and gdalToColumnDescriptorsRaster().

Referenced by DBHandler::detect_column_types().

4965  {
4966  if (is_raster) {
4967  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4968  }
4969  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4970 }
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:5045
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4973

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsGeo ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 5045 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, import_export::anonymous_namespace{Importer.cpp}::ogr_to_type(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

5048  {
5049  std::list<ColumnDescriptor> cds;
5050 
5051  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5052  if (poDS == nullptr) {
5053  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5054  file_name);
5055  }
5056  if (poDS->GetLayerCount() == 0) {
5057  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5058  " has no layers");
5059  }
5060 
5061  OGRLayer& layer =
5063 
5064  layer.ResetReading();
5065  // TODO(andrewseidl): support multiple features
5066  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5067  if (poFeature == nullptr) {
5068  throw std::runtime_error("No features found in " + file_name);
5069  }
5070  // get fields as regular columns
5071  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5072  CHECK(poFDefn);
5073  int iField;
5074  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5075  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5076  auto typePair = ogr_to_type(poFieldDefn->GetType());
5077  ColumnDescriptor cd;
5078  cd.columnName = poFieldDefn->GetNameRef();
5079  cd.sourceName = poFieldDefn->GetNameRef();
5080  SQLTypeInfo ti;
5081  if (typePair.second) {
5082  ti.set_type(kARRAY);
5083  ti.set_subtype(typePair.first);
5084  } else {
5085  ti.set_type(typePair.first);
5086  }
5087  if (typePair.first == kTEXT) {
5089  ti.set_comp_param(32);
5090  }
5091  ti.set_fixed_size();
5092  cd.columnType = ti;
5093  cds.push_back(cd);
5094  }
5095  // get geo column, if any
5096  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
5097  if (poGeometry) {
5098  ColumnDescriptor cd;
5099  cd.columnName = geo_column_name;
5100  cd.sourceName = geo_column_name;
5101 
5102  // get GDAL type
5103  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
5104 
5105  // if exploding, override any collection type to child type
5107  if (ogr_type == wkbMultiPolygon) {
5108  ogr_type = wkbPolygon;
5109  } else if (ogr_type == wkbMultiLineString) {
5110  ogr_type = wkbLineString;
5111  } else if (ogr_type == wkbMultiPoint) {
5112  ogr_type = wkbPoint;
5113  }
5114  }
5115 
5116  // convert to internal type
5117  SQLTypes geoType = ogr_to_type(ogr_type);
5118 
5119  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5121  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5122  }
5123 
5124  // build full internal type
5125  SQLTypeInfo ti;
5126  ti.set_type(geoType);
5132  cd.columnType = ti;
5133 
5134  cds.push_back(cd);
5135  }
5136 
5137  // metadata columns?
5138  auto metadata_column_infos =
5140  for (auto& mci : metadata_column_infos) {
5141  cds.push_back(std::move(mci.column_descriptor));
5142  }
5143 
5144  return cds;
5145 }
void set_compression(EncodingType c)
Definition: sqltypes.h:525
SQLTypes
Definition: sqltypes.h:52
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:515
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4743
std::string sourceName
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:94
void set_input_srid(int d)
Definition: sqltypes.h:518
void set_fixed_size()
Definition: sqltypes.h:523
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4862
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:163
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:520
void set_comp_param(int p)
Definition: sqltypes.h:526
std::string geo_layer_name
Definition: CopyParams.h:81
Definition: sqltypes.h:66
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4724
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:222
SQLTypeInfo columnType
std::string columnName
EncodingType geo_coords_encoding
Definition: CopyParams.h:76
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:514

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsRaster ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 4973 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getPointNamesAndSQLTypes(), Geospatial::GDAL::init(), kENCODING_GEOINT, kGEOMETRY, kPOINT, import_export::parse_add_metadata_columns(), import_export::CopyParams::raster_import_bands, import_export::CopyParams::raster_import_dimensions, import_export::CopyParams::raster_point_compute_angle, import_export::CopyParams::raster_point_transform, import_export::CopyParams::raster_point_type, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Geospatial::GDAL::setAuthorizationTokens(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4976  {
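4977  // lazy init GDAL
4978  Geospatial::GDAL::init();
4979  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4980  copy_params.s3_endpoint,
4981  copy_params.s3_access_key,
4982  copy_params.s3_secret_key,
4983  copy_params.s3_session_token);
4984 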
4985  // prepare for metadata column
4986  auto metadata_column_infos =
4988 
4989  // create a raster importer and do the detect
4990  RasterImporter raster_importer;
4991  raster_importer.detect(
4992  file_name,
4998  false,
4999  metadata_column_infos);
5000 
5001  // prepare to capture column descriptors
5002  std::list<ColumnDescriptor> cds;
5003 
5004  // get the point column info
5005  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5006 
5007  // create the columns for the point in the specified type
5008  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5009  ColumnDescriptor cd;
5010  cd.columnName = cd.sourceName = col_name;
5011  cd.columnType.set_type(sql_type);
5012  // hardwire other POINT attributes for now
5013  if (sql_type == kPOINT) {
5015  cd.columnType.set_input_srid(4326);
5016  cd.columnType.set_output_srid(4326);
5018  cd.columnType.set_comp_param(32);
5019  }
5020  cds.push_back(cd);
5021  }
5022 
5023  // get the names and types for the band column(s)
5024  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5025 
5026  // add column descriptors for each band
5027  for (auto const& [band_name, sql_type] : band_names_and_types) {
5028  ColumnDescriptor cd;
5029  cd.columnName = cd.sourceName = band_name;
5030  cd.columnType.set_type(sql_type);
5032  cds.push_back(cd);
5033  }
5034 
5035  // metadata columns?
5036  for (auto& mci : metadata_column_infos) {
5037  cds.push_back(std::move(mci.column_descriptor));
5038  }
5039 
5040  // return the results
5041  return cds;
5042 }
void set_compression(EncodingType c)
Definition: sqltypes.h:525
std::string s3_secret_key
Definition: CopyParams.h:62
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4920
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:515
static void init()
Definition: GDAL.cpp:67
std::string raster_import_dimensions
Definition: CopyParams.h:93
std::string sourceName
std::string add_metadata_columns
Definition: CopyParams.h:94
void set_input_srid(int d)
Definition: sqltypes.h:518
RasterPointType raster_point_type
Definition: CopyParams.h:88
void set_fixed_size()
Definition: sqltypes.h:523
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:520
void set_comp_param(int p)
Definition: sqltypes.h:526
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4942
std::string s3_session_token
Definition: CopyParams.h:63
std::string raster_import_bands
Definition: CopyParams.h:89
SQLTypeInfo columnType
std::string s3_access_key
Definition: CopyParams.h:61
std::string columnName
RasterPointTransform raster_point_transform
Definition: CopyParams.h:91
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:514

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 795 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

795  {
796  return loader->get_column_descs();
797  }
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 794 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

794 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 804 of file Importer.h.

References import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

804  {
805  return import_buffers_vec[i];
806  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 801 of file Importer.h.

References import_buffers_vec.

801  {
802  return import_buffers_vec;
803  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 230 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

230  {
232  return import_status_map.at(import_id);
233 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:166
std::shared_lock< T > shared_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:165
std::string import_id
Definition: Importer.h:887

+ Here is the caller graph for this function:

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 807 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

807 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:893

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 841 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

841 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 864 of file Importer.h.

References loader.

864 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:892
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo * session_info)
overridevirtual

Implements import_export::AbstractImporter.

Definition at line 4478 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4478  {
4479  return DataStreamSink::archivePlumber(session_info);
4480 }
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3677

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo * session_info
)
overridevirtual

Implements import_export::DataStreamSink.

Definition at line 4482 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), heavyai::fopen(), g_enable_assign_render_groups, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4485  {
4487  auto query_session = session_info ? session_info->get_session_id() : "";
4488 
4489  if (!p_file) {
4490  p_file = fopen(file_path.c_str(), "rb");
4491  }
4492  if (!p_file) {
4493  throw std::runtime_error("failed to open file '" + file_path +
4494  "': " + strerror(errno));
4495  }
4496 
4497  if (!decompressed) {
4498  (void)fseek(p_file, 0, SEEK_END);
4499  file_size = ftell(p_file);
4500  }
4501 
4503  VLOG(1) << "Delimited import # threads: " << max_threads;
4504 
4505  // deal with small files
4506  size_t alloc_size = copy_params.buffer_size;
4507  if (!decompressed && file_size < alloc_size) {
4508  alloc_size = file_size;
4509  }
4510 
4511  for (size_t i = 0; i < max_threads; i++) {
4512  import_buffers_vec.emplace_back();
4513  for (const auto cd : loader->get_column_descs()) {
4514  import_buffers_vec[i].emplace_back(
4515  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4516  }
4517  }
4518 
4519  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4520  size_t current_pos = 0;
4521  size_t end_pos;
4522  size_t begin_pos = 0;
4523 
4524  (void)fseek(p_file, current_pos, SEEK_SET);
4525  size_t size =
4526  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4527 
4528  // make render group analyzers for each poly column
4529  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4532  auto& cat = loader->getCatalog();
4533  auto* td = loader->getTableDesc();
4534  CHECK(td);
4535  auto column_descriptors =
4536  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4537  for (auto const& cd : column_descriptors) {
4538  if (IS_GEO_POLY(cd->columnType.get_type())) {
4539  auto rga = std::make_shared<RenderGroupAnalyzer>();
4540  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4541  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4542  }
4543  }
4544  } else {
4545  fclose(p_file);
4546  throw std::runtime_error(
4547  "Render Group Assignment requested in CopyParams but disabled in Server "
4548  "Config. Set enable_assign_render_groups=true in Server Config to override.");
4549  }
4550  }
4551 
4552  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4553  loader->getTableDesc()->tableId};
4554  auto table_epochs = loader->getTableEpochs();
4556  {
4557  std::list<std::future<ImportStatus>> threads;
4558 
4559  // use a stack to track thread_ids which must not overlap among threads
4560  // because thread_id is used to index import_buffers_vec[]
4561  std::stack<size_t> stack_thread_ids;
4562  for (size_t i = 0; i < max_threads; i++) {
4563  stack_thread_ids.push(i);
4564  }
4565  // added for true row index on error
4566  size_t first_row_index_this_buffer = 0;
4567 
4568  while (size > 0) {
4569  unsigned int num_rows_this_buffer = 0;
4570  CHECK(scratch_buffer);
4571  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4572  scratch_buffer,
4573  size,
4574  copy_params,
4575  first_row_index_this_buffer,
4576  num_rows_this_buffer,
4577  p_file);
4578 
4579  // unput residual
4580  int nresidual = size - end_pos;
4581  std::unique_ptr<char[]> unbuf;
4582  if (nresidual > 0) {
4583  unbuf = std::make_unique<char[]>(nresidual);
4584  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4585  }
4586 
4587  // get a thread_id not in use
4588  auto thread_id = stack_thread_ids.top();
4589  stack_thread_ids.pop();
4590  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4591 
4592  threads.push_back(std::async(std::launch::async,
4594  thread_id,
4595  this,
4596  std::move(scratch_buffer),
4597  begin_pos,
4598  end_pos,
4599  end_pos,
4600  columnIdToRenderGroupAnalyzerMap,
4601  first_row_index_this_buffer,
4602  session_info,
4603  executor));
4604 
4605  first_row_index_this_buffer += num_rows_this_buffer;
4606 
4607  current_pos += end_pos;
4608  scratch_buffer = std::make_unique<char[]>(alloc_size);
4609  CHECK(scratch_buffer);
4610  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4611  size = nresidual +
4612  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4613 
4614  begin_pos = 0;
4615  while (threads.size() > 0) {
4616  int nready = 0;
4617  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4618  it != threads.end();) {
4619  auto& p = *it;
4620  std::chrono::milliseconds span(0);
4621  if (p.wait_for(span) == std::future_status::ready) {
4622  auto ret_import_status = p.get();
4623  {
4625  import_status_ += ret_import_status;
4626  if (ret_import_status.load_failed) {
4628  }
4629  }
4630  // sum up current total file offsets
4631  size_t total_file_offset{0};
4632  if (decompressed) {
4633  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4634  for (const auto file_offset : file_offsets) {
4635  total_file_offset += file_offset;
4636  }
4637  }
4638  // estimate number of rows per current total file offset
4639  if (decompressed ? total_file_offset : current_pos) {
4641  (decompressed ? (float)total_file_size / total_file_offset
4642  : (float)file_size / current_pos) *
4643  import_status_.rows_completed;
4644  }
4645  VLOG(3) << "rows_completed " << import_status_.rows_completed
4646  << ", rows_estimated " << import_status_.rows_estimated
4647  << ", total_file_size " << total_file_size << ", total_file_offset "
4648  << total_file_offset;
4650  // recall thread_id for reuse
4651  stack_thread_ids.push(ret_import_status.thread_id);
4652  threads.erase(it++);
4653  ++nready;
4654  } else {
4655  ++it;
4656  }
4657  }
4658 
4659  if (nready == 0) {
4660  std::this_thread::yield();
4661  }
4662 
4663  // on eof, wait all threads to finish
4664  if (0 == size) {
4665  continue;
4666  }
4667 
4668  // keep reading if any free thread slot
4669  // this is one of the major difference from old threading model !!
4670  if (threads.size() < max_threads) {
4671  break;
4672  }
4675  break;
4676  }
4677  }
4680  import_status_.load_failed = true;
4681  // todo use better message
4682  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4683  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4684  break;
4685  }
4687  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4688  break;
4689  }
4690  }
4691 
4692  // join dangling threads in case of LOG(ERROR) above
4693  for (auto& p : threads) {
4694  p.wait();
4695  }
4696  }
4697 
4698  checkpoint(table_epochs);
4699 
4700  fclose(p_file);
4701  p_file = nullptr;
4702  return import_status_;
4703 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::string cat(Ts &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:2039
heavyai::shared_lock< heavyai::shared_mutex > read_lock
#define LOG(tag)
Definition: Logger.h:216
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
std::string get_session_id() const
Definition: SessionInfo.h:93
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3641
ThreadId thread_id()
Definition: Logger.cpp:820
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:154
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
std::vector< size_t > file_offsets
Definition: Importer.h:717
#define VLOG(n)
Definition: Logger.h:316
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712
#define IS_GEO_POLY(T)
Definition: sqltypes.h:328

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importGDAL ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo * session_info,
const bool  is_raster 
)

Definition at line 5355 of file Importer.cpp.

References importGDALGeo(), and importGDALRaster().

Referenced by QueryRunner::ImportDriver::importGeoTable().

5358  {
5359  if (is_raster) {
5360  return importGDALRaster(session_info);
5361  }
5362  return importGDALGeo(columnNameToSourceNameMap, session_info);
5363 }
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5365
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5678

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALGeo ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo * session_info
)
private

Definition at line 5365 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, logger::ERROR, g_enable_assign_render_groups, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), TableDescriptor::tableId, TableDescriptor::tableName, logger::thread_id(), toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by importGDAL().

5367  {
5368  // initial status
5371  if (poDS == nullptr) {
5372  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5373  file_path);
5374  }
5375 
5376  OGRLayer& layer =
5378 
5379  // get the number of features in this layer
5380  size_t numFeatures = layer.GetFeatureCount();
5381 
5382  // build map of metadata field (additional columns) name to index
5383  // use shared_ptr since we need to pass it to the worker
5384  FieldNameToIndexMapType fieldNameToIndexMap;
5385  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5386  CHECK(poFDefn);
5387  size_t numFields = poFDefn->GetFieldCount();
5388  for (size_t iField = 0; iField < numFields; iField++) {
5389  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5390  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5391  }
5392 
5393  // the geographic spatial reference we want to put everything in
5394  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5395  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5396 
5397 #if GDAL_VERSION_MAJOR >= 3
5398  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5399  // this results in X and Y being transposed for angle-based
5400  // coordinate systems. This restores the previous behavior.
5401  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5402 #endif
5403 
5404 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5405  // just one "thread"
5406  max_threads = 1;
5407 #else
5408  // how many threads to use
5410 #endif
5411  VLOG(1) << "GDAL import # threads: " << max_threads;
5412 
5413  // metadata columns?
5414  auto const metadata_column_infos =
5416 
5417  // import geo table is specifically handled in both DBHandler and QueryRunner
5418  // that is separate path against a normal SQL execution
5419  // so we here explicitly enroll the import session to allow interruption
5420  // while importing geo table
5421  auto query_session = session_info ? session_info->get_session_id() : "";
5422  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5424  auto is_session_already_registered = false;
5425  {
5427  executor->getSessionLock());
5428  is_session_already_registered =
5429  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5430  }
5431  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5432  !is_session_already_registered) {
5433  executor->enrollQuerySession(query_session,
5434  "IMPORT_GEO_TABLE",
5435  query_submitted_time,
5437  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5438  }
5439  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5440  // reset the runtime query interrupt status
5441  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5442  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5443  }
5444  };
5445 
5446  // make an import buffer for each thread
5447  CHECK_EQ(import_buffers_vec.size(), 0u);
5449  for (size_t i = 0; i < max_threads; i++) {
5450  for (const auto cd : loader->get_column_descs()) {
5451  import_buffers_vec[i].emplace_back(
5452  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5453  }
5454  }
5455 
5456  // make render group analyzers for each poly column
5457  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5460  auto& cat = loader->getCatalog();
5461  auto* td = loader->getTableDesc();
5462  CHECK(td);
5463  auto column_descriptors =
5464  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5465  for (auto const& cd : column_descriptors) {
5466  if (IS_GEO_POLY(cd->columnType.get_type())) {
5467  auto rga = std::make_shared<RenderGroupAnalyzer>();
5468  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5469  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5470  }
5471  }
5472  } else {
5473  throw std::runtime_error(
5474  "Render Group Assignment requested in CopyParams but disabled in Server "
5475  "Config. Set enable_assign_render_groups=true in Server Config to override.");
5476  }
5477  }
5478 
5479 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5480  // threads
5481  std::list<std::future<ImportStatus>> threads;
5482 
5483  // use a stack to track thread_ids which must not overlap among threads
5484  // because thread_id is used to index import_buffers_vec[]
5485  std::stack<size_t> stack_thread_ids;
5486  for (size_t i = 0; i < max_threads; i++) {
5487  stack_thread_ids.push(i);
5488  }
5489 #endif
5490 
5491  // checkpoint the table
5492  auto table_epochs = loader->getTableEpochs();
5493 
5494  // reset the layer
5495  layer.ResetReading();
5496 
5497  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5498 
5499  // make a features buffer for each thread
5500  std::vector<FeaturePtrVector> features(max_threads);
5501 
5502  // make one of these for each thread, based on the first feature's SR
5503  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5504  max_threads);
5505 
5506  // for each feature...
5507  size_t firstFeatureThisChunk = 0;
5508  while (firstFeatureThisChunk < numFeatures) {
5509  // how many features this chunk
5510  size_t numFeaturesThisChunk =
5511  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5512 
5513 // get a thread_id not in use
5514 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5515  size_t thread_id = 0;
5516 #else
5517  auto thread_id = stack_thread_ids.top();
5518  stack_thread_ids.pop();
5519  CHECK(thread_id < max_threads);
5520 #endif
5521 
5522  // fill features buffer for new thread
5523  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5524  features[thread_id].emplace_back(layer.GetNextFeature());
5525  }
5526 
5527  // construct a coordinate transformation for each thread, if needed
5528  // some features may not have geometry, so look for the first one that does
5529  if (coordinate_transformations[thread_id] == nullptr) {
5530  for (auto const& feature : features[thread_id]) {
5531  auto const* geometry = feature->GetGeometryRef();
5532  if (geometry) {
5533  auto const* geometry_sr = geometry->getSpatialReference();
5534  // if the SR is non-null and non-empty and different from what we want
5535  // we need to make a reusable CoordinateTransformation
5536  if (geometry_sr &&
5537 #if GDAL_VERSION_MAJOR >= 3
5538  !geometry_sr->IsEmpty() &&
5539 #endif
5540  !geometry_sr->IsSame(poGeographicSR.get())) {
5541  // validate the SR before trying to use it
5542  if (geometry_sr->Validate() != OGRERR_NONE) {
5543  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5544  }
5545  // create the OGRCoordinateTransformation that will be used for
5546  // all the features in this chunk
5547  coordinate_transformations[thread_id].reset(
5548  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5549  if (coordinate_transformations[thread_id] == nullptr) {
5550  throw std::runtime_error(
5551  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5552  }
5553  }
5554  // once we find at least one geometry with an SR, we're done
5555  break;
5556  }
5557  }
5558  }
5559 
5560 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5561  // call worker function directly
5562  auto ret_import_status =
5564  this,
5565  coordinate_transformations[thread_id].get(),
5566  std::move(features[thread_id]),
5567  firstFeatureThisChunk,
5568  numFeaturesThisChunk,
5569  fieldNameToIndexMap,
5570  columnNameToSourceNameMap,
5571  columnIdToRenderGroupAnalyzerMap,
5572  session_info,
5573  executor.get(),
5574  metadata_column_infos);
5575  import_status += ret_import_status;
5576  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5577  import_status.rows_completed;
5578  set_import_status(import_id, import_status);
5579 #else
5580  // fire up that thread to import this geometry
5581  threads.push_back(std::async(std::launch::async,
5583  thread_id,
5584  this,
5585  coordinate_transformations[thread_id].get(),
5586  std::move(features[thread_id]),
5587  firstFeatureThisChunk,
5588  numFeaturesThisChunk,
5589  fieldNameToIndexMap,
5590  columnNameToSourceNameMap,
5591  columnIdToRenderGroupAnalyzerMap,
5592  session_info,
5593  executor.get(),
5594  metadata_column_infos));
5595 
5596  // let the threads run
5597  while (threads.size() > 0) {
5598  int nready = 0;
5599  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5600  it != threads.end();) {
5601  auto& p = *it;
5602  std::chrono::milliseconds span(
5603  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5604  if (p.wait_for(span) == std::future_status::ready) {
5605  auto ret_import_status = p.get();
5606  {
5608  import_status_ += ret_import_status;
5610  ((float)firstFeatureThisChunk / (float)numFeatures) *
5614  break;
5615  }
5616  }
5617  // recall thread_id for reuse
5618  stack_thread_ids.push(ret_import_status.thread_id);
5619 
5620  threads.erase(it++);
5621  ++nready;
5622  } else {
5623  ++it;
5624  }
5625  }
5626 
5627  if (nready == 0) {
5628  std::this_thread::yield();
5629  }
5630 
5631  // keep reading if any free thread slot
5632  // this is one of the major difference from old threading model !!
5633  if (threads.size() < max_threads) {
5634  break;
5635  }
5636  }
5637 #endif
5638 
5639  // out of rows?
5640 
5643  import_status_.load_failed = true;
5644  // todo use better message
5645  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5646  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5647  break;
5648  }
5650  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5651  "more details";
5652  break;
5653  }
5654 
5655  firstFeatureThisChunk += numFeaturesThisChunk;
5656  }
5657 
5658 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5659  // wait for any remaining threads
5660  if (threads.size()) {
5661  for (auto& p : threads) {
5662  // wait for the thread
5663  p.wait();
5664  // get the result and update the final import status
5665  auto ret_import_status = p.get();
5666  import_status_ += ret_import_status;
5669  }
5670  }
5671 #endif
5672 
5673  checkpoint(table_epochs);
5674 
5675  return import_status_;
5676 }
std::lock_guard< T > lock_guard
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:151
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::string cat(Ts &&...args)
#define LOG(tag)
Definition: Logger.h:216
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4743
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
std::string add_metadata_columns
Definition: CopyParams.h:94
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRCoordinateTransformation *coordinate_transformation, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor, const MetadataColumnInfos &metadata_column_infos)
Definition: Importer.cpp:2417
std::string get_session_id() const
Definition: SessionInfo.h:93
std::string geo_layer_name
Definition: CopyParams.h:81
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4724
std::unique_ptr< OGRSpatialReference, SpatialReferenceDeleter > SpatialReferenceUqPtr
Definition: GDAL.h:59
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3641
ThreadId thread_id()
Definition: Logger.cpp:820
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:154
#define VLOG(n)
Definition: Logger.h:316
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712
#define IS_GEO_POLY(T)
Definition: sqltypes.h:328

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ImportStatus import_export::Importer::importGDALRaster ( const Catalog_Namespace::SessionInfo * session_info)
private

Definition at line 5678 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, anonymous_namespace{Importer.cpp}::check_session_interrupted(), checkpoint(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Geospatial::compress_coords(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), Executor::ERR_INTERRUPTED, anonymous_namespace{Utm.h}::f, g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_output_srid(), Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getBandNullValue(), import_export::RasterImporter::getBandsHeight(), import_export::RasterImporter::getBandsWidth(), Executor::getExecutor(), import_export::RasterImporter::getNumBands(), import_export::RasterImporter::getPointNamesAndSQLTypes(), import_export::RasterImporter::getProjectedPixelCoords(), import_export::RasterImporter::getRawPixels(), import_export::RasterImporter::import(), import_buffers_vec, import_id, import_export::DataStreamSink::import_status_, logger::INFO, ColumnDescriptor::isGeoPhyCol, kARRAY, kBIGINT, kDOUBLE, kFLOAT, kINT, kMaxRasterScanlinesPerThread, kNULLT, kPOINT, kSMALLINT, kTINYINT, load(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, NULL_DOUBLE, NULL_FLOAT, NULL_INT, NULL_SMALLINT, import_export::num_import_threads(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, set_import_status(), logger::thread_id(), timer_start(), TIMER_STOP, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, and VLOG.

Referenced by importGDAL().

5679  {
5680  // initial status
5682 
5683  // metadata columns?
5684  auto const metadata_column_infos =
5686 
5687  // create a raster importer and do the detect
5688  RasterImporter raster_importer;
5689  raster_importer.detect(
5690  file_path,
5696  true,
5697  metadata_column_infos);
5698 
5699  // get the table columns and count actual columns
5700  auto const& column_descs = loader->get_column_descs();
5701  uint32_t num_table_cols{0u};
5702  for (auto const* cd : column_descs) {
5703  if (!cd->isGeoPhyCol) {
5704  num_table_cols++;
5705  }
5706  }
5707 
5708  // how many bands do we have?
5709  auto num_bands = raster_importer.getNumBands();
5710 
5711  // get point columns info
5712  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5713 
5714  // validate that the table column count matches
5715  auto num_expected_cols = num_bands;
5716  num_expected_cols += point_names_and_sql_types.size();
5717  num_expected_cols += metadata_column_infos.size();
5718  if (num_expected_cols != num_table_cols) {
5719  throw std::runtime_error(
5720  "Raster Import aborted. Band/Column count mismatch (file requires " +
5721  std::to_string(num_expected_cols) + ", table has " +
5722  std::to_string(num_table_cols) + ")");
5723  }
5724 
5725  // validate the point column names and types
5726  // if we're importing the coords as a POINT, then the first column
5727  // must be a POINT (two physical columns, POINT and TINYINT[])
5728  // if we're not, the first two columns must be the matching type
5729  // optionally followed by an angle column
5730  auto cd_itr = column_descs.begin();
5731  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5732  if (sql_type == kPOINT) {
5733  // POINT column
5734  {
5735  auto const* cd = *cd_itr++;
5736  if (cd->columnName != col_name) {
5737  throw std::runtime_error("Column '" + cd->columnName +
5738  "' overridden name invalid (must be '" + col_name +
5739  "')");
5740  }
5741  auto const cd_type = cd->columnType.get_type();
5742  if (cd_type != kPOINT) {
5743  throw std::runtime_error("Column '" + cd->columnName +
5744  "' overridden type invalid (must be POINT)");
5745  }
5746  if (cd->columnType.get_output_srid() != 4326) {
5747  throw std::runtime_error("Column '" + cd->columnName +
5748  "' overridden SRID invalid (must be 4326)");
5749  }
5750  }
5751  // TINYINT[] coords sub-column
5752  {
5753  // if the above is true, this must be true
5754  auto const* cd = *cd_itr++;
5755  CHECK(cd->columnType.get_type() == kARRAY);
5756  CHECK(cd->columnType.get_subtype() == kTINYINT);
5757  }
5758  } else {
5759  // column of the matching name and type
5760  auto const* cd = *cd_itr++;
5761  if (cd->columnName != col_name) {
5762  throw std::runtime_error("Column '" + cd->columnName +
5763  "' overridden name invalid (must be '" + col_name +
5764  "')");
5765  }
5766  auto const cd_type = cd->columnType.get_type();
5767  if (cd_type != sql_type) {
5768  throw std::runtime_error("Column '" + cd->columnName +
5769  "' overridden type invalid (must be " +
5770  to_string(sql_type) + ")");
5771  }
5772  }
5773  }
5774 
5775  // validate the band column types
5776  // any Immerse overriding to other types will currently be rejected
5777  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5778  if (band_names_and_types.size() != num_bands) {
5779  throw std::runtime_error("Column/Band count mismatch when validating types");
5780  }
5781  for (uint32_t i = 0; i < num_bands; i++) {
5782  auto const* cd = *cd_itr++;
5783  auto const cd_type = cd->columnType.get_type();
5784  auto const sql_type = band_names_and_types[i].second;
5785  if (cd_type != sql_type) {
5786  throw std::runtime_error("Band Column '" + cd->columnName +
5787  "' overridden type invalid (must be " +
5788  to_string(sql_type) + ")");
5789  }
5790  }
5791 
5792  // validate metadata column
5793  for (auto const& mci : metadata_column_infos) {
5794  auto const* cd = *cd_itr++;
5795  if (mci.column_descriptor.columnName != cd->columnName) {
5796  throw std::runtime_error("Metadata Column '" + cd->columnName +
5797  "' overridden name invalid (must be '" +
5798  mci.column_descriptor.columnName + "')");
5799  }
5800  auto const cd_type = cd->columnType.get_type();
5801  auto const md_type = mci.column_descriptor.columnType.get_type();
5802  if (cd_type != md_type) {
5803  throw std::runtime_error("Metadata Column '" + cd->columnName +
5804  "' overridden type invalid (must be " +
5805  to_string(md_type) + ")");
5806  }
5807  }
5808 
5809  // geo table import is handled specially in both DBHandler and QueryRunner,
5810  // via a path that is separate from normal SQL execution,
5811  // so here we explicitly enroll the import session to allow interruption
5812  // while importing a geo table
5813  auto query_session = session_info ? session_info->get_session_id() : "";
5814  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5816  auto is_session_already_registered = false;
5817  {
5819  executor->getSessionLock());
5820  is_session_already_registered =
5821  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5822  }
5823  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5824  !is_session_already_registered) {
5825  executor->enrollQuerySession(query_session,
5826  "IMPORT_GEO_TABLE",
5827  query_submitted_time,
5829  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5830  }
5831  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5832  // reset the runtime query interrupt status
5833  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5834  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5835  }
5836  };
5837 
5838  // how many threads are we gonna use?
5840  VLOG(1) << "GDAL import # threads: " << max_threads;
5841 
5843  throw std::runtime_error("Invalid CopyParams.raster_scanlines_per_thread! (" +
5845  ")");
5846  }
5847  const int max_scanlines_per_thread =
5852  VLOG(1) << "Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5853 
5854  // make an import buffer for each thread
5855  CHECK_EQ(import_buffers_vec.size(), 0u);
5857  for (size_t i = 0; i < max_threads; i++) {
5858  for (auto const& cd : loader->get_column_descs()) {
5859  import_buffers_vec[i].emplace_back(
5860  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5861  }
5862  }
5863 
5864  // status and times
5865  using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5866 
5867  // get the band dimensions
5868  auto const band_size_x = raster_importer.getBandsWidth();
5869  auto const band_size_y = raster_importer.getBandsHeight();
5870 
5871  // allocate raw pixel buffers per thread
5872  std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5873  for (size_t i = 0; i < max_threads; i++) {
5874  raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5875  sizeof(double));
5876  }
5877 
5878  // just the sql type of the first point column (if any)
5879  auto const point_sql_type = point_names_and_sql_types.size()
5880  ? point_names_and_sql_types.begin()->second
5881  : kNULLT;
5882 
5883  // lambda for importing to raw data buffers (threadable)
5884  auto import_rows =
5885  [&](const size_t thread_idx, const int y_start, const int y_end) -> ThreadReturn {
5886  // this thread's import buffers
5887  auto& import_buffers = import_buffers_vec[thread_idx];
5888 
5889  // this thread's raw pixel bytes
5890  auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5891 
5892  // clear the buffers
5893  for (auto& col_buffer : import_buffers) {
5894  col_buffer->clear();
5895  }
5896 
5897  // prepare to iterate columns
5898  auto col_itr = column_descs.begin();
5899  int col_idx{0};
5900 
5901  float proj_s{0.0f};
5902  if (point_sql_type != kNULLT) {
5903  // the first two columns (either lon/lat or POINT/coords)
5904  auto const* cd_col0 = *col_itr++;
5905  auto const* cd_col1 = *col_itr++;
5906  auto const* cd_angle =
5907  copy_params.raster_point_compute_angle ? *col_itr++ : nullptr;
5908 
5909  // compute and add x and y
5910  auto proj_timer = timer_start();
5911  for (int y = y_start; y < y_end; y++) {
5912  // get projected pixel coords for this scan-line
5913  auto const coords = raster_importer.getProjectedPixelCoords(thread_idx, y);
5914 
5915  // add to buffers
5916  for (int x = 0; x < band_size_x; x++) {
5917  // this point and angle
5918  auto const& [dx, dy, angle] = coords[x];
5919 
5920  // store the point
5921  switch (point_sql_type) {
5922  case kPOINT: {
5923  // add empty value to POINT buffer
5924  TDatum td_point;
5925  import_buffers[0]->add_value(cd_col0, td_point, false);
5926 
5927  // convert lon/lat to bytes (compressed or not) and add to POINT coords
5928  // buffer
5929  auto const compressed_coords =
5930  Geospatial::compress_coords({dx, dy}, cd_col0->columnType);
5931  std::vector<TDatum> td_coords_data;
5932  for (auto const& cc : compressed_coords) {
5933  TDatum td_byte;
5934  td_byte.val.int_val = cc;
5935  td_coords_data.push_back(td_byte);
5936  }
5937  TDatum td_coords;
5938  td_coords.val.arr_val = td_coords_data;
5939  td_coords.is_null = false;
5940  import_buffers[1]->add_value(cd_col1, td_coords, false);
5941  } break;
5942  case kFLOAT:
5943  case kDOUBLE: {
5944  TDatum td;
5945  td.is_null = false;
5946  td.val.real_val = dx;
5947  import_buffers[0]->add_value(cd_col0, td, false);
5948  td.val.real_val = dy;
5949  import_buffers[1]->add_value(cd_col1, td, false);
5950  } break;
5951  case kSMALLINT:
5952  case kINT: {
5953  TDatum td;
5954  td.is_null = false;
5955  td.val.int_val = static_cast<int64_t>(x);
5956  import_buffers[0]->add_value(cd_col0, td, false);
5957  td.val.int_val = static_cast<int64_t>(y);
5958  import_buffers[1]->add_value(cd_col1, td, false);
5959  } break;
5960  default:
5961  CHECK(false);
5962  }
5963 
5964  // angle?
5966  CHECK(cd_angle);
5967  TDatum td;
5968  td.is_null = false;
5969  td.val.real_val = static_cast<double>(angle);
5970  import_buffers[2]->add_value(cd_angle, td, false);
5971  }
5972  }
5973  }
5974  proj_s = TIMER_STOP(proj_timer);
5975  col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5976  }
5977 
5978  // prepare to accumulate read and conv times
5979  float read_s{0.0f};
5980  float conv_s{0.0f};
5981 
5982  // y_end is one past the actual end, so don't add 1
5983  auto const num_rows = y_end - y_start;
5984  auto const num_elems = band_size_x * num_rows;
5985 
5986  // for each band/column
5987  for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5988  // the corresponding column
5989  auto const* cd_band = *col_itr;
5990  CHECK(cd_band);
5991 
5992  // data type to read as
5993  auto const cd_type = cd_band->columnType.get_type();
5994 
5995  // read the scanlines (will do a data type conversion if necessary)
5996  auto read_timer = timer_start();
5997  raster_importer.getRawPixels(
5998  thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5999  read_s += TIMER_STOP(read_timer);
6000 
6001  // null value?
6002  auto const [null_value, null_value_valid] =
6003  raster_importer.getBandNullValue(band_idx);
6004 
6005  // copy to this thread's import buffers
6006  // convert any nulls we find
6007  auto conv_timer = timer_start();
6008  TDatum td;
6009  switch (cd_type) {
6010  case kSMALLINT: {
6011  const int16_t* values =
6012  reinterpret_cast<const int16_t*>(raw_pixel_bytes.data());
6013  for (int idx = 0; idx < num_elems; idx++) {
6014  auto const& value = values[idx];
6015  if (null_value_valid && value == static_cast<int16_t>(null_value)) {
6016  td.is_null = true;
6017  td.val.int_val = NULL_SMALLINT;
6018  } else {
6019  td.is_null = false;
6020  td.val.int_val = static_cast<int64_t>(value);
6021  }
6022  import_buffers[col_idx]->add_value(cd_band, td, false);
6023  }
6024  } break;
6025  case kINT: {
6026  const int32_t* values =
6027  reinterpret_cast<const int32_t*>(raw_pixel_bytes.data());
6028  for (int idx = 0; idx < num_elems; idx++) {
6029  auto const& value = values[idx];
6030  if (null_value_valid && value == static_cast<int32_t>(null_value)) {
6031  td.is_null = true;
6032  td.val.int_val = NULL_INT;
6033  } else {
6034  td.is_null = false;
6035  td.val.int_val = static_cast<int64_t>(value);
6036  }
6037  import_buffers[col_idx]->add_value(cd_band, td, false);
6038  }
6039  } break;
6040  case kBIGINT: {
6041  const uint32_t* values =
6042  reinterpret_cast<const uint32_t*>(raw_pixel_bytes.data());
6043  for (int idx = 0; idx < num_elems; idx++) {
6044  auto const& value = values[idx];
6045  if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
6046  td.is_null = true;
6047  td.val.int_val = NULL_INT;
6048  } else {
6049  td.is_null = false;
6050  td.val.int_val = static_cast<int64_t>(value);
6051  }
6052  import_buffers[col_idx]->add_value(cd_band, td, false);
6053  }
6054  } break;
6055  case kFLOAT: {
6056  const float* values = reinterpret_cast<const float*>(raw_pixel_bytes.data());
6057  for (int idx = 0; idx < num_elems; idx++) {
6058  auto const& value = values[idx];
6059  if (null_value_valid && value == static_cast<float>(null_value)) {
6060  td.is_null = true;
6061  td.val.real_val = NULL_FLOAT;
6062  } else {
6063  td.is_null = false;
6064  td.val.real_val = static_cast<double>(value);
6065  }
6066  import_buffers[col_idx]->add_value(cd_band, td, false);
6067  }
6068  } break;
6069  case kDOUBLE: {
6070  const double* values = reinterpret_cast<const double*>(raw_pixel_bytes.data());
6071  for (int idx = 0; idx < num_elems; idx++) {
6072  auto const& value = values[idx];
6073  if (null_value_valid && value == null_value) {
6074  td.is_null = true;
6075  td.val.real_val = NULL_DOUBLE;
6076  } else {
6077  td.is_null = false;
6078  td.val.real_val = value;
6079  }
6080  import_buffers[col_idx]->add_value(cd_band, td, false);
6081  }
6082  } break;
6083  default:
6084  CHECK(false);
6085  }
6086  conv_s += TIMER_STOP(conv_timer);
6087 
6088  // next column
6089  col_idx++;
6090  col_itr++;
6091  }
6092 
6093  // metadata columns?
6094  for (auto const& mci : metadata_column_infos) {
6095  auto const* cd_band = *col_itr++;
6096  CHECK(cd_band);
6097  for (int i = 0; i < num_elems; i++) {
6098  import_buffers[col_idx]->add_value(cd_band, mci.value, false, copy_params);
6099  }
6100  col_idx++;
6101  }
6102 
6103  // build status
6104  ImportStatus thread_import_status;
6105  thread_import_status.rows_estimated = num_elems;
6106  thread_import_status.rows_completed = num_elems;
6107 
6108  // done
6109  return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
6110  };
6111 
6112  // prepare to checkpoint the table
6113  auto table_epochs = loader->getTableEpochs();
6114 
6115  // start wall clock
6116  auto wall_timer = timer_start();
6117 
6118  // start the import
6119  raster_importer.import(max_threads);
6120 
6121  // time the phases
6122  float total_proj_s{0.0f};
6123  float total_read_s{0.0f};
6124  float total_conv_s{0.0f};
6125  float total_load_s{0.0f};
6126 
6127  const int min_scanlines_per_thread = 8;
6128  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;
6129  for (int block_y = 0; block_y < band_size_y;
6130  block_y += (max_threads * max_scanlines_per_thread)) {
6131  using Future = std::future<ThreadReturn>;
6132  std::vector<Future> futures;
6133  const int scanlines_in_block =
6134  std::min(band_size_y - block_y, max_scanlines_per_block);
6135  const int pixels_in_block = scanlines_in_block * band_size_x;
6136  const int block_max_scanlines_per_thread =
6137  std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6138  static_cast<int>(max_threads),
6139  min_scanlines_per_thread);
6140  VLOG(1) << "Raster Importer: scanlines_in_block: " << scanlines_in_block
6141  << ", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6142 
6143  std::vector<size_t> rows_per_thread;
6144  auto block_wall_timer = timer_start();
6145  // run max_threads scanlines at once
6146  for (size_t thread_id = 0; thread_id < max_threads; thread_id++) {
6147  const int y_start = block_y + thread_id * block_max_scanlines_per_thread;
6148  if (y_start < band_size_y) {
6149  const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6150  if (y_start < y_end) {
6151  rows_per_thread.emplace_back((y_end - y_start) * band_size_x);
6152  futures.emplace_back(
6153  std::async(std::launch::async, import_rows, thread_id, y_start, y_end));
6154  }
6155  }
6156  }
6157 
6158  // wait for the threads to finish and
6159  // accumulate the results and times
6160  float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6161  size_t thread_idx = 0;
6162  size_t active_threads = 0;
6163  for (auto& future : futures) {
6164  auto const [import_status, times] = future.get();
6165  import_status_ += import_status;
6166  proj_s += times[0];
6167  read_s += times[1];
6168  conv_s += times[2];
6169  // We load the data in thread order so we can get deterministic row-major raster
6170  // ordering
6171  auto thread_load_timer = timer_start();
6172  // Todo: We should consider invoking the load on another thread in a ping-pong
6173  // fashion so we can simultaneously read the next batch of data
6174  load(import_buffers_vec[thread_idx], rows_per_thread[thread_idx], session_info);
6175  ++thread_idx;
6176  ++active_threads;
6177 
6178  load_s += TIMER_STOP(thread_load_timer);
6179  }
6180 
6181  // average times over all threads (except for load which is single-threaded)
6182  total_proj_s += (proj_s / float(active_threads));
6183  total_read_s += (read_s / float(active_threads));
6184  total_conv_s += (conv_s / float(active_threads));
6185  total_load_s += load_s;
6186 
6187  // update the status
6189 
6190  // more debug
6191  auto const block_wall_s = TIMER_STOP(block_wall_timer);
6192  auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6193  auto const rows_per_second = pixels_in_block / block_wall_s;
6194  LOG(INFO) << "Raster Importer: Loaded " << scanlines_in_block
6195  << " scanlines starting at " << block_y << " out of " << band_size_y
6196  << " in " << block_wall_s << "s at " << scanlines_per_second
6197  << " scanlines/s and " << rows_per_second << " rows/s";
6198 
6199  // check for interrupt
6200  if (UNLIKELY(check_session_interrupted(query_session, executor.get()))) {
6201  import_status_.load_failed = true;
6202  import_status_.load_msg = "Raster Import interrupted";
6204  }
6205  }
6206 
6207  // checkpoint
6208  auto checkpoint_timer = timer_start();
6209  checkpoint(table_epochs);
6210  auto const checkpoint_s = TIMER_STOP(checkpoint_timer);
6211 
6212  // stop wall clock
6213  auto const total_wall_s = TIMER_STOP(wall_timer);
6214 
6215  // report
6216  auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6217  auto const total_rows_per_second =
6218  float(band_size_x) * float(band_size_y) / total_wall_s;
6219  LOG(INFO) << "Raster Importer: Imported "
6220  << static_cast<uint64_t>(band_size_x) * static_cast<uint64_t>(band_size_y)
6221  << " rows";
6222  LOG(INFO) << "Raster Importer: Total Import Time " << total_wall_s << "s at "
6223  << total_scanlines_per_second << " scanlines/s and " << total_rows_per_second
6224  << " rows/s";
6225 
6226  // phase times (with proportions)
6227  auto proj_pct = float(int(total_proj_s / total_wall_s * 1000.0f) * 0.1f);
6228  auto read_pct = float(int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6229  auto conv_pct = float(int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6230  auto load_pct = float(int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6231  auto cpnt_pct = float(int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6232 
6233  VLOG(1) << "Raster Importer: Import timing breakdown:";
6234  VLOG(1) << " Project " << total_proj_s << "s (" << proj_pct << "%)";
6235  VLOG(1) << " Read " << total_read_s << "s (" << read_pct << "%)";
6236  VLOG(1) << " Convert " << total_conv_s << "s (" << conv_pct << "%)";
6237  VLOG(1) << " Load " << total_load_s << "s (" << load_pct << "%)";
6238  VLOG(1) << " Checkpoint " << checkpoint_s << "s (" << cpnt_pct << "%)";
6239 
6240  // all done
6241  return import_status_;
6242 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t raster_scanlines_per_thread
Definition: CopyParams.h:90
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: Importer.cpp:125
#define NULL_DOUBLE
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
#define NULL_FLOAT
#define LOG(tag)
Definition: Logger.h:216
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3631
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4920
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string raster_import_dimensions
Definition: CopyParams.h:93
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
#define NULL_INT
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
std::string add_metadata_columns
Definition: CopyParams.h:94
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
#define TIMER_STOP(t)
Definition: Importer.cpp:101
future< Result > async(Fn &&fn, Args &&...args)
RasterPointType raster_point_type
Definition: CopyParams.h:88
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:891
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
#define UNLIKELY(x)
Definition: likely.h:25
std::string get_session_id() const
Definition: SessionInfo.h:93
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4942
std::string import_id
Definition: Importer.h:887
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3641
ThreadId thread_id()
Definition: Logger.cpp:820
std::string raster_import_bands
Definition: CopyParams.h:89
#define CHECK(condition)
Definition: Logger.h:222
#define NULL_SMALLINT
Definition: sqltypes.h:59
RasterPointTransform raster_point_transform
Definition: CopyParams.h:91
#define VLOG(n)
Definition: Logger.h:316
Type timer_start()
Definition: measure.h:42
static constexpr int kMaxRasterScanlinesPerThread
Definition: Importer.cpp:114
std::unique_ptr< Loader > loader
Definition: Importer.h:892
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
const std::string file_path
Definition: Importer.h:712

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo * session_info 
)

Definition at line 3631 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), import_export::import_thread_shapefile(), and importGDALRaster().

3633  {
3634  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3636  import_status_.load_failed = true;
3637  import_status_.load_msg = loader->getErrorMessage();
3638  }
3639 }
std::lock_guard< T > lock_guard
heavyai::unique_lock< heavyai::shared_mutex > write_lock
heavyai::shared_mutex import_mutex_
Definition: Importer.h:715
std::unique_ptr< Loader > loader
Definition: Importer.h:892

+ Here is the caller graph for this function:

Geospatial::GDAL::DataSourceUqPtr import_export::Importer::openGDALDataSource ( const std::string &  fileName,
const CopyParams copy_params 
)
staticprivate

Definition at line 4724 of file Importer.cpp.

References Geospatial::GDAL::init(), import_export::kGeoFile, Geospatial::GDAL::openDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), import_export::CopyParams::source_type, and to_string().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptorsGeo(), importGDALGeo(), and readMetadataSampleGDAL().

4726  {
4734  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4735  std::to_string(static_cast<int>(copy_params.source_type)) +
4736  ")");
4737  }
4739 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::string to_string(char const *&&v)
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string s3_session_token
Definition: CopyParams.h:63
static DataSourceUqPtr openDataSource(const std::string &name, const import_export::SourceType source_type)
Definition: GDAL.cpp:181
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams copy_params 
)
static

Definition at line 4766 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), openGDALDataSource(), and import_export::parse_add_metadata_columns().

Referenced by DBHandler::detect_column_types().

4771  {
4773  openGDALDataSource(file_name, copy_params));
4774  if (datasource == nullptr) {
4775  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4776  file_name);
4777  }
4778 
4779  OGRLayer& layer =
4780  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4781 
4782  auto const* feature_defn = layer.GetLayerDefn();
4783  CHECK(feature_defn);
4784 
4785  // metadata columns?
4786  auto const metadata_column_infos =
4788 
4789  // get limited feature count
4790  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4791  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4792  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4793 
4794  // prepare sample data map
4795  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4796  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4797  CHECK(column_name);
4798  sample_data[column_name] = {};
4799  }
4800  sample_data[geo_column_name] = {};
4801  for (auto const& mci : metadata_column_infos) {
4802  sample_data[mci.column_descriptor.columnName] = {};
4803  }
4804 
4805  // prepare to read
4806  layer.ResetReading();
4807 
4808  // read features (up to limited count)
4809  uint64_t feature_index{0u};
4810  while (feature_index < num_features) {
4811  // get (and take ownership of) feature
4812  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4813  if (!feature) {
4814  break;
4815  }
4816 
4817  // get feature geometry
4818  auto const* geometry = feature->GetGeometryRef();
4819  if (geometry == nullptr) {
4820  break;
4821  }
4822 
4823  // validate geom type (again?)
4824  switch (wkbFlatten(geometry->getGeometryType())) {
4825  case wkbPoint:
4826  case wkbMultiPoint:
4827  case wkbLineString:
4828  case wkbMultiLineString:
4829  case wkbPolygon:
4830  case wkbMultiPolygon:
4831  break;
4832  default:
4833  throw std::runtime_error("Unsupported geometry type: " +
4834  std::string(geometry->getGeometryName()));
4835  }
4836 
4837  // populate sample data for regular field columns
4838  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4839  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4840  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4841  }
4842 
4843  // populate sample data for metadata columns?
4844  for (auto const& mci : metadata_column_infos) {
4845  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4846  }
4847 
4848  // populate sample data for geo column with WKT string
4849  char* wkts = nullptr;
4850  geometry->exportToWkt(&wkts);
4851  CHECK(wkts);
4852  sample_data[geo_column_name].push_back(wkts);
4853  CPLFree(wkts);
4854 
4855  // next feature
4856  feature_index++;
4857  }
4858 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4743
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:94
std::string geo_layer_name
Definition: CopyParams.h:81
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4724
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const bool  force_null = false 
)
static

Definition at line 1666 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), foreign_storage::TextFileBufferParser::processGeoColumn(), and foreign_storage::TextFileBufferParser::processInvalidGeoColumn().

// Row-wise variant: appends the physical-column values backing ONE geo value of
// column `cd` to `import_buffers`, advancing `col_idx` past every buffer written.
// The physical column descriptors are looked up by the successive column ids that
// follow cd->columnId.
//   coords       - always written; may be mutated here (a NULL POINT is rewritten
//                  to the {NULL_ARRAY_DOUBLE, NULL_DOUBLE} pair before compression).
//   ring_sizes   - written for MULTILINESTRING, POLYGON and MULTIPOLYGON.
//   poly_rings   - written for MULTIPOLYGON only.
//   bounds       - written for LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON
//                  and MULTIPOINT.
//   render_group - written for POLYGON and MULTIPOLYGON.
//   force_null   - when true, every physical value is flagged NULL, overriding the
//                  nullness detection below (including the NULL POINT special case).
1676  {
1677  const auto col_ti = cd->columnType;
1678  const auto col_type = col_ti.get_type();
1679  auto columnId = cd->columnId;
1680  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);  // first physical column: coords
1681  bool is_null_geo = false;
1682  bool is_null_point = false;
1683  if (!col_ti.get_notnull()) {
1684  // Check for NULL geo (only nullable columns can carry one)
1685  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1686  is_null_point = true;
1687  coords.clear();
1688  }
1689  is_null_geo = coords.empty();
1690  if (is_null_point) {
1691  coords.push_back(NULL_ARRAY_DOUBLE);
1692  coords.push_back(NULL_DOUBLE);
1693  // Treating POINT coords as notnull, need to store actual encoding
1694  // [un]compressed+[not]null
1695  is_null_geo = false;
1696  }
1697  }
1698  if (force_null) {
1699  is_null_geo = true;  // caller override wins over everything detected above
1700  }
1701  TDatum tdd_coords;
1702  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1703  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1704  // in a fixlen array, compressed and uncompressed.
1705  if (!is_null_geo) {
1706  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1707  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1708  for (auto cc : compressed_coords) {
1709  tdd_coords.val.arr_val.emplace_back();  // one array element per compressed byte
1710  tdd_coords.val.arr_val.back().val.int_val = cc;
1711  }
1712  }
1713  tdd_coords.is_null = is_null_geo;
1714  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1715 
1716  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1717  // Create [linest]ring_sizes array value and add it to the physical column
1718  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1719  TDatum tdd_ring_sizes;
1720  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1721  if (!is_null_geo) {
1722  for (auto ring_size : ring_sizes) {
1723  tdd_ring_sizes.val.arr_val.emplace_back();
1724  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1725  }
1726  }
1727  tdd_ring_sizes.is_null = is_null_geo;
1728  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1729  }
1730 
1731  if (col_type == kMULTIPOLYGON) {
1732  // Create poly_rings array value and add it to the physical column
1733  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1734  TDatum tdd_poly_rings;
1735  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1736  if (!is_null_geo) {
1737  for (auto num_rings : poly_rings) {
1738  tdd_poly_rings.val.arr_val.emplace_back();
1739  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1740  }
1741  }
1742  tdd_poly_rings.is_null = is_null_geo;
1743  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1744  }
1745 
1746  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1747  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1748  // Create bounds array value (stored as doubles) and add it to the physical column
1749  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1749  TDatum tdd_bounds;
1750  tdd_bounds.val.arr_val.reserve(bounds.size());
1751  if (!is_null_geo) {
1752  for (auto b : bounds) {
1753  tdd_bounds.val.arr_val.emplace_back();
1754  tdd_bounds.val.arr_val.back().val.real_val = b;
1755  }
1756  }
1757  tdd_bounds.is_null = is_null_geo;
1758  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1759  }
1760 
1761  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1762  // Create render_group value and add it to the physical column
1763  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1764  TDatum td_render_group;
1765  td_render_group.val.int_val = render_group;
1766  td_render_group.is_null = is_null_geo;
1767  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1768  }
1769 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
std::vector< int > &  render_groups_column 
)
static

Definition at line 1771 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

// Columnar variant: for geo column `cd`, appends one value per row to each of its
// physical-column import buffers. All rows of one physical column are written
// before moving on to the next, and `col_idx` is advanced once per physical column.
// Nullness is derived independently for each physical column from that column's
// own row data, and only when the column type is nullable (get_notnull() false).
// Trips CHECK(false) when ring_sizes_column, poly_rings_column or bounds_column
// (where used by the column type) differs in size from coords_column.
// NOTE(review): render_groups_column is not size-checked against coords_column and
// its values are always written with is_null = false, whereas
// set_geo_physical_import_buffer propagates the geo's nullness to the render group
// - confirm this difference is intended.
1780  {
1781  const auto col_ti = cd->columnType;
1782  const auto col_type = col_ti.get_type();
1783  auto columnId = cd->columnId;
1784 
1785  auto coords_row_count = coords_column.size();  // reference row count for the size checks below
1786  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1787  for (auto& coords : coords_column) {
1788  bool is_null_geo = false;
1789  bool is_null_point = false;
1790  if (!col_ti.get_notnull()) {
1791  // Check for NULL geo
1792  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1793  is_null_point = true;
1794  coords.clear();
1795  }
1796  is_null_geo = coords.empty();
1797  if (is_null_point) {
1798  coords.push_back(NULL_ARRAY_DOUBLE);
1799  coords.push_back(NULL_DOUBLE);
1800  // Treating POINT coords as notnull, need to store actual encoding
1801  // [un]compressed+[not]null
1802  is_null_geo = false;
1803  }
1804  }
1805  std::vector<TDatum> td_coords_data;
1806  if (!is_null_geo) {
1807  std::vector<uint8_t> compressed_coords =
1808  Geospatial::compress_coords(coords, col_ti);
1809  for (auto const& cc : compressed_coords) {
1810  TDatum td_byte;  // one array element per compressed byte
1811  td_byte.val.int_val = cc;
1812  td_coords_data.push_back(td_byte);
1813  }
1814  }
1815  TDatum tdd_coords;
1816  tdd_coords.val.arr_val = td_coords_data;
1817  tdd_coords.is_null = is_null_geo;
1818  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1819  }
1820  col_idx++;  // done with the coords buffer; advance once for the whole column
1821 
1822  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1823  if (ring_sizes_column.size() != coords_row_count) {
1824  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1825  }
1826  // Create [linest]ring_sizes array value and add it to the physical column
1827  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1828  for (auto const& ring_sizes : ring_sizes_column) {
1829  bool is_null_geo = false;
1830  if (!col_ti.get_notnull()) {
1831  // Check for NULL geo
1832  is_null_geo = ring_sizes.empty();
1833  }
1834  std::vector<TDatum> td_ring_sizes;
1835  for (auto const& ring_size : ring_sizes) {
1836  TDatum td_ring_size;
1837  td_ring_size.val.int_val = ring_size;
1838  td_ring_sizes.push_back(td_ring_size);
1839  }
1840  TDatum tdd_ring_sizes;
1841  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1842  tdd_ring_sizes.is_null = is_null_geo;
1843  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1844  }
1845  col_idx++;
1846  }
1847 
1848  if (col_type == kMULTIPOLYGON) {
1849  if (poly_rings_column.size() != coords_row_count) {
1850  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1851  }
1852  // Create poly_rings array value and add it to the physical column
1853  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1854  for (auto const& poly_rings : poly_rings_column) {
1855  bool is_null_geo = false;
1856  if (!col_ti.get_notnull()) {
1857  // Check for NULL geo
1858  is_null_geo = poly_rings.empty();
1859  }
1860  std::vector<TDatum> td_poly_rings;
1861  for (auto const& num_rings : poly_rings) {
1862  TDatum td_num_rings;
1863  td_num_rings.val.int_val = num_rings;
1864  td_poly_rings.push_back(td_num_rings);
1865  }
1866  TDatum tdd_poly_rings;
1867  tdd_poly_rings.val.arr_val = td_poly_rings;
1868  tdd_poly_rings.is_null = is_null_geo;
1869  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1870  }
1871  col_idx++;
1872  }
1873 
1874  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1875  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1876  if (bounds_column.size() != coords_row_count) {
1877  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1878  }
1879  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1880  for (auto const& bounds : bounds_column) {
1881  bool is_null_geo = false;
1882  if (!col_ti.get_notnull()) {
1883  // Check for NULL geo (empty bounds, or a NULL_ARRAY_DOUBLE first element)
1884  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1885  }
1886  std::vector<TDatum> td_bounds_data;
1887  for (auto const& b : bounds) {
1888  TDatum td_double;
1889  td_double.val.real_val = b;
1890  td_bounds_data.push_back(td_double);
1891  }
1892  TDatum tdd_bounds;
1893  tdd_bounds.val.arr_val = td_bounds_data;
1894  tdd_bounds.is_null = is_null_geo;
1895  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1896  }
1897  col_idx++;
1898  }
1899 
1900  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1901  // Create render_group value and add it to the physical column
1902  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1903  for (auto const& render_group : render_groups_column) {
1904  TDatum td_render_group;
1905  td_render_group.val.int_val = render_group;
1906  td_render_group.is_null = false;  // never flagged NULL here, regardless of the row's geo
1907  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1908  }
1909  col_idx++;
1910  }
1911 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:222
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 235 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), importGDALGeo(), importGDALRaster(), import_export::ForeignDataImporter::importGeneralS3(), and anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers().

235  {
237  is.end = std::chrono::steady_clock::now();
238  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
240 }
std::lock_guard< T > lock_guard
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:166
heavyai::unique_lock< heavyai::shared_mutex > write_lock
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:165
std::string import_id
Definition: Importer.h:887

+ Here is the caller graph for this function:

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 890 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 888 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private
std::mutex import_export::Importer::init_gdal_mutex
static private

Definition at line 894 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 893 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private
size_t import_export::Importer::max_threads
private

Definition at line 889 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDALGeo(), and importGDALRaster().


The documentation for this class was generated from the following files: