OmniSciDB  f17484ade4
import_export::Importer Class Reference

#include <Importer.h>

Inheritance diagram for import_export::Importer
Collaboration diagram for import_export::Importer

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
 
const CopyParams & get_copy_params () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec ()
 
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptors (const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
 

Private Member Functions

ImportStatus importGDALGeo (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGDALRaster (const Catalog_Namespace::SessionInfo *session_info)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource (const std::string &fileName, const CopyParams &copy_params)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 784 of file Importer.h.
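Typical usage is to construct an Importer over an existing table and input file and then call import(), which drives the whole pipeline through DataStreamSink::archivePlumber(). A minimal sketch (the helper name and surrounding setup are hypothetical; a valid catalog, table descriptor, and session are assumed to come from elsewhere):

#include <Importer.h>

// Hypothetical helper: copy a delimited file into an existing table.
import_export::ImportStatus copy_file_into_table(
    Catalog_Namespace::Catalog& cat,
    const TableDescriptor* td,
    const std::string& file_path,
    const Catalog_Namespace::SessionInfo* session) {
  import_export::CopyParams copy_params;  // default delimited-file parameters; adjust as needed
  import_export::Importer importer(cat, td, file_path, copy_params);
  return importer.import(session);        // delegates to DataStreamSink::archivePlumber()
}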

Member Enumeration Documentation

enum import_export::Importer::GeoFileLayerContents [strong]

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 837 of file Importer.h.

837 enum class GeoFileLayerContents { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 172 of file Importer.cpp.

172  Importer::Importer(Catalog_Namespace::Catalog& c,
173  const TableDescriptor* t,
174  const std::string& f,
175  const CopyParams& p)
176  : Importer(new Loader(c, t), f, p) {}
import_export::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 178 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

178  Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
179  : DataStreamSink(p, f), loader(providedLoader) {
180  import_id = boost::filesystem::path(file_path).filename().string();
181  file_size = 0;
182  max_threads = 0;
183  p_file = nullptr;
184  buffer[0] = nullptr;
185  buffer[1] = nullptr;
186  // we may be overallocating a little more memory here due to dropping phy cols.
187  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
188  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
189  int i = 0;
190  bool has_array = false;
191  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
192  int skip_physical_cols = 0;
193  for (auto& p : loader->get_column_descs()) {
194  // phy geo columns can't be in input file
195  if (skip_physical_cols-- > 0) {
196  continue;
197  }
198  // neither are rowid or $deleted$
199  // note: columns can be added after rowid/$deleted$
200  if (p->isVirtualCol || p->isDeletedCol) {
201  continue;
202  }
203  skip_physical_cols = p->columnType.get_physical_cols();
204  if (p->columnType.get_type() == kARRAY) {
205  is_array.get()[i] = true;
206  has_array = true;
207  } else {
208  is_array.get()[i] = false;
209  }
210  ++i;
211  }
212  if (has_array) {
213  is_array_a = std::unique_ptr<bool[]>(is_array.release());
214  } else {
215  is_array_a = std::unique_ptr<bool[]>(nullptr);
216  }
217 }
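The Loader-taking constructor adopts the raw pointer into the loader member (a std::unique_ptr<Loader>), so ownership passes to the Importer; the catalog/table constructor above is simply a convenience that calls new Loader(c, t) itself. A small sketch of supplying a caller-constructed Loader (hypothetical helper and setup):

#include <Importer.h>

// Hypothetical helper: import through a Loader the caller constructs explicitly.
import_export::ImportStatus import_with_custom_loader(
    Catalog_Namespace::Catalog& cat,
    const TableDescriptor* td,
    const std::string& file_path,
    const Catalog_Namespace::SessionInfo* session) {
  import_export::CopyParams copy_params;
  auto* loader = new import_export::Loader(cat, td);  // ownership transfers to the Importer below
  import_export::Importer importer(loader, file_path, copy_params);
  return importer.import(session);
}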
import_export::Importer::~Importer ( )
override

Definition at line 219 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

219  {
220  if (p_file != nullptr) {
221  fclose(p_file);
222  }
223  if (buffer[0] != nullptr) {
224  free(buffer[0]);
225  }
226  if (buffer[1] != nullptr) {
227  free(buffer[1]);
228  }
229 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3524 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), importGDALGeo(), and importGDALRaster().

3525  {
3526  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3527  heavyai::lock_guard<heavyai::shared_mutex> read_lock(import_mutex_);
3528  if (import_status_.load_failed) {
3529  // rollback to starting epoch - undo all the added records
3530  loader->setTableEpochs(table_epochs);
3531  } else {
3532  loader->checkpoint();
3533  }
3534  }
3535 
3536  if (loader->getTableDesc()->persistenceLevel ==
3537  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3538  // tables
3539  auto ms = measure<>::execution([&]() {
3541  if (!import_status_.load_failed) {
3542  for (auto& p : import_buffers_vec[0]) {
3543  if (!p->stringDictCheckpoint()) {
3544  LOG(ERROR) << "Checkpointing Dictionary for Column "
3545  << p->getColumnDesc()->columnName << " failed.";
3546  import_status_.load_failed = true;
3547  import_status_.load_msg = "Dictionary checkpoint failed";
3548  break;
3549  }
3550  }
3551  }
3552  });
3553  if (DEBUG_TIMING) {
3554  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3555  << std::endl;
3556  }
3557  }
3558 }

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5052 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5052  {
5053  return gdalStatInternal(path, copy_params, false);
5054 }

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5057 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5058  {
5059  return gdalStatInternal(path, copy_params, true);
5060 }
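Both checks route through gdalStatInternal() and therefore honor any S3 credentials supplied in the CopyParams. A small sketch (hypothetical helper; the credential values are placeholders and only matter for remote paths):

#include <Importer.h>

// Hypothetical helper: verify an input path (local or S3/VSI) before starting an import.
bool input_exists(const std::string& path, const bool allow_directory) {
  import_export::CopyParams copy_params;
  copy_params.s3_region = "us-east-1";        // placeholder credentials, only for s3:// paths
  copy_params.s3_access_key = "<access-key>";
  copy_params.s3_secret_key = "<secret-key>";
  return allow_directory
             ? import_export::Importer::gdalFileOrDirectoryExists(path, copy_params)
             : import_export::Importer::gdalFileExists(path, copy_params);
}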

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 5129 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5131  {
5132  // lazy init GDAL
5133  Geospatial::GDAL::init();
5134  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5135  copy_params.s3_endpoint,
5136  copy_params.s3_access_key,
5137  copy_params.s3_secret_key,
5138  copy_params.s3_session_token);
5139 
5140  // prepare to gather files
5141  std::vector<std::string> files;
5142 
5143  // gather the files recursively
5144  gdalGatherFilesInArchiveRecursive(archive_path, files);
5145 
5146  // convert to relative paths inside archive
5147  for (auto& file : files) {
5148  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5149  }
5150 
5151  // done
5152  return files;
5153 }
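A sketch of listing an archive's contents; the returned paths are relative to the archive root because the loop above strips the archive prefix. (The helper name is hypothetical, and the archive path is typically a GDAL VSI path such as a /vsizip/ prefixed location, which this page does not show.)

#include <Importer.h>
#include <iostream>

// Hypothetical helper: print every file GDAL can see inside an archive.
void list_archive_contents(const std::string& archive_path) {
  import_export::CopyParams copy_params;  // add S3 credentials here for remote archives
  const auto files =
      import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
  for (const auto& relative_path : files) {
    std::cout << relative_path << '\n';
  }
}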

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 5156 of file Importer.cpp.

References CHECK, EMPTY, GEO, Geospatial::GDAL::init(), NON_GEO, openGDALDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5158  {
5159  // lazy init GDAL
5160  Geospatial::GDAL::init();
5161  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5162  copy_params.s3_endpoint,
5163  copy_params.s3_access_key,
5164  copy_params.s3_secret_key,
5165  copy_params.s3_session_token);
5166 
5167  // prepare to gather layer info
5168  std::vector<GeoFileLayerInfo> layer_info;
5169 
5170  // open the data set
5171  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5172  if (poDS == nullptr) {
5173  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5174  file_name);
5175  }
5176 
5177  // enumerate the layers
5178  for (auto&& poLayer : poDS->GetLayers()) {
5179  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5180  // prepare to read this layer
5181  poLayer->ResetReading();
5182  // skip layer if empty
5183  if (poLayer->GetFeatureCount() > 0) {
5184  // first read layer geo type
5185  auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5186  if (ogr_type == wkbUnknown) {
5187  // layer geo type unknown, so try reading from the first feature
5188  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5189  CHECK(first_feature);
5190  auto const* ogr_geometry = first_feature->GetGeometryRef();
5191  if (ogr_geometry) {
5192  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5193  } else {
5194  ogr_type = wkbNone;
5195  }
5196  }
5197  switch (ogr_type) {
5198  case wkbNone:
5199  // no geo
5200  contents = GeoFileLayerContents::NON_GEO;
5201  break;
5202  case wkbPoint:
5203  case wkbMultiPoint:
5204  case wkbLineString:
5205  case wkbMultiLineString:
5206  case wkbPolygon:
5207  case wkbMultiPolygon:
5208  // layer has supported geo
5209  contents = GeoFileLayerContents::GEO;
5210  break;
5211  default:
5212  // layer has unsupported geometry
5213  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5214  break;
5215  }
5216  }
5217  // store info for this layer
5218  layer_info.emplace_back(poLayer->GetName(), contents);
5219  }
5220 
5221  // done
5222  return layer_info;
5223 }
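A sketch of enumerating layers and filtering on their contents. It assumes GeoFileLayerInfo exposes name and contents members, matching the emplace_back(poLayer->GetName(), contents) call above; the helper name is hypothetical.

#include <Importer.h>
#include <iostream>

// Hypothetical helper: report which layers of a geo file carry supported geometry.
void report_layers(const std::string& file_name) {
  import_export::CopyParams copy_params;
  const auto layers =
      import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
  for (const auto& layer : layers) {
    if (layer.contents == import_export::Importer::GeoFileLayerContents::GEO) {
      std::cout << layer.name << ": supported geometry\n";
    } else {
      std::cout << layer.name << ": empty, non-geo, or unsupported\n";
    }
  }
}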

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
static private

Definition at line 5017 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

5019  {
5020  // lazy init GDAL
5021  Geospatial::GDAL::init();
5022  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5023  copy_params.s3_endpoint,
5024  copy_params.s3_access_key,
5025  copy_params.s3_secret_key,
5026  copy_params.s3_session_token);
5027 
5028 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5029  // clear GDAL stat cache
5030  // without this, file existence will be cached, even if authentication changes
5031  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5032  VSICurlClearCache();
5033 #endif
5034 
5035  // stat path
5036  VSIStatBufL sb;
5037  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5038  if (result < 0) {
5039  return false;
5040  }
5041 
5042  // exists?
5043  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5044  return true;
5045  } else if (VSI_ISREG(sb.st_mode)) {
5046  return true;
5047  }
5048  return false;
5049 }

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const bool  is_raster,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4819 of file Importer.cpp.

References gdalToColumnDescriptorsGeo(), and gdalToColumnDescriptorsRaster().

Referenced by DBHandler::detect_column_types().

4823  {
4824  if (is_raster) {
4825  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4826  }
4827  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4828 }
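A sketch of detecting a file's column layout before creating the target table; the geometry column name "geo" and the helper name are arbitrary choices for the example.

#include <Importer.h>
#include <iostream>

// Hypothetical helper: print the columns detected in a geo or raster file.
void print_detected_columns(const std::string& file_name, const bool is_raster) {
  import_export::CopyParams copy_params;
  const auto column_descriptors = import_export::Importer::gdalToColumnDescriptors(
      file_name, is_raster, "geo", copy_params);
  for (const auto& cd : column_descriptors) {
    std::cout << cd.columnName << " (SQLTypes value "
              << static_cast<int>(cd.columnType.get_type()) << ")\n";
  }
}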

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsGeo ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 4903 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kTEXT, import_export::anonymous_namespace{Importer.cpp}::ogr_to_type(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::PROMOTE_LINESTRING_TO_MULTILINESTRING, import_export::PROMOTE_POINT_TO_MULTIPOINT, import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4906  {
4907  std::list<ColumnDescriptor> cds;
4908 
4909  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
4910  if (poDS == nullptr) {
4911  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4912  file_name);
4913  }
4914  if (poDS->GetLayerCount() == 0) {
4915  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
4916  " has no layers");
4917  }
4918 
4919  OGRLayer& layer =
4920  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4921 
4922  layer.ResetReading();
4923  // TODO(andrewseidl): support multiple features
4924  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
4925  if (poFeature == nullptr) {
4926  throw std::runtime_error("No features found in " + file_name);
4927  }
4928  // get fields as regular columns
4929  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4930  CHECK(poFDefn);
4931  int iField;
4932  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4933  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4934  auto typePair = ogr_to_type(poFieldDefn->GetType());
4935  ColumnDescriptor cd;
4936  cd.columnName = poFieldDefn->GetNameRef();
4937  cd.sourceName = poFieldDefn->GetNameRef();
4938  SQLTypeInfo ti;
4939  if (typePair.second) {
4940  ti.set_type(kARRAY);
4941  ti.set_subtype(typePair.first);
4942  } else {
4943  ti.set_type(typePair.first);
4944  }
4945  if (typePair.first == kTEXT) {
4946  ti.set_compression(kENCODING_DICT);
4947  ti.set_comp_param(0);
4948  }
4949  ti.set_fixed_size();
4950  cd.columnType = ti;
4951  cds.push_back(cd);
4952  }
4953  // try getting the geo column type from the layer
4954  auto ogr_type = wkbFlatten(layer.GetGeomType());
4955  if (ogr_type == wkbUnknown) {
4956  // layer geo type unknown, so try the feature (that we already got)
4957  auto const* ogr_geometry = poFeature->GetGeometryRef();
4958  if (ogr_geometry) {
4959  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
4960  }
4961  }
4962  // do we have a geo column?
4963  if (ogr_type != wkbNone) {
4964  ColumnDescriptor cd;
4965  cd.columnName = geo_column_name;
4966  cd.sourceName = geo_column_name;
4967 
4968  // if exploding, override any collection type to child type
4969  if (copy_params.geo_explode_collections) {
4970  if (ogr_type == wkbMultiPolygon) {
4971  ogr_type = wkbPolygon;
4972  } else if (ogr_type == wkbMultiLineString) {
4973  ogr_type = wkbLineString;
4974  } else if (ogr_type == wkbMultiPoint) {
4975  ogr_type = wkbPoint;
4976  }
4977  }
4978 
4979  // convert to internal type
4980  // this will throw if the type is unsupported
4981  SQLTypes geoType = ogr_to_type(ogr_type);
4982 
4983  // promote column type? (unless exploding)
4984  if (!copy_params.geo_explode_collections) {
4985  if (PROMOTE_POINT_TO_MULTIPOINT && geoType == kPOINT) {
4986  geoType = kMULTIPOINT;
4987  } else if (PROMOTE_LINESTRING_TO_MULTILINESTRING && geoType == kLINESTRING) {
4988  geoType = kMULTILINESTRING;
4989  } else if (PROMOTE_POLYGON_TO_MULTIPOLYGON && geoType == kPOLYGON) {
4990  geoType = kMULTIPOLYGON;
4991  }
4992  }
4993 
4994  // build full internal type
4995  SQLTypeInfo ti;
4996  ti.set_type(geoType);
4997  ti.set_subtype(copy_params.geo_coords_type);
4998  ti.set_input_srid(copy_params.geo_coords_srid);
4999  ti.set_output_srid(copy_params.geo_coords_srid);
5000  ti.set_compression(copy_params.geo_coords_encoding);
5001  ti.set_comp_param(copy_params.geo_coords_comp_param);
5002  cd.columnType = ti;
5003 
5004  cds.push_back(cd);
5005  }
5006 
5007  // metadata columns?
5008  auto metadata_column_infos =
5009  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5010  for (auto& mci : metadata_column_infos) {
5011  cds.push_back(std::move(mci.column_descriptor));
5012  }
5013 
5014  return cds;
5015 }

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsRaster ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 4831 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getPointNamesAndSQLTypes(), Geospatial::GDAL::init(), kENCODING_GEOINT, kGEOMETRY, kPOINT, import_export::parse_add_metadata_columns(), import_export::CopyParams::raster_import_bands, import_export::CopyParams::raster_import_dimensions, import_export::CopyParams::raster_point_compute_angle, import_export::CopyParams::raster_point_transform, import_export::CopyParams::raster_point_type, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Geospatial::GDAL::setAuthorizationTokens(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4834  {
4835  // lazy init GDAL
4836  Geospatial::GDAL::init();
4837  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4838  copy_params.s3_endpoint,
4839  copy_params.s3_access_key,
4840  copy_params.s3_secret_key,
4841  copy_params.s3_session_token);
4842 
4843  // prepare for metadata column
4844  auto metadata_column_infos =
4845  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4846 
4847  // create a raster importer and do the detect
4848  RasterImporter raster_importer;
4849  raster_importer.detect(
4850  file_name,
4856  false,
4857  metadata_column_infos);
4858 
4859  // prepare to capture column descriptors
4860  std::list<ColumnDescriptor> cds;
4861 
4862  // get the point column info
4863  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4864 
4865  // create the columns for the point in the specified type
4866  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4867  ColumnDescriptor cd;
4868  cd.columnName = cd.sourceName = col_name;
4869  cd.columnType.set_type(sql_type);
4870  // hardwire other POINT attributes for now
4871  if (sql_type == kPOINT) {
4872  cd.columnType.set_subtype(kGEOMETRY);
4873  cd.columnType.set_input_srid(4326);
4874  cd.columnType.set_output_srid(4326);
4875  cd.columnType.set_compression(kENCODING_GEOINT);
4876  cd.columnType.set_comp_param(32);
4877  }
4878  cds.push_back(cd);
4879  }
4880 
4881  // get the names and types for the band column(s)
4882  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4883 
4884  // add column descriptors for each band
4885  for (auto const& [band_name, sql_type] : band_names_and_types) {
4886  ColumnDescriptor cd;
4887  cd.columnName = cd.sourceName = band_name;
4888  cd.columnType.set_type(sql_type);
4890  cds.push_back(cd);
4891  }
4892 
4893  // metadata columns?
4894  for (auto& mci : metadata_column_infos) {
4895  cds.push_back(std::move(mci.column_descriptor));
4896  }
4897 
4898  // return the results
4899  return cds;
4900 }

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 801 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

801  {
802  return loader->get_column_descs();
803  }

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 800 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

800 { return copy_params; }


std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 810 of file Importer.h.

References import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

810  {
811  return import_buffers_vec[i];
812  }
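These per-thread buffer sets are what the worker functions (import_thread_delimited(), import_thread_shapefile()) fill and then hand to load(). A compressed sketch of that hand-off, with the actual parsing omitted (the helper name is hypothetical):

#include <Importer.h>

// Sketch of the worker-side hand-off; each worker indexes its own, unshared slot.
void flush_rows(import_export::Importer& importer,
                const int thread_id,
                const size_t row_count,
                const Catalog_Namespace::SessionInfo* session_info) {
  auto& buffers = importer.get_import_buffers(thread_id);
  // ... parsing code appends row_count values to every TypedImportBuffer in buffers ...
  importer.load(buffers, row_count, session_info);
}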

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 807 of file Importer.h.

References import_buffers_vec.

807  {
808  return import_buffers_vec;
809  }
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 231 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

231  {
233  auto it = import_status_map.find(import_id);
234  if (it == import_status_map.end()) {
235  throw std::runtime_error("Import status not found for id: " + import_id);
236  }
237  return it->second;
238 }
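The status map is keyed by the import id, which the constructor sets to the imported file's basename; get_import_status() throws if the id is unknown. A polling sketch (hypothetical helper name):

#include <Importer.h>
#include <iostream>

// Hypothetical helper: print the progress of a running import identified by file basename.
void print_import_progress(const std::string& import_id) {
  const auto status = import_export::Importer::get_import_status(import_id);
  std::cout << import_id << ": " << status.rows_completed << " rows completed, ~"
            << status.rows_estimated << " estimated, " << status.rows_rejected
            << " rejected\n";
}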

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 813 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

813 { return is_array_a.get(); }

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 847 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

847  {
848  return loader->getCatalog();
849  }

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 870 of file Importer.h.

References loader.

870  {
871  return loader.get();
872  }
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 4361 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4361  {
4362  return DataStreamSink::archivePlumber(session_info);
4363 }

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo *  session_info 
)
override virtual

Implements import_export::DataStreamSink.

Definition at line 4365 of file Importer.cpp.

References threading_serial::async(), CHECK, checkpoint(), logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), heavyai::fopen(), Catalog_Namespace::SessionInfo::get_session_id(), Executor::getExecutor(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4368  {
4370  auto query_session = session_info ? session_info->get_session_id() : "";
4371 
4372  if (!p_file) {
4373  p_file = fopen(file_path.c_str(), "rb");
4374  }
4375  if (!p_file) {
4376  throw std::runtime_error("failed to open file '" + file_path +
4377  "': " + strerror(errno));
4378  }
4379 
4380  if (!decompressed) {
4381  (void)fseek(p_file, 0, SEEK_END);
4382  file_size = ftell(p_file);
4383  }
4384 
4386  VLOG(1) << "Delimited import # threads: " << max_threads;
4387 
4388  // deal with small files
4389  size_t alloc_size = copy_params.buffer_size;
4390  if (!decompressed && file_size < alloc_size) {
4391  alloc_size = file_size;
4392  }
4393 
4394  for (size_t i = 0; i < max_threads; i++) {
4395  import_buffers_vec.emplace_back();
4396  for (const auto cd : loader->get_column_descs()) {
4397  import_buffers_vec[i].emplace_back(
4398  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4399  }
4400  }
4401 
4402  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4403  size_t current_pos = 0;
4404  size_t end_pos;
4405  size_t begin_pos = 0;
4406 
4407  (void)fseek(p_file, current_pos, SEEK_SET);
4408  size_t size =
4409  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4410 
4411  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4412  loader->getTableDesc()->tableId};
4413  auto table_epochs = loader->getTableEpochs();
4415  {
4416  std::list<std::future<ImportStatus>> threads;
4417 
4418  // use a stack to track thread_ids which must not overlap among threads
4419  // because thread_id is used to index import_buffers_vec[]
4420  std::stack<size_t> stack_thread_ids;
4421  for (size_t i = 0; i < max_threads; i++) {
4422  stack_thread_ids.push(i);
4423  }
4424  // added for true row index on error
4425  size_t first_row_index_this_buffer = 0;
4426 
4427  while (size > 0) {
4428  unsigned int num_rows_this_buffer = 0;
4429  CHECK(scratch_buffer);
4430  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4431  scratch_buffer,
4432  size,
4433  copy_params,
4434  first_row_index_this_buffer,
4435  num_rows_this_buffer,
4436  p_file);
4437 
4438  // unput residual
4439  int nresidual = size - end_pos;
4440  std::unique_ptr<char[]> unbuf;
4441  if (nresidual > 0) {
4442  unbuf = std::make_unique<char[]>(nresidual);
4443  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4444  }
4445 
4446  // get a thread_id not in use
4447  auto thread_id = stack_thread_ids.top();
4448  stack_thread_ids.pop();
4449  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4450 
4451  threads.push_back(std::async(std::launch::async,
4453  thread_id,
4454  this,
4455  std::move(scratch_buffer),
4456  begin_pos,
4457  end_pos,
4458  end_pos,
4459  first_row_index_this_buffer,
4460  session_info,
4461  executor));
4462 
4463  first_row_index_this_buffer += num_rows_this_buffer;
4464 
4465  current_pos += end_pos;
4466  scratch_buffer = std::make_unique<char[]>(alloc_size);
4467  CHECK(scratch_buffer);
4468  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4469  size = nresidual +
4470  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4471 
4472  begin_pos = 0;
4473  while (threads.size() > 0) {
4474  int nready = 0;
4475  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4476  it != threads.end();) {
4477  auto& p = *it;
4478  std::chrono::milliseconds span(0);
4479  if (p.wait_for(span) == std::future_status::ready) {
4480  auto ret_import_status = p.get();
4481  {
4483  import_status_ += ret_import_status;
4484  if (ret_import_status.load_failed) {
4486  }
4487  }
4488  // sum up current total file offsets
4489  size_t total_file_offset{0};
4490  if (decompressed) {
4491  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4492  for (const auto file_offset : file_offsets) {
4493  total_file_offset += file_offset;
4494  }
4495  }
4496  // estimate number of rows per current total file offset
4497  if (decompressed ? total_file_offset : current_pos) {
4499  (decompressed ? (float)total_file_size / total_file_offset
4500  : (float)file_size / current_pos) *
4501  import_status_.rows_completed;
4502  }
4503  VLOG(3) << "rows_completed " << import_status_.rows_completed
4504  << ", rows_estimated " << import_status_.rows_estimated
4505  << ", total_file_size " << total_file_size << ", total_file_offset "
4506  << total_file_offset;
4508  // recall thread_id for reuse
4509  stack_thread_ids.push(ret_import_status.thread_id);
4510  threads.erase(it++);
4511  ++nready;
4512  } else {
4513  ++it;
4514  }
4515  }
4516 
4517  if (nready == 0) {
4518  std::this_thread::yield();
4519  }
4520 
4521  // on eof, wait all threads to finish
4522  if (0 == size) {
4523  continue;
4524  }
4525 
4526  // keep reading if any free thread slot
4527  // this is one of the major difference from old threading model !!
4528  if (threads.size() < max_threads) {
4529  break;
4530  }
4533  break;
4534  }
4535  }
4538  import_status_.load_failed = true;
4539  // todo use better message
4540  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4541  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4542  break;
4543  }
4545  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4546  break;
4547  }
4548  }
4549 
4550  // join dangling threads in case of LOG(ERROR) above
4551  for (auto& p : threads) {
4552  p.wait();
4553  }
4554  }
4555 
4556  checkpoint(table_epochs);
4557 
4558  fclose(p_file);
4559  p_file = nullptr;
4560  return import_status_;
4561 }
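The scheduling above recycles thread ids through a stack so that each in-flight task owns exactly one slot of import_buffers_vec, and a new buffer is only read from disk once a slot is free. A distilled, self-contained sketch of that pattern (not the actual implementation; the work callback stands in for import_thread_delimited()):

#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <stack>
#include <thread>

// Generic version of the slot-recycling scheduler used by importDelimited().
// `work` receives a slot id and must return it when done, so it can be reused.
void run_with_recycled_slots(const size_t max_threads,
                             const size_t num_tasks,
                             const std::function<size_t(size_t)>& work) {
  std::stack<size_t> free_slots;  // ids 0..max_threads-1, one per buffer set
  for (size_t i = 0; i < max_threads; ++i) {
    free_slots.push(i);
  }
  std::list<std::future<size_t>> in_flight;

  for (size_t task = 0; task < num_tasks; ++task) {
    const auto slot = free_slots.top();  // claim a slot for this task
    free_slots.pop();
    in_flight.push_back(std::async(std::launch::async, work, slot));

    // Harvest finished tasks; block (by yielding) only while no slot is free.
    do {
      for (auto it = in_flight.begin(); it != in_flight.end();) {
        if (it->wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
          free_slots.push(it->get());  // the slot can now back the next task
          it = in_flight.erase(it);
        } else {
          ++it;
        }
      }
      if (free_slots.empty()) {
        std::this_thread::yield();
      }
    } while (free_slots.empty());
  }
  for (auto& f : in_flight) {  // drain the tail of the work
    f.wait();
  }
}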

ImportStatus import_export::Importer::importGDAL ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info,
const bool  is_raster 
)

Definition at line 5225 of file Importer.cpp.

References importGDALGeo(), and importGDALRaster().

Referenced by QueryRunner::ImportDriver::importGeoTable().

5228  {
5229  if (is_raster) {
5230  return importGDALRaster(session_info);
5231  }
5232  return importGDALGeo(columnNameToSourceNameMap, session_info);
5233 }
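A sketch of a full geo import. The column-name-to-source-name map is normally built by the caller (e.g. DBHandler); here it is simply an identity mapping over the columns detected from the file, and the helper name and geometry column name "geo" are illustrative only.

#include <Importer.h>
#include <map>
#include <string>

// Hypothetical helper: import the contents of a geo file into an existing table.
import_export::ImportStatus import_geo_file(
    Catalog_Namespace::Catalog& cat,
    const TableDescriptor* td,
    const std::string& geo_path,
    const Catalog_Namespace::SessionInfo* session) {
  import_export::CopyParams copy_params;  // set geo_layer_name here to pick a specific layer
  const bool is_raster = false;

  std::map<std::string, std::string> colname_to_src;
  for (const auto& cd : import_export::Importer::gdalToColumnDescriptors(
           geo_path, is_raster, "geo", copy_params)) {
    colname_to_src[cd.columnName] = cd.sourceName;
  }

  import_export::Importer importer(cat, td, geo_path, copy_params);
  return importer.importGDAL(colname_to_src, session, is_raster);
}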

ImportStatus import_export::Importer::importGDALGeo ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 5235 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, checkpoint(), logger::ERROR, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_session_id(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by importGDAL().

5237  {
5238  // initial status
5241  if (poDS == nullptr) {
5242  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5243  file_path);
5244  }
5245 
5246  OGRLayer& layer =
5248 
5249  // get the number of features in this layer
5250  size_t numFeatures = layer.GetFeatureCount();
5251 
5252  // build map of metadata field (additional columns) name to index
5253  // use shared_ptr since we need to pass it to the worker
5254  FieldNameToIndexMapType fieldNameToIndexMap;
5255  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5256  CHECK(poFDefn);
5257  size_t numFields = poFDefn->GetFieldCount();
5258  for (size_t iField = 0; iField < numFields; iField++) {
5259  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5260  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5261  }
5262 
5263  // the geographic spatial reference we want to put everything in
5264  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5265  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5266 
5267 #if GDAL_VERSION_MAJOR >= 3
5268  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5269  // this results in X and Y being transposed for angle-based
5270  // coordinate systems. This restores the previous behavior.
5271  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5272 #endif
5273 
5274 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5275  // just one "thread"
5276  max_threads = 1;
5277 #else
5278  // how many threads to use
5280 #endif
5281  VLOG(1) << "GDAL import # threads: " << max_threads;
5282 
5283  // metadata columns?
5284  auto const metadata_column_infos =
5286 
5287  // import geo table is specifically handled in both DBHandler and QueryRunner
5288  // that is separate path against a normal SQL execution
5289  // so we here explicitly enroll the import session to allow interruption
5290  // while importing geo table
5291  auto query_session = session_info ? session_info->get_session_id() : "";
5292  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5294  auto is_session_already_registered = false;
5295  {
5297  executor->getSessionLock());
5298  is_session_already_registered =
5299  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5300  }
5301  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5302  !is_session_already_registered) {
5303  executor->enrollQuerySession(query_session,
5304  "IMPORT_GEO_TABLE",
5305  query_submitted_time,
5307  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5308  }
5309  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5310  // reset the runtime query interrupt status
5311  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5312  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5313  }
5314  };
5315 
5316  // make an import buffer for each thread
5317  CHECK_EQ(import_buffers_vec.size(), 0u);
5319  for (size_t i = 0; i < max_threads; i++) {
5320  for (const auto cd : loader->get_column_descs()) {
5321  import_buffers_vec[i].emplace_back(
5322  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5323  }
5324  }
5325 
5326 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5327  // threads
5328  std::list<std::future<ImportStatus>> threads;
5329 
5330  // use a stack to track thread_ids which must not overlap among threads
5331  // because thread_id is used to index import_buffers_vec[]
5332  std::stack<size_t> stack_thread_ids;
5333  for (size_t i = 0; i < max_threads; i++) {
5334  stack_thread_ids.push(i);
5335  }
5336 #endif
5337 
5338  // checkpoint the table
5339  auto table_epochs = loader->getTableEpochs();
5340 
5341  // reset the layer
5342  layer.ResetReading();
5343 
5344  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5345 
5346  // make a features buffer for each thread
5347  std::vector<FeaturePtrVector> features(max_threads);
5348 
5349  // make one of these for each thread, based on the first feature's SR
5350  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5351  max_threads);
5352 
5353  // for each feature...
5354  size_t firstFeatureThisChunk = 0;
5355  while (firstFeatureThisChunk < numFeatures) {
5356  // how many features this chunk
5357  size_t numFeaturesThisChunk =
5358  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5359 
5360 // get a thread_id not in use
5361 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5362  size_t thread_id = 0;
5363 #else
5364  auto thread_id = stack_thread_ids.top();
5365  stack_thread_ids.pop();
5366  CHECK(thread_id < max_threads);
5367 #endif
5368 
5369  // fill features buffer for new thread
5370  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5371  features[thread_id].emplace_back(layer.GetNextFeature());
5372  }
5373 
5374  // construct a coordinate transformation for each thread, if needed
5375  // some features may not have geometry, so look for the first one that does
5376  if (coordinate_transformations[thread_id] == nullptr) {
5377  for (auto const& feature : features[thread_id]) {
5378  auto const* geometry = feature->GetGeometryRef();
5379  if (geometry) {
5380  auto const* geometry_sr = geometry->getSpatialReference();
5381  // if the SR is non-null and non-empty and different from what we want
5382  // we need to make a reusable CoordinateTransformation
5383  if (geometry_sr &&
5384 #if GDAL_VERSION_MAJOR >= 3
5385  !geometry_sr->IsEmpty() &&
5386 #endif
5387  !geometry_sr->IsSame(poGeographicSR.get())) {
5388  // validate the SR before trying to use it
5389  if (geometry_sr->Validate() != OGRERR_NONE) {
5390  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5391  }
5392  // create the OGRCoordinateTransformation that will be used for
5393  // all the features in this chunk
5394  coordinate_transformations[thread_id].reset(
5395  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5396  if (coordinate_transformations[thread_id] == nullptr) {
5397  throw std::runtime_error(
5398  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5399  }
5400  }
5401  // once we find at least one geometry with an SR, we're done
5402  break;
5403  }
5404  }
5405  }
5406 
5407 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5408  // call worker function directly
5409  auto ret_import_status =
5411  this,
5412  coordinate_transformations[thread_id].get(),
5413  std::move(features[thread_id]),
5414  firstFeatureThisChunk,
5415  numFeaturesThisChunk,
5416  fieldNameToIndexMap,
5417  columnNameToSourceNameMap,
5418  session_info,
5419  executor.get(),
5420  metadata_column_infos);
5421  import_status += ret_import_status;
5422  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5423  import_status.rows_completed;
5424  set_import_status(import_id, import_status);
5425 #else
5426  // fire up that thread to import this geometry
5427  threads.push_back(std::async(std::launch::async,
5429  thread_id,
5430  this,
5431  coordinate_transformations[thread_id].get(),
5432  std::move(features[thread_id]),
5433  firstFeatureThisChunk,
5434  numFeaturesThisChunk,
5435  fieldNameToIndexMap,
5436  columnNameToSourceNameMap,
5437  session_info,
5438  executor.get(),
5439  metadata_column_infos));
5440 
5441  // let the threads run
5442  while (threads.size() > 0) {
5443  int nready = 0;
5444  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5445  it != threads.end();) {
5446  auto& p = *it;
5447  std::chrono::milliseconds span(
5448  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5449  if (p.wait_for(span) == std::future_status::ready) {
5450  auto ret_import_status = p.get();
5451  {
5453  import_status_ += ret_import_status;
5455  ((float)firstFeatureThisChunk / (float)numFeatures) *
5459  break;
5460  }
5461  }
5462  // recall thread_id for reuse
5463  stack_thread_ids.push(ret_import_status.thread_id);
5464 
5465  threads.erase(it++);
5466  ++nready;
5467  } else {
5468  ++it;
5469  }
5470  }
5471 
5472  if (nready == 0) {
5473  std::this_thread::yield();
5474  }
5475 
5476  // keep reading if any free thread slot
5477  // this is one of the major difference from old threading model !!
5478  if (threads.size() < max_threads) {
5479  break;
5480  }
5481  }
5482 #endif
5483 
5484  // out of rows?
5485 
5488  import_status_.load_failed = true;
5489  // todo use better message
5490  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5491  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5492  break;
5493  }
5495  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5496  "more details";
5497  break;
5498  }
5499 
5500  firstFeatureThisChunk += numFeaturesThisChunk;
5501  }
5502 
5503 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5504  // wait for any remaining threads
5505  if (threads.size()) {
5506  for (auto& p : threads) {
5507  // wait for the thread
5508  p.wait();
5509  // get the result and update the final import status
5510  auto ret_import_status = p.get();
5511  import_status_ += ret_import_status;
5514  }
5515  }
5516 #endif
5517 
5518  checkpoint(table_epochs);
5519 
5520  return import_status_;
5521 }
std::lock_guard< T > lock_guard
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:150
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define LOG(tag)
Definition: Logger.h:285
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4601
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRCoordinateTransformation *coordinate_transformation, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor, const MetadataColumnInfos &metadata_column_infos)
Definition: Importer.cpp:2338
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:131
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:505
std::string add_metadata_columns
Definition: CopyParams.h:94
future< Result > async(Fn &&fn, Args &&...args)
std::unique_lock< T > unique_lock
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:899
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
std::string get_session_id() const
Definition: SessionInfo.h:93
std::string geo_layer_name
Definition: CopyParams.h:81
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:240
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4582
std::unique_ptr< OGRSpatialReference, SpatialReferenceDeleter > SpatialReferenceUqPtr
Definition: GDAL.h:59
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
std::string import_id
Definition: Importer.h:895
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3524
ThreadId thread_id()
Definition: Logger.cpp:877
#define CHECK(condition)
Definition: Logger.h:291
#define VLOG(n)
Definition: Logger.h:388
heavyai::shared_mutex import_mutex_
Definition: Importer.h:723
std::unique_ptr< Loader > loader
Definition: Importer.h:900
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
const std::string file_path
Definition: Importer.h:720

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
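
The chunked threading model shown above launches one std::async task per free thread slot, then polls the futures with a zero-timeout wait_for() so the reader can go back to fetching more features as soon as any slot frees up, recycling explicit thread ids through a stack so each worker keeps reusing the same per-thread buffers and coordinate transformation. A minimal standalone sketch of that pattern, with work_on_chunk() standing in for import_thread_shapefile() and plain counters standing in for ImportStatus:

#include <algorithm>
#include <chrono>
#include <future>
#include <list>
#include <stack>
#include <thread>

// Hypothetical per-chunk worker; stands in for import_thread_shapefile().
struct ChunkResult {
  int thread_id{0};
  size_t rows_completed{0};
};

ChunkResult work_on_chunk(const int thread_id,
                          const size_t first_row,
                          const size_t num_rows) {
  // ... import num_rows features starting at first_row ...
  return {thread_id, num_rows};
}

int main() {
  const int max_threads = 4;
  std::stack<int> free_thread_ids;
  for (int i = max_threads - 1; i >= 0; i--) {
    free_thread_ids.push(i);
  }

  const size_t total_rows = 1000, chunk = 100;
  size_t next_row = 0, rows_done = 0;
  std::list<std::future<ChunkResult>> threads;

  while (next_row < total_rows || !threads.empty()) {
    // launch work while rows remain and a thread slot is free
    while (next_row < total_rows && !free_thread_ids.empty()) {
      const int tid = free_thread_ids.top();
      free_thread_ids.pop();
      const size_t n = std::min(chunk, total_rows - next_row);
      threads.push_back(
          std::async(std::launch::async, work_on_chunk, tid, next_row, n));
      next_row += n;
    }
    // poll the futures without blocking and reap any that are ready
    int nready = 0;
    for (auto it = threads.begin(); it != threads.end();) {
      if (it->wait_for(std::chrono::milliseconds(0)) ==
          std::future_status::ready) {
        const auto result = it->get();
        rows_done += result.rows_completed;
        free_thread_ids.push(result.thread_id);  // recall thread_id for reuse
        it = threads.erase(it);
        ++nready;
      } else {
        ++it;
      }
    }
    if (nready == 0) {
      std::this_thread::yield();  // nothing finished yet; yield the time slice
    }
  }
  return rows_done == total_rows ? 0 : 1;
}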

ImportStatus import_export::Importer::importGDALRaster ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 5523 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, anonymous_namespace{Importer.cpp}::check_session_interrupted(), checkpoint(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Geospatial::compress_coords(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), Executor::ERR_INTERRUPTED, logger::ERROR, f(), g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_output_srid(), Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getBandNullValue(), import_export::RasterImporter::getBandsHeight(), import_export::RasterImporter::getBandsWidth(), Executor::getExecutor(), import_export::RasterImporter::getNumBands(), import_export::RasterImporter::getPointNamesAndSQLTypes(), import_export::RasterImporter::getProjectedPixelCoords(), import_export::RasterImporter::getRawPixels(), import_export::RasterImporter::import(), import_buffers_vec, import_id, import_export::DataStreamSink::import_status_, logger::INFO, ColumnDescriptor::isGeoPhyCol, kARRAY, kBIGINT, kDOUBLE, kFLOAT, kINT, kMaxRasterScanlinesPerThread, kNULLT, kPOINT, kSMALLINT, kTINYINT, load(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, NULL_DOUBLE, NULL_FLOAT, NULL_INT, NULL_SMALLINT, import_export::num_import_threads(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), timer_start(), TIMER_STOP, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, and VLOG.

Referenced by importGDAL().

5524  {
5525  // initial status
5526  set_import_status(import_id, import_status_);
5527 
5528  // metadata columns?
5529  auto const metadata_column_infos =
5530  parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);
5531 
5532  // create a raster importer and do the detect
5533  RasterImporter raster_importer;
5534  raster_importer.detect(
5535  file_path,
5536  copy_params.raster_import_bands,
5537  copy_params.raster_import_dimensions,
5538  convert_raster_point_type(copy_params.raster_point_type),
5539  convert_raster_point_transform(copy_params.raster_point_transform),
5540  copy_params.raster_point_compute_angle,
5541  true,
5542  metadata_column_infos);
5543 
5544  // get the table columns and count actual columns
5545  auto const& column_descs = loader->get_column_descs();
5546  uint32_t num_table_cols{0u};
5547  for (auto const* cd : column_descs) {
5548  if (!cd->isGeoPhyCol) {
5549  num_table_cols++;
5550  }
5551  }
5552 
5553  // how many bands do we have?
5554  auto num_bands = raster_importer.getNumBands();
5555 
5556  // get point columns info
5557  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5558 
5559  // validate that the table column count matches
5560  auto num_expected_cols = num_bands;
5561  num_expected_cols += point_names_and_sql_types.size();
5562  num_expected_cols += metadata_column_infos.size();
5563  if (num_expected_cols != num_table_cols) {
5564  throw std::runtime_error(
5565  "Raster Import aborted. Band/Column count mismatch (file requires " +
5566  std::to_string(num_expected_cols) + ", table has " +
5567  std::to_string(num_table_cols) + ")");
5568  }
5569 
5570  // validate the point column names and types
5571  // if we're importing the coords as a POINT, then the first column
5572  // must be a POINT (two physical columns, POINT and TINYINT[])
5573  // if we're not, the first two columns must be the matching type
5574  // optionally followed by an angle column
5575  auto cd_itr = column_descs.begin();
5576  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5577  if (sql_type == kPOINT) {
5578  // POINT column
5579  {
5580  auto const* cd = *cd_itr++;
5581  if (cd->columnName != col_name) {
5582  throw std::runtime_error("Column '" + cd->columnName +
5583  "' overridden name invalid (must be '" + col_name +
5584  "')");
5585  }
5586  auto const cd_type = cd->columnType.get_type();
5587  if (cd_type != kPOINT) {
5588  throw std::runtime_error("Column '" + cd->columnName +
5589  "' overridden type invalid (must be POINT)");
5590  }
5591  if (cd->columnType.get_output_srid() != 4326) {
5592  throw std::runtime_error("Column '" + cd->columnName +
5593  "' overridden SRID invalid (must be 4326)");
5594  }
5595  }
5596  // TINYINT[] coords sub-column
5597  {
5598  // if the above is true, this must be true
5599  auto const* cd = *cd_itr++;
5600  CHECK(cd->columnType.get_type() == kARRAY);
5601  CHECK(cd->columnType.get_subtype() == kTINYINT);
5602  }
5603  } else {
5604  // column of the matching name and type
5605  auto const* cd = *cd_itr++;
5606  if (cd->columnName != col_name) {
5607  throw std::runtime_error("Column '" + cd->columnName +
5608  "' overridden name invalid (must be '" + col_name +
5609  "')");
5610  }
5611  auto const cd_type = cd->columnType.get_type();
5612  if (cd_type != sql_type) {
5613  throw std::runtime_error("Column '" + cd->columnName +
5614  "' overridden type invalid (must be " +
5615  to_string(sql_type) + ")");
5616  }
5617  }
5618  }
5619 
5620  // validate the band column types
5621  // any Immerse overriding to other types will currently be rejected
5622  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5623  if (band_names_and_types.size() != num_bands) {
5624  throw std::runtime_error("Column/Band count mismatch when validating types");
5625  }
5626  for (uint32_t i = 0; i < num_bands; i++) {
5627  auto const* cd = *cd_itr++;
5628  auto const cd_type = cd->columnType.get_type();
5629  auto const sql_type = band_names_and_types[i].second;
5630  if (cd_type != sql_type) {
5631  throw std::runtime_error("Band Column '" + cd->columnName +
5632  "' overridden type invalid (must be " +
5633  to_string(sql_type) + ")");
5634  }
5635  }
5636 
5637  // validate metadata column
5638  for (auto const& mci : metadata_column_infos) {
5639  auto const* cd = *cd_itr++;
5640  if (mci.column_descriptor.columnName != cd->columnName) {
5641  throw std::runtime_error("Metadata Column '" + cd->columnName +
5642  "' overridden name invalid (must be '" +
5643  mci.column_descriptor.columnName + "')");
5644  }
5645  auto const cd_type = cd->columnType.get_type();
5646  auto const md_type = mci.column_descriptor.columnType.get_type();
5647  if (cd_type != md_type) {
5648  throw std::runtime_error("Metadata Column '" + cd->columnName +
5649  "' overridden type invalid (must be " +
5650  to_string(md_type) + ")");
5651  }
5652  }
5653 
5654  // import geo table is specifically handled in both DBHandler and QueryRunner
5655  // that is a separate path from normal SQL execution
5656  // so we explicitly enroll the import session here to allow interruption
5657  // while importing geo table
5658  auto query_session = session_info ? session_info->get_session_id() : "";
5659  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5660  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5661  auto is_session_already_registered = false;
5662  {
5663  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
5664  executor->getSessionLock());
5665  is_session_already_registered =
5666  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5667  }
5668  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5669  !is_session_already_registered) {
5670  executor->enrollQuerySession(query_session,
5671  "IMPORT_GEO_TABLE",
5672  query_submitted_time,
5674  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5675  }
5676  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5677  // reset the runtime query interrupt status
5678  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5679  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5680  }
5681  };
5682 
5683  // how many threads are we gonna use?
5684  max_threads = num_import_threads(copy_params.threads);
5685  VLOG(1) << "GDAL import # threads: " << max_threads;
5686 
5687  if (copy_params.raster_scanlines_per_thread < 0) {
5688  throw std::runtime_error("Invalid CopyParams.raster_scanlines_per_thread! (" +
5689  std::to_string(copy_params.raster_scanlines_per_thread) +
5690  ")");
5691  }
5692  const int max_scanlines_per_thread =
5693  copy_params.raster_scanlines_per_thread == 0
5694  ? kMaxRasterScanlinesPerThread
5695  : std::min(copy_params.raster_scanlines_per_thread,
5696  kMaxRasterScanlinesPerThread);
5697  VLOG(1) << "Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5698 
5699  // prepare to checkpoint the table
5700  auto table_epochs = loader->getTableEpochs();
5701 
5702  // start wall clock
5703  auto wall_timer = timer_start();
5704 
5705  // start the import
5706  raster_importer.import(
5707  max_threads,
5708  copy_params.threads == 0); // NOTE: `max_threads` may change after this call
5709 
5710  // make an import buffer for each thread
5711  CHECK_EQ(import_buffers_vec.size(), 0u);
5712  import_buffers_vec.resize(max_threads);
5713  for (size_t i = 0; i < max_threads; i++) {
5714  for (auto const& cd : loader->get_column_descs()) {
5715  import_buffers_vec[i].emplace_back(
5716  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5717  }
5718  }
5719 
5720  // status and times
5721  using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5722 
5723  // get the band dimensions
5724  auto const band_size_x = raster_importer.getBandsWidth();
5725  auto const band_size_y = raster_importer.getBandsHeight();
5726 
5727  // allocate raw pixel buffers per thread
5728  std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5729  for (size_t i = 0; i < max_threads; i++) {
5730  raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5731  sizeof(double));
5732  }
5733 
5734  // just the sql type of the first point column (if any)
5735  auto const point_sql_type = point_names_and_sql_types.size()
5736  ? point_names_and_sql_types.begin()->second
5737  : kNULLT;
5738 
5739  // lambda for importing to raw data buffers (threadable)
5740  auto import_rows =
5741  [&](const size_t thread_idx, const int y_start, const int y_end) -> ThreadReturn {
5742  // this thread's import buffers
5743  auto& import_buffers = import_buffers_vec[thread_idx];
5744 
5745  // this thread's raw pixel bytes
5746  auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5747 
5748  // clear the buffers
5749  for (auto& col_buffer : import_buffers) {
5750  col_buffer->clear();
5751  }
5752 
5753  // prepare to iterate columns
5754  auto col_itr = column_descs.begin();
5755  int col_idx{0};
5756 
5757  float proj_s{0.0f};
5758  if (point_sql_type != kNULLT) {
5759  // the first two columns (either lon/lat or POINT/coords)
5760  auto const* cd_col0 = *col_itr++;
5761  auto const* cd_col1 = *col_itr++;
5762  auto const* cd_angle =
5763  copy_params.raster_point_compute_angle ? *col_itr++ : nullptr;
5764 
5765  // compute and add x and y
5766  auto proj_timer = timer_start();
5767  for (int y = y_start; y < y_end; y++) {
5768  // get projected pixel coords for this scan-line
5769  auto const coords = raster_importer.getProjectedPixelCoords(thread_idx, y);
5770 
5771  // add to buffers
5772  for (int x = 0; x < band_size_x; x++) {
5773  // this point and angle
5774  auto const& [dx, dy, angle] = coords[x];
5775 
5776  // store the point
5777  switch (point_sql_type) {
5778  case kPOINT: {
5779  // add empty value to POINT buffer
5780  TDatum td_point;
5781  import_buffers[0]->add_value(cd_col0, td_point, false);
5782 
5783  // convert lon/lat to bytes (compressed or not) and add to POINT coords
5784  // buffer
5785  auto const compressed_coords =
5786  Geospatial::compress_coords({dx, dy}, cd_col0->columnType);
5787  std::vector<TDatum> td_coords_data;
5788  for (auto const& cc : compressed_coords) {
5789  TDatum td_byte;
5790  td_byte.val.int_val = cc;
5791  td_coords_data.push_back(td_byte);
5792  }
5793  TDatum td_coords;
5794  td_coords.val.arr_val = td_coords_data;
5795  td_coords.is_null = false;
5796  import_buffers[1]->add_value(cd_col1, td_coords, false);
5797  } break;
5798  case kFLOAT:
5799  case kDOUBLE: {
5800  TDatum td;
5801  td.is_null = false;
5802  td.val.real_val = dx;
5803  import_buffers[0]->add_value(cd_col0, td, false);
5804  td.val.real_val = dy;
5805  import_buffers[1]->add_value(cd_col1, td, false);
5806  } break;
5807  case kSMALLINT:
5808  case kINT: {
5809  TDatum td;
5810  td.is_null = false;
5811  td.val.int_val = static_cast<int64_t>(x);
5812  import_buffers[0]->add_value(cd_col0, td, false);
5813  td.val.int_val = static_cast<int64_t>(y);
5814  import_buffers[1]->add_value(cd_col1, td, false);
5815  } break;
5816  default:
5817  CHECK(false);
5818  }
5819 
5820  // angle?
5821  if (copy_params.raster_point_compute_angle) {
5822  CHECK(cd_angle);
5823  TDatum td;
5824  td.is_null = false;
5825  td.val.real_val = static_cast<double>(angle);
5826  import_buffers[2]->add_value(cd_angle, td, false);
5827  }
5828  }
5829  }
5830  proj_s = TIMER_STOP(proj_timer);
5831  col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5832  }
5833 
5834  // prepare to accumulate read and conv times
5835  float read_s{0.0f};
5836  float conv_s{0.0f};
5837 
5838  // y_end is one past the actual end, so don't add 1
5839  auto const num_rows = y_end - y_start;
5840  auto const num_elems = band_size_x * num_rows;
5841 
5842  ImportStatus thread_import_status;
5843 
5844  bool read_block_failed = false;
5845 
5846  // for each band/column
5847  for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5848  // the corresponding column
5849  auto const* cd_band = *col_itr;
5850  CHECK(cd_band);
5851 
5852  // data type to read as
5853  auto const cd_type = cd_band->columnType.get_type();
5854 
5855  // read the scanlines (will do a data type conversion if necessary)
5856  try {
5857  auto read_timer = timer_start();
5858  raster_importer.getRawPixels(
5859  thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5860  read_s += TIMER_STOP(read_timer);
5861  } catch (std::runtime_error& e) {
5862  // report error
5863  LOG(ERROR) << e.what();
5864  // abort this block
5865  read_block_failed = true;
5866  break;
5867  }
5868 
5869  // null value?
5870  auto const [null_value, null_value_valid] =
5871  raster_importer.getBandNullValue(band_idx);
5872 
5873  // copy to this thread's import buffers
5874  // convert any nulls we find
5875  auto conv_timer = timer_start();
5876  TDatum td;
5877  switch (cd_type) {
5878  case kSMALLINT: {
5879  const int16_t* values =
5880  reinterpret_cast<const int16_t*>(raw_pixel_bytes.data());
5881  for (int idx = 0; idx < num_elems; idx++) {
5882  auto const& value = values[idx];
5883  if (null_value_valid && value == static_cast<int16_t>(null_value)) {
5884  td.is_null = true;
5885  td.val.int_val = NULL_SMALLINT;
5886  } else {
5887  td.is_null = false;
5888  td.val.int_val = static_cast<int64_t>(value);
5889  }
5890  import_buffers[col_idx]->add_value(cd_band, td, false);
5891  }
5892  } break;
5893  case kINT: {
5894  const int32_t* values =
5895  reinterpret_cast<const int32_t*>(raw_pixel_bytes.data());
5896  for (int idx = 0; idx < num_elems; idx++) {
5897  auto const& value = values[idx];
5898  if (null_value_valid && value == static_cast<int32_t>(null_value)) {
5899  td.is_null = true;
5900  td.val.int_val = NULL_INT;
5901  } else {
5902  td.is_null = false;
5903  td.val.int_val = static_cast<int64_t>(value);
5904  }
5905  import_buffers[col_idx]->add_value(cd_band, td, false);
5906  }
5907  } break;
5908  case kBIGINT: {
5909  const uint32_t* values =
5910  reinterpret_cast<const uint32_t*>(raw_pixel_bytes.data());
5911  for (int idx = 0; idx < num_elems; idx++) {
5912  auto const& value = values[idx];
5913  if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
5914  td.is_null = true;
5915  td.val.int_val = NULL_INT;
5916  } else {
5917  td.is_null = false;
5918  td.val.int_val = static_cast<int64_t>(value);
5919  }
5920  import_buffers[col_idx]->add_value(cd_band, td, false);
5921  }
5922  } break;
5923  case kFLOAT: {
5924  const float* values = reinterpret_cast<const float*>(raw_pixel_bytes.data());
5925  for (int idx = 0; idx < num_elems; idx++) {
5926  auto const& value = values[idx];
5927  if (null_value_valid && value == static_cast<float>(null_value)) {
5928  td.is_null = true;
5929  td.val.real_val = NULL_FLOAT;
5930  } else {
5931  td.is_null = false;
5932  td.val.real_val = static_cast<double>(value);
5933  }
5934  import_buffers[col_idx]->add_value(cd_band, td, false);
5935  }
5936  } break;
5937  case kDOUBLE: {
5938  const double* values = reinterpret_cast<const double*>(raw_pixel_bytes.data());
5939  for (int idx = 0; idx < num_elems; idx++) {
5940  auto const& value = values[idx];
5941  if (null_value_valid && value == null_value) {
5942  td.is_null = true;
5943  td.val.real_val = NULL_DOUBLE;
5944  } else {
5945  td.is_null = false;
5946  td.val.real_val = value;
5947  }
5948  import_buffers[col_idx]->add_value(cd_band, td, false);
5949  }
5950  } break;
5951  default:
5952  CHECK(false);
5953  }
5954  conv_s += TIMER_STOP(conv_timer);
5955 
5956  // next column
5957  col_idx++;
5958  col_itr++;
5959  }
5960 
5961  if (read_block_failed) {
5962  // discard block data
5963  for (auto& col_buffer : import_buffers) {
5964  col_buffer->clear();
5965  }
5966  thread_import_status.rows_rejected += num_elems;
5967  } else {
5968  // metadata columns?
5969  for (auto const& mci : metadata_column_infos) {
5970  auto const* cd_band = *col_itr++;
5971  CHECK(cd_band);
5972  for (int i = 0; i < num_elems; i++) {
5973  import_buffers[col_idx]->add_value(cd_band, mci.value, false, copy_params);
5974  }
5975  col_idx++;
5976  }
5977  thread_import_status.rows_estimated = num_elems;
5978  thread_import_status.rows_completed = num_elems;
5979  }
5980 
5981  // done
5982  return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
5983  };
5984 
5985  // time the phases
5986  float total_proj_s{0.0f};
5987  float total_read_s{0.0f};
5988  float total_conv_s{0.0f};
5989  float total_load_s{0.0f};
5990 
5991  const int min_scanlines_per_thread = 8;
5992  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;
5993  for (int block_y = 0; block_y < band_size_y;
5994  block_y += (max_threads * max_scanlines_per_thread)) {
5995  using Future = std::future<ThreadReturn>;
5996  std::vector<Future> futures;
5997  const int scanlines_in_block =
5998  std::min(band_size_y - block_y, max_scanlines_per_block);
5999  const int pixels_in_block = scanlines_in_block * band_size_x;
6000  const int block_max_scanlines_per_thread =
6001  std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6002  static_cast<int>(max_threads),
6003  min_scanlines_per_thread);
6004  VLOG(1) << "Raster Importer: scanlines_in_block: " << scanlines_in_block
6005  << ", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6006 
6007  std::vector<size_t> rows_per_thread;
6008  auto block_wall_timer = timer_start();
6009  // run max_threads scanlines at once
6010  for (size_t thread_id = 0; thread_id < max_threads; thread_id++) {
6011  const int y_start = block_y + thread_id * block_max_scanlines_per_thread;
6012  if (y_start < band_size_y) {
6013  const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6014  if (y_start < y_end) {
6015  rows_per_thread.emplace_back((y_end - y_start) * band_size_x);
6016  futures.emplace_back(
6017  std::async(std::launch::async, import_rows, thread_id, y_start, y_end));
6018  }
6019  }
6020  }
6021 
6022  // wait for the threads to finish and
6023  // accumulate the results and times
6024  float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6025  size_t thread_idx = 0;
6026  for (auto& future : futures) {
6027  auto const [import_status, times] = future.get();
6028  import_status_ += import_status;
6029  proj_s += times[0];
6030  read_s += times[1];
6031  conv_s += times[2];
6032  // We load the data in thread order so we can get deterministic row-major raster
6033  // ordering
6034  // Todo: We should consider invoking the load on another thread in a ping-pong
6035  // fashion so we can simultaneously read the next batch of data
6036  auto thread_load_timer = timer_start();
6037  // only try to load this thread's data if valid
6038  if (import_status.rows_rejected == 0) {
6039  load(import_buffers_vec[thread_idx], rows_per_thread[thread_idx], session_info);
6040  }
6041  load_s += TIMER_STOP(thread_load_timer);
6042  ++thread_idx;
6043  }
6044 
6045  // average times over all threads (except for load which is single-threaded)
6046  total_proj_s += (proj_s / float(futures.size()));
6047  total_read_s += (read_s / float(futures.size()));
6048  total_conv_s += (conv_s / float(futures.size()));
6049  total_load_s += load_s;
6050 
6051  // update the status
6052  set_import_status(import_id, import_status_);
6053 
6054  // more debug
6055  auto const block_wall_s = TIMER_STOP(block_wall_timer);
6056  auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6057  auto const rows_per_second = pixels_in_block / block_wall_s;
6058  LOG(INFO) << "Raster Importer: Loaded " << scanlines_in_block
6059  << " scanlines starting at " << block_y << " out of " << band_size_y
6060  << " in " << block_wall_s << "s at " << scanlines_per_second
6061  << " scanlines/s and " << rows_per_second << " rows/s";
6062 
6063  // check for interrupt
6064  if (UNLIKELY(check_session_interrupted(query_session, executor.get()))) {
6065  import_status_.load_failed = true;
6066  import_status_.load_msg = "Raster Import interrupted";
6067  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
6068  }
6069 
6070  // hit max_reject?
6071  if (import_status_.rows_rejected > copy_params.max_reject) {
6072  break;
6073  }
6074  }
6075 
6076  // checkpoint
6077  auto checkpoint_timer = timer_start();
6078  checkpoint(table_epochs);
6079  auto const checkpoint_s = TIMER_STOP(checkpoint_timer);
6080 
6081  // stop wall clock
6082  auto const total_wall_s = TIMER_STOP(wall_timer);
6083 
6084  // report
6085  auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6086  auto const total_rows_per_second =
6087  float(band_size_x) * float(band_size_y) / total_wall_s;
6088  LOG(INFO) << "Raster Importer: Imported "
6089  << static_cast<uint64_t>(band_size_x) * static_cast<uint64_t>(band_size_y)
6090  << " rows";
6091  LOG(INFO) << "Raster Importer: Total Import Time " << total_wall_s << "s at "
6092  << total_scanlines_per_second << " scanlines/s and " << total_rows_per_second
6093  << " rows/s";
6094 
6095  // if we hit max_reject, throw an exception now to report the error and abort any
6096  // multi-file loop
6097  if (import_status_.rows_rejected > copy_params.max_reject) {
6098  std::string msg = "Raster Importer: Import aborted after failing to read " +
6099  std::to_string(import_status_.rows_rejected) +
6100  " rows/pixels (limit " + std::to_string(copy_params.max_reject) +
6101  ")";
6102  import_status_.load_msg = msg;
6104  throw std::runtime_error(msg);
6105  }
6106 
6107  // phase times (with proportions)
6108  auto proj_pct = float(int(total_proj_s / total_wall_s * 1000.0f) * 0.1f);
6109  auto read_pct = float(int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6110  auto conv_pct = float(int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6111  auto load_pct = float(int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6112  auto cpnt_pct = float(int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6113 
6114  VLOG(1) << "Raster Importer: Import timing breakdown:";
6115  VLOG(1) << " Project " << total_proj_s << "s (" << proj_pct << "%)";
6116  VLOG(1) << " Read " << total_read_s << "s (" << read_pct << "%)";
6117  VLOG(1) << " Convert " << total_conv_s << "s (" << conv_pct << "%)";
6118  VLOG(1) << " Load " << total_load_s << "s (" << load_pct << "%)";
6119  VLOG(1) << " Checkpoint " << checkpoint_s << "s (" << cpnt_pct << "%)";
6120 
6121  // all done
6122  return import_status_;
6123 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
int32_t raster_scanlines_per_thread
Definition: CopyParams.h:90
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: Importer.cpp:124
#define NULL_DOUBLE
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1621
#define NULL_FLOAT
#define LOG(tag)
Definition: Logger.h:285
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3514
RasterImporter::PointType convert_raster_point_type(const import_export::RasterPointType raster_point_type)
Definition: Importer.cpp:4778
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:131
std::string raster_import_dimensions
Definition: CopyParams.h:93
std::string to_string(char const *&&v)
#define NULL_INT
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:505
std::string add_metadata_columns
Definition: CopyParams.h:94
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
#define TIMER_STOP(t)
Definition: Importer.cpp:100
future< Result > async(Fn &&fn, Args &&...args)
RasterPointType raster_point_type
Definition: CopyParams.h:88
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:899
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
#define UNLIKELY(x)
Definition: likely.h:25
std::string get_session_id() const
Definition: SessionInfo.h:93
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:240
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
RasterImporter::PointTransform convert_raster_point_transform(const import_export::RasterPointTransform raster_point_transform)
Definition: Importer.cpp:4800
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::string import_id
Definition: Importer.h:895
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3524
ThreadId thread_id()
Definition: Logger.cpp:877
std::string raster_import_bands
Definition: CopyParams.h:89
#define CHECK(condition)
Definition: Logger.h:291
#define NULL_SMALLINT
Definition: sqltypes.h:72
RasterPointTransform raster_point_transform
Definition: CopyParams.h:91
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
static constexpr int kMaxRasterScanlinesPerThread
Definition: Importer.cpp:113
std::unique_ptr< Loader > loader
Definition: Importer.h:900
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
const std::string file_path
Definition: Importer.h:720

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
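
The block loop near line 5993 walks the raster in blocks of max_threads * max_scanlines_per_thread scanlines and splits each block across the worker threads with a ceiling division, clamped below by a minimum per-thread chunk. A small sketch of just that partitioning arithmetic (all dimensions here are made-up values):

#include <algorithm>
#include <cstdio>

int main() {
  // made-up dimensions; the real values come from the raster and CopyParams
  const int band_size_y = 1000;            // raster height in scanlines
  const int max_threads = 4;
  const int max_scanlines_per_thread = 32;
  const int min_scanlines_per_thread = 8;
  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;

  for (int block_y = 0; block_y < band_size_y;
       block_y += max_scanlines_per_block) {
    const int scanlines_in_block =
        std::min(band_size_y - block_y, max_scanlines_per_block);
    // ceiling division across threads, but never below the minimum chunk
    const int per_thread =
        std::max((scanlines_in_block + max_threads - 1) / max_threads,
                 min_scanlines_per_thread);
    for (int t = 0; t < max_threads; t++) {
      const int y_start = block_y + t * per_thread;
      if (y_start >= band_size_y) {
        break;
      }
      const int y_end = std::min(y_start + per_thread, band_size_y);
      if (y_start < y_end) {
        std::printf("thread %d: scanlines [%d, %d)\n", t, y_start, y_end);
      }
    }
  }
  return 0;
}

Each thread then reads and converts its own scanline range, and the loads happen afterwards in thread order so the table keeps a deterministic row-major raster ordering.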

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3514 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), import_export::import_thread_shapefile(), and importGDALRaster().

3516  {
3517  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3518  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3519  import_status_.load_failed = true;
3520  import_status_.load_msg = loader->getErrorMessage();
3521  }
3522 }
std::lock_guard< T > lock_guard
heavyai::shared_mutex import_mutex_
Definition: Importer.h:723
std::unique_ptr< Loader > loader
Definition: Importer.h:900

+ Here is the caller graph for this function:
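
load() records a failed loadNoCheckpoint() call by setting load_failed and load_msg on the shared import status under the import_mutex_ write lock. A minimal sketch of that write-lock-on-failure pattern with a stand-in status type (StatusLite is not the real ImportStatus):

#include <mutex>
#include <shared_mutex>
#include <string>

// Stand-in for import_export::ImportStatus; only the fields used here.
struct StatusLite {
  bool load_failed{false};
  std::string load_msg;
};

class ImportState {
 public:
  // mirrors the shape of Importer::load(): attempt the load, and record the
  // failure under an exclusive lock only if it goes wrong
  bool try_load(bool load_ok, const std::string& error_message) {
    if (!load_ok) {
      std::lock_guard<std::shared_mutex> write_lock(mutex_);
      status_.load_failed = true;
      status_.load_msg = error_message;
      return false;
    }
    return true;
  }

  bool failed() const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);
    return status_.load_failed;
  }

 private:
  mutable std::shared_mutex mutex_;
  StatusLite status_;
};

int main() {
  ImportState state;
  state.try_load(false, "loader reported an error");
  return state.failed() ? 0 : 1;
}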

Geospatial::GDAL::DataSourceUqPtr import_export::Importer::openGDALDataSource ( const std::string &  fileName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4582 of file Importer.cpp.

References Geospatial::GDAL::init(), import_export::kGeoFile, Geospatial::GDAL::openDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), import_export::CopyParams::source_type, and to_string().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptorsGeo(), importGDALGeo(), and readMetadataSampleGDAL().

4584  {
4585  Geospatial::GDAL::init();
4586  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4587  copy_params.s3_endpoint,
4588  copy_params.s3_access_key,
4589  copy_params.s3_secret_key,
4590  copy_params.s3_session_token);
4591  if (copy_params.source_type != SourceType::kGeoFile) {
4592  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4593  std::to_string(static_cast<int>(copy_params.source_type)) +
4594  ")");
4595  }
4596  return Geospatial::GDAL::openDataSource(fileName, copy_params.source_type);
4597 }
std::string s3_secret_key
Definition: CopyParams.h:62
static void init()
Definition: GDAL.cpp:67
std::string to_string(char const *&&v)
static void setAuthorizationTokens(const std::string &s3_region, const std::string &s3_endpoint, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token)
Definition: GDAL.cpp:138
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string s3_session_token
Definition: CopyParams.h:63
static DataSourceUqPtr openDataSource(const std::string &name, const import_export::SourceType source_type)
Definition: GDAL.cpp:181
std::string s3_access_key
Definition: CopyParams.h:61

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
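
openGDALDataSource() funnels the CopyParams S3 credentials through Geospatial::GDAL::setAuthorizationTokens() before calling openDataSource(). At the GDAL level that roughly amounts to setting the standard /vsis3/ configuration options and opening the path; the sketch below uses GDAL's C API directly and is not the actual wrapper:

#include <cpl_conv.h>  // CPLSetConfigOption
#include <gdal.h>

#include <stdexcept>
#include <string>

// Hedged sketch: open a (possibly S3-hosted) vector source with GDAL.
// The config option names are GDAL's standard /vsis3/ credential options;
// the Importer itself goes through the Geospatial::GDAL helpers instead.
GDALDatasetH open_geo_source(const std::string& path,
                             const std::string& s3_access_key,
                             const std::string& s3_secret_key,
                             const std::string& s3_region) {
  GDALAllRegister();
  if (!s3_access_key.empty()) {
    CPLSetConfigOption("AWS_ACCESS_KEY_ID", s3_access_key.c_str());
    CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", s3_secret_key.c_str());
    CPLSetConfigOption("AWS_DEFAULT_REGION", s3_region.c_str());
  }
  GDALDatasetH dataset = GDALOpenEx(path.c_str(),
                                    GDAL_OF_VECTOR | GDAL_OF_READONLY,
                                    nullptr,
                                    nullptr,
                                    nullptr);
  if (dataset == nullptr) {
    throw std::runtime_error("Unable to open geo file " + path);
  }
  return dataset;  // caller releases with GDALClose(dataset)
}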

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4624 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), openGDALDataSource(), and import_export::parse_add_metadata_columns().

Referenced by DBHandler::detect_column_types().

4629  {
4630  Geospatial::GDAL::DataSourceUqPtr datasource(
4631  openGDALDataSource(file_name, copy_params));
4632  if (datasource == nullptr) {
4633  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4634  file_name);
4635  }
4636 
4637  OGRLayer& layer =
4638  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4639 
4640  auto const* feature_defn = layer.GetLayerDefn();
4641  CHECK(feature_defn);
4642 
4643  // metadata columns?
4644  auto const metadata_column_infos =
4645  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4646 
4647  // get limited feature count
4648  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4649  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4650  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4651 
4652  // prepare sample data map
4653  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4654  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4655  CHECK(column_name);
4656  sample_data[column_name] = {};
4657  }
4658  sample_data[geo_column_name] = {};
4659  for (auto const& mci : metadata_column_infos) {
4660  sample_data[mci.column_descriptor.columnName] = {};
4661  }
4662 
4663  // prepare to read
4664  layer.ResetReading();
4665 
4666  // read features (up to limited count)
4667  uint64_t feature_index{0u};
4668  while (feature_index < num_features) {
4669  // get (and take ownership of) feature
4670  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4671  if (!feature) {
4672  break;
4673  }
4674 
4675  // get feature geometry
4676  auto const* geometry = feature->GetGeometryRef();
4677  if (geometry == nullptr) {
4678  break;
4679  }
4680 
4681  // validate geom type (again?)
4682  switch (wkbFlatten(geometry->getGeometryType())) {
4683  case wkbPoint:
4684  case wkbMultiPoint:
4685  case wkbLineString:
4686  case wkbMultiLineString:
4687  case wkbPolygon:
4688  case wkbMultiPolygon:
4689  break;
4690  default:
4691  throw std::runtime_error("Unsupported geometry type: " +
4692  std::string(geometry->getGeometryName()));
4693  }
4694 
4695  // populate sample data for regular field columns
4696  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4697  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4698  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4699  }
4700 
4701  // populate sample data for metadata columns?
4702  for (auto const& mci : metadata_column_infos) {
4703  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4704  }
4705 
4706  // populate sample data for geo column with WKT string
4707  char* wkts = nullptr;
4708  geometry->exportToWkt(&wkts);
4709  CHECK(wkts);
4710  sample_data[geo_column_name].push_back(wkts);
4711  CPLFree(wkts);
4712 
4713  // next feature
4714  feature_index++;
4715  }
4716 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const Geospatial::GDAL::DataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4601
std::unique_ptr< OGRFeature, FeatureDeleter > FeatureUqPtr
Definition: GDAL.h:53
std::string add_metadata_columns
Definition: CopyParams.h:94
std::string geo_layer_name
Definition: CopyParams.h:81
MetadataColumnInfos parse_add_metadata_columns(const std::string &add_metadata_columns, const std::string &file_path)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4582
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
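
The sampling loop above is plain OGR: walk the layer, collect each field value as a string, and export the geometry to WKT. A condensed, self-contained version of that loop follows (compiles against the GDAL/OGR headers; "geom" is just a placeholder key for the geo column):

#include <cpl_conv.h>
#include <ogrsf_frmts.h>

#include <map>
#include <string>
#include <vector>

// Sample up to row_limit features from an already-opened OGR layer, keyed by
// field name, with the geometry captured as WKT under the "geom" key.
std::map<std::string, std::vector<std::string>> sample_layer(OGRLayer& layer,
                                                             const int row_limit) {
  std::map<std::string, std::vector<std::string>> samples;
  OGRFeatureDefn* defn = layer.GetLayerDefn();
  for (int f = 0; f < defn->GetFieldCount(); f++) {
    samples[defn->GetFieldDefn(f)->GetNameRef()] = {};
  }
  samples["geom"] = {};

  layer.ResetReading();
  int count = 0;
  while (count < row_limit) {
    OGRFeature* feature = layer.GetNextFeature();
    if (!feature) {
      break;  // out of features
    }
    for (int f = 0; f < feature->GetFieldCount(); f++) {
      samples[defn->GetFieldDefn(f)->GetNameRef()].push_back(
          feature->GetFieldAsString(f));
    }
    if (OGRGeometry* geometry = feature->GetGeometryRef()) {
      char* wkt = nullptr;
      geometry->exportToWkt(&wkt);
      samples["geom"].push_back(wkt);
      CPLFree(wkt);
    }
    OGRFeature::DestroyFeature(feature);
    count++;
  }
  return samples;
}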

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
const bool  force_null = false 
)
static

Definition at line 1636 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), foreign_storage::TextFileBufferParser::processGeoColumn(), and foreign_storage::TextFileBufferParser::processInvalidGeoColumn().

1645  {
1646  const auto col_ti = cd->columnType;
1647  const auto col_type = col_ti.get_type();
1648  auto columnId = cd->columnId;
1649  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1650  bool is_null_geo = false;
1651  bool is_null_point = false;
1652  if (!col_ti.get_notnull()) {
1653  // Check for NULL geo
1654  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1655  is_null_point = true;
1656  coords.clear();
1657  }
1658  is_null_geo = coords.empty();
1659  if (is_null_point) {
1660  coords.push_back(NULL_ARRAY_DOUBLE);
1661  coords.push_back(NULL_DOUBLE);
1662  // Treating POINT coords as notnull, need to store actual encoding
1663  // [un]compressed+[not]null
1664  is_null_geo = false;
1665  }
1666  }
1667  if (force_null) {
1668  is_null_geo = true;
1669  }
1670  TDatum tdd_coords;
1671  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1672  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1673  // in a fixlen array, compressed and uncompressed.
1674  if (!is_null_geo) {
1675  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1676  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1677  for (auto cc : compressed_coords) {
1678  tdd_coords.val.arr_val.emplace_back();
1679  tdd_coords.val.arr_val.back().val.int_val = cc;
1680  }
1681  }
1682  tdd_coords.is_null = is_null_geo;
1683  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1684 
1685  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1686  // Create [linest]ring_sizes array value and add it to the physical column
1687  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1688  TDatum tdd_ring_sizes;
1689  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1690  if (!is_null_geo) {
1691  for (auto ring_size : ring_sizes) {
1692  tdd_ring_sizes.val.arr_val.emplace_back();
1693  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1694  }
1695  }
1696  tdd_ring_sizes.is_null = is_null_geo;
1697  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1698  }
1699 
1700  if (col_type == kMULTIPOLYGON) {
1701  // Create poly_rings array value and add it to the physical column
1702  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1703  TDatum tdd_poly_rings;
1704  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1705  if (!is_null_geo) {
1706  for (auto num_rings : poly_rings) {
1707  tdd_poly_rings.val.arr_val.emplace_back();
1708  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1709  }
1710  }
1711  tdd_poly_rings.is_null = is_null_geo;
1712  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1713  }
1714 
1715  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1716  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1717  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1718  TDatum tdd_bounds;
1719  tdd_bounds.val.arr_val.reserve(bounds.size());
1720  if (!is_null_geo) {
1721  for (auto b : bounds) {
1722  tdd_bounds.val.arr_val.emplace_back();
1723  tdd_bounds.val.arr_val.back().val.real_val = b;
1724  }
1725  }
1726  tdd_bounds.is_null = is_null_geo;
1727  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1728  }
1729 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
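
Callers hand this function a geometry already decomposed into flat arrays: coords as interleaved x/y doubles, per-ring vertex counts, rings-per-polygon counts, and a bounding box. A small sketch of what those arrays might look like for a single unit-square polygon (the bounds ordering here is illustrative, not a statement of the storage format):

#include <algorithm>
#include <iterator>
#include <limits>
#include <vector>

// Illustrative decomposition of one polygon into the flat arrays that
// set_geo_physical_import_buffer() consumes.
struct GeoPhysicalColumns {
  std::vector<double> coords;   // x0, y0, x1, y1, ...
  std::vector<int> ring_sizes;  // vertices per ring
  std::vector<int> poly_rings;  // rings per polygon (MULTIPOLYGON only)
  std::vector<double> bounds;   // bounding box (ordering illustrative)
};

GeoPhysicalColumns decompose_unit_square() {
  GeoPhysicalColumns out;
  const double ring[] = {0, 0, 1, 0, 1, 1, 0, 1};  // one 4-vertex outer ring
  out.coords.assign(std::begin(ring), std::end(ring));
  out.ring_sizes = {4};
  out.poly_rings = {1};

  double xmin = std::numeric_limits<double>::max(), ymin = xmin;
  double xmax = std::numeric_limits<double>::lowest(), ymax = xmax;
  for (size_t i = 0; i + 1 < out.coords.size(); i += 2) {
    xmin = std::min(xmin, out.coords[i]);
    xmax = std::max(xmax, out.coords[i]);
    ymin = std::min(ymin, out.coords[i + 1]);
    ymax = std::max(ymax, out.coords[i + 1]);
  }
  out.bounds = {xmin, ymin, xmax, ymax};
  return out;
}

The function then compresses coords, appends ring_sizes for MULTILINESTRING/POLYGON/MULTIPOLYGON columns, poly_rings for MULTIPOLYGON only, and bounds for every geo type except plain POINT, advancing col_idx past each physical column it fills.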

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column 
)
static

Definition at line 1731 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

1739  {
1740  const auto col_ti = cd->columnType;
1741  const auto col_type = col_ti.get_type();
1742  auto columnId = cd->columnId;
1743 
1744  auto coords_row_count = coords_column.size();
1745  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1746  for (auto& coords : coords_column) {
1747  bool is_null_geo = false;
1748  bool is_null_point = false;
1749  if (!col_ti.get_notnull()) {
1750  // Check for NULL geo
1751  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1752  is_null_point = true;
1753  coords.clear();
1754  }
1755  is_null_geo = coords.empty();
1756  if (is_null_point) {
1757  coords.push_back(NULL_ARRAY_DOUBLE);
1758  coords.push_back(NULL_DOUBLE);
1759  // Treating POINT coords as notnull, need to store actual encoding
1760  // [un]compressed+[not]null
1761  is_null_geo = false;
1762  }
1763  }
1764  std::vector<TDatum> td_coords_data;
1765  if (!is_null_geo) {
1766  std::vector<uint8_t> compressed_coords =
1767  Geospatial::compress_coords(coords, col_ti);
1768  for (auto const& cc : compressed_coords) {
1769  TDatum td_byte;
1770  td_byte.val.int_val = cc;
1771  td_coords_data.push_back(td_byte);
1772  }
1773  }
1774  TDatum tdd_coords;
1775  tdd_coords.val.arr_val = td_coords_data;
1776  tdd_coords.is_null = is_null_geo;
1777  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1778  }
1779  col_idx++;
1780 
1781  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1782  if (ring_sizes_column.size() != coords_row_count) {
1783  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1784  }
1785  // Create [linest]ring_sizes array value and add it to the physical column
1786  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1787  for (auto const& ring_sizes : ring_sizes_column) {
1788  bool is_null_geo = false;
1789  if (!col_ti.get_notnull()) {
1790  // Check for NULL geo
1791  is_null_geo = ring_sizes.empty();
1792  }
1793  std::vector<TDatum> td_ring_sizes;
1794  for (auto const& ring_size : ring_sizes) {
1795  TDatum td_ring_size;
1796  td_ring_size.val.int_val = ring_size;
1797  td_ring_sizes.push_back(td_ring_size);
1798  }
1799  TDatum tdd_ring_sizes;
1800  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1801  tdd_ring_sizes.is_null = is_null_geo;
1802  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1803  }
1804  col_idx++;
1805  }
1806 
1807  if (col_type == kMULTIPOLYGON) {
1808  if (poly_rings_column.size() != coords_row_count) {
1809  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1810  }
1811  // Create poly_rings array value and add it to the physical column
1812  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1813  for (auto const& poly_rings : poly_rings_column) {
1814  bool is_null_geo = false;
1815  if (!col_ti.get_notnull()) {
1816  // Check for NULL geo
1817  is_null_geo = poly_rings.empty();
1818  }
1819  std::vector<TDatum> td_poly_rings;
1820  for (auto const& num_rings : poly_rings) {
1821  TDatum td_num_rings;
1822  td_num_rings.val.int_val = num_rings;
1823  td_poly_rings.push_back(td_num_rings);
1824  }
1825  TDatum tdd_poly_rings;
1826  tdd_poly_rings.val.arr_val = td_poly_rings;
1827  tdd_poly_rings.is_null = is_null_geo;
1828  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1829  }
1830  col_idx++;
1831  }
1832 
1833  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1834  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1835  if (bounds_column.size() != coords_row_count) {
1836  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1837  }
1838  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1839  for (auto const& bounds : bounds_column) {
1840  bool is_null_geo = false;
1841  if (!col_ti.get_notnull()) {
1842  // Check for NULL geo
1843  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1844  }
1845  std::vector<TDatum> td_bounds_data;
1846  for (auto const& b : bounds) {
1847  TDatum td_double;
1848  td_double.val.real_val = b;
1849  td_bounds_data.push_back(td_double);
1850  }
1851  TDatum tdd_bounds;
1852  tdd_bounds.val.arr_val = td_bounds_data;
1853  tdd_bounds.is_null = is_null_geo;
1854  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1855  }
1856  col_idx++;
1857  }
1858 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:291
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
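
Both variants share the same NULL handling for POINT columns: an empty (or sentinel-leading) coords vector is rewritten in-band as a {NULL_ARRAY_DOUBLE, NULL_DOUBLE} pair so the fixed-length physical coords column still receives a value and is then treated as not-null. A tiny sketch of that convention, with placeholder sentinels rather than OmniSciDB's actual NULL macros:

#include <vector>

// Placeholder sentinels; the real values come from NULL_ARRAY_DOUBLE and
// NULL_DOUBLE in sqltypes.h and are not reproduced here.
constexpr double kNullArrayDouble = -1.0e308;
constexpr double kNullDouble = 1.0e308;

// Returns true if the point was NULL and rewrites coords with the in-band
// NULL encoding, mirroring the POINT branch in both functions above.
bool encode_null_point(std::vector<double>& coords) {
  const bool is_null_point = coords.empty() || coords[0] == kNullArrayDouble;
  if (is_null_point) {
    coords = {kNullArrayDouble, kNullDouble};
  }
  return is_null_point;
}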

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 240 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), importGDALGeo(), importGDALRaster(), import_export::ForeignDataImporter::importGeneralS3(), and anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers().

240  {
241  heavyai::lock_guard<heavyai::shared_mutex> write_lock(status_mutex);
242  is.end = std::chrono::steady_clock::now();
243  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
244  import_status_map[id] = is;
245 }
std::lock_guard< T > lock_guard
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:167
static heavyai::shared_mutex status_mutex
Definition: Importer.cpp:166
std::string import_id
Definition: Importer.h:895

+ Here is the caller graph for this function:
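
set_import_status() keeps a process-wide map from import_id to the latest ImportStatus, guarded by status_mutex, and stamps the elapsed wall time on every update so callers polling the status (for example via get_import_status()) see up-to-date progress. A standalone sketch of that bookkeeping (StatusLite is a stand-in, not the real ImportStatus):

#include <chrono>
#include <cstddef>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

struct StatusLite {
  std::chrono::steady_clock::time_point start{std::chrono::steady_clock::now()};
  std::chrono::steady_clock::time_point end;
  std::chrono::milliseconds elapsed{0};
  std::size_t rows_completed{0};
};

static std::shared_mutex status_mutex;
static std::map<std::string, StatusLite> status_map;

void set_status(const std::string& id, StatusLite status) {
  std::lock_guard<std::shared_mutex> write_lock(status_mutex);
  status.end = std::chrono::steady_clock::now();
  status.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
      status.end - status.start);
  status_map[id] = status;
}

StatusLite get_status(const std::string& id) {
  std::shared_lock<std::shared_mutex> read_lock(status_mutex);
  const auto it = status_map.find(id);
  return it == status_map.end() ? StatusLite{} : it->second;
}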

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 898 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 896 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private

Definition at line 899 of file Importer.h.
std::mutex import_export::Importer::init_gdal_mutex
staticprivate

Definition at line 902 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 901 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private

Definition at line 900 of file Importer.h.
size_t import_export::Importer::max_threads
private

Definition at line 897 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDALGeo(), and importGDALRaster().


The documentation for this class was generated from the following files: