OmniSciDB  c07336695a
Importer_NS::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Importer:
+ Collaboration diagram for Importer_NS::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import ()
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed) override
 
ImportStatus importGDAL (std::map< std::string, std::string > colname_to_src)
 
const CopyParams & get_copy_params () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count)
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec ()
 
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const int32_t start_epoch)
 
auto getLoader () const
 
- Public Member Functions inherited from Importer_NS::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths)
 

Static Public Member Functions

static bool hasGDALLibKML ()
 
static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptors (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static bool gdalSupportsNetworkFileAccess ()
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, int render_group, const int64_t replicate_count=0)
 

Static Private Member Functions

static void initGDAL ()
 
static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static OGRDataSource * openGDALDataset (const std::string &fileName, const CopyParams &copy_params)
 
static void setGDALAuthorizationTokens (const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from Importer_NS::DataStreamSink
ImportStatus archivePlumber ()
 
- Protected Attributes inherited from Importer_NS::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status
 
bool load_failed = false
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 874 of file Importer.h.

Member Enumeration Documentation

◆ GeoFileLayerContents

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 921 of file Importer.h.

921 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

◆ Importer() [1/2]

Importer_NS::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 146 of file Importer.cpp.

150  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:146

◆ Importer() [2/2]

Importer_NS::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 152 of file Importer.cpp.

References buffer, Importer_NS::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and Importer_NS::DataStreamSink::p_file.

153  : DataStreamSink(p, f), loader(providedLoader) {
154  import_id = boost::filesystem::path(file_path).filename().string();
155  file_size = 0;
156  max_threads = 0;
157  p_file = nullptr;
158  buffer[0] = nullptr;
159  buffer[1] = nullptr;
160  // we may be overallocating a little more memory here due to dropping phy cols.
161  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
162  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
163  int i = 0;
164  bool has_array = false;
165  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
166  int skip_physical_cols = 0;
167  for (auto& p : loader->get_column_descs()) {
168  // phy geo columns can't be in input file
169  if (skip_physical_cols-- > 0) {
170  continue;
171  }
172  // neither are rowid or $deleted$
173  // note: columns can be added after rowid/$deleted$
174  if (p->isVirtualCol || p->isDeletedCol) {
175  continue;
176  }
177  skip_physical_cols = p->columnType.get_physical_cols();
178  if (p->columnType.get_type() == kARRAY) {
179  is_array.get()[i] = true;
180  has_array = true;
181  } else {
182  is_array.get()[i] = false;
183  }
184  ++i;
185  }
186  if (has_array) {
187  is_array_a = std::unique_ptr<bool[]>(is_array.release());
188  } else {
189  is_array_a = std::unique_ptr<bool[]>(nullptr);
190  }
191 }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:972
std::string import_id
Definition: Importer.h:966
const std::string file_path
Definition: Importer.h:753

◆ ~Importer()

Importer_NS::Importer::~Importer ( )
override

Definition at line 193 of file Importer.cpp.

References buffer, and Importer_NS::DataStreamSink::p_file.

193  {
194  if (p_file != nullptr) {
195  fclose(p_file);
196  }
197  if (buffer[0] != nullptr) {
198  free(buffer[0]);
199  }
200  if (buffer[1] != nullptr) {
201  free(buffer[1]);
202  }
203 }

Member Function Documentation

◆ checkpoint()

void Importer_NS::Importer::checkpoint ( const int32_t  start_epoch)

Definition at line 3029 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), import_buffers_vec, logger::INFO, Importer_NS::DataStreamSink::load_failed, loader, and LOG.

Referenced by importDelimited(), and importGDAL().

3029  {
3030  if (load_failed) {
3031  // rollback to starting epoch - undo all the added records
3032  loader->setTableEpoch(start_epoch);
3033  } else {
3034  loader->checkpoint();
3035  }
3036 
3037  if (loader->getTableDesc()->persistenceLevel ==
3038  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3039  auto ms = measure<>::execution([&]() {
3040  if (!load_failed) {
3041  for (auto& p : import_buffers_vec[0]) {
3042  if (!p->stringDictCheckpoint()) {
3043  LOG(ERROR) << "Checkpointing Dictionary for Column "
3044  << p->getColumnDesc()->columnName << " failed.";
3045  load_failed = true;
3046  break;
3047  }
3048  }
3049  }
3050  });
3051  if (DEBUG_TIMING) {
3052  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3053  << std::endl;
3054  }
3055  }
3056 }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
#define LOG(tag)
Definition: Logger.h:182
#define DEBUG_TIMING
Definition: Importer.cpp:135
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970
static TimeT::rep execution(F func, Args &&... args)
Definition: sample.cpp:29
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalFileExists()

bool Importer_NS::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4367 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::check_geospatial_files(), MapDHandler::detect_column_types(), MapDHandler::get_all_files_in_archive(), MapDHandler::get_first_geo_file_in_archive(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4367  {
4368  return gdalStatInternal(path, copy_params, false);
4369 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4334
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalFileOrDirectoryExists()

bool Importer_NS::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4372 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::detect_column_types(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4373  {
4374  return gdalStatInternal(path, copy_params, true);
4375 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4334
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalGetAllFilesInArchive()

std::vector< std::string > Importer_NS::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 4444 of file Importer.cpp.

References Importer_NS::gdalGatherFilesInArchiveRecursive(), initGDAL(), and setGDALAuthorizationTokens().

Referenced by find_first_geo_file_in_archive(), and MapDHandler::get_all_files_in_archive().

4446  {
4447  // lazy init GDAL
4448  initGDAL();
4449 
4450  // set authorization tokens
4451  setGDALAuthorizationTokens(copy_params);
4452 
4453  // prepare to gather files
4454  std::vector<std::string> files;
4455 
4456  // gather the files recursively
4457  gdalGatherFilesInArchiveRecursive(archive_path, files);
4458 
4459  // convert to relative paths inside archive
4460  for (auto& file : files) {
4461  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4462  }
4463 
4464  // done
4465  return files;
4466 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4018
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:4377
static void initGDAL()
Definition: Importer.cpp:3971
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalGetLayersInGeoFile()

std::vector< Importer::GeoFileLayerInfo > Importer_NS::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 4469 of file Importer.cpp.

References CHECK, EMPTY, GEO, initGDAL(), NON_GEO, openGDALDataset(), setGDALAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4471  {
4472  // lazy init GDAL
4473  initGDAL();
4474 
4475  // set authorization tokens
4476  setGDALAuthorizationTokens(copy_params);
4477 
4478  // prepare to gather layer info
4479  std::vector<GeoFileLayerInfo> layer_info;
4480 
4481  // open the data set
4482  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4483  if (poDS == nullptr) {
4484  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4485  file_name);
4486  }
4487 
4488  // enumerate the layers
4489  for (auto&& poLayer : poDS->GetLayers()) {
4490  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4491  // prepare to read this layer
4492  poLayer->ResetReading();
4493  // skip layer if empty
4494  if (poLayer->GetFeatureCount() > 0) {
4495  // get first feature
4496  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4497  CHECK(first_feature);
4498  // check feature for geometry
4499  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4500  if (!geometry) {
4501  // layer has no geometry
4502  contents = GeoFileLayerContents::NON_GEO;
4503  } else {
4504  // check the geometry type
4505  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4506  switch (wkbFlatten(geometry_type)) {
4507  case wkbPoint:
4508  case wkbLineString:
4509  case wkbPolygon:
4510  case wkbMultiPolygon:
4511  // layer has supported geo
4512  contents = GeoFileLayerContents::GEO;
4513  break;
4514  default:
4515  // layer has unsupported geometry
4516  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4517  break;
4518  }
4519  }
4520  }
4521  // store info for this layer
4522  layer_info.emplace_back(poLayer->GetName(), contents);
4523  }
4524 
4525  // done
4526  return layer_info;
4527 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4018
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:90
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:99
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4088
#define CHECK(condition)
Definition: Logger.h:187
static void initGDAL()
Definition: Importer.cpp:3971
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalStatInternal()

bool Importer_NS::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
staticprivate

Definition at line 4334 of file Importer.cpp.

References initGDAL(), run-benchmark-import::result, and setGDALAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

4336  {
4337  // lazy init GDAL
4338  initGDAL();
4339 
4340  // set authorization tokens
4341  setGDALAuthorizationTokens(copy_params);
4342 
4343 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4344  // clear GDAL stat cache
4345  // without this, file existence will be cached, even if authentication changes
4346  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4347  VSICurlClearCache();
4348 #endif
4349 
4350  // stat path
4351  VSIStatBufL sb;
4352  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4353  if (result < 0) {
4354  return false;
4355  }
4356 
4357  // exists?
4358  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4359  return true;
4360  } else if (VSI_ISREG(sb.st_mode)) {
4361  return true;
4362  }
4363  return false;
4364 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4018
static void initGDAL()
Definition: Importer.cpp:3971
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalSupportsNetworkFileAccess()

bool Importer_NS::Importer::gdalSupportsNetworkFileAccess ( )
static

Definition at line 4530 of file Importer.cpp.

Referenced by add_vsi_network_prefix().

4530  {
4531 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4532  return true;
4533 #else
4534  return false;
4535 #endif
4536 }
+ Here is the caller graph for this function:

◆ gdalToColumnDescriptors()

const std::list< ColumnDescriptor > Importer_NS::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4265 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, Importer_NS::CopyParams::geo_coords_comp_param, Importer_NS::CopyParams::geo_coords_encoding, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_coords_type, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, Importer_NS::ogr_to_type(), openGDALDataset(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_fixed_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_input_srid(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_output_srid(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_type(), and ColumnDescriptor::sourceName.

Referenced by MapDHandler::detect_column_types(), and Importer_NS::ImportDriver::importGeoTable().

4268  {
4269  std::list<ColumnDescriptor> cds;
4270 
4271  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4272  if (poDS == nullptr) {
4273  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4274  file_name);
4275  }
4276 
4277  OGRLayer& layer =
4278  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4279 
4280  layer.ResetReading();
4281  // TODO(andrewseidl): support multiple features
4282  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4283  if (poFeature == nullptr) {
4284  throw std::runtime_error("No features found in " + file_name);
4285  }
4286  // get fields as regular columns
4287  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4288  CHECK(poFDefn);
4289  int iField;
4290  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4291  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4292  auto typePair = ogr_to_type(poFieldDefn->GetType());
4293  ColumnDescriptor cd;
4294  cd.columnName = poFieldDefn->GetNameRef();
4295  cd.sourceName = poFieldDefn->GetNameRef();
4296  SQLTypeInfo ti;
4297  if (typePair.second) {
4298  ti.set_type(kARRAY);
4299  ti.set_subtype(typePair.first);
4300  } else {
4301  ti.set_type(typePair.first);
4302  }
4303  if (typePair.first == kTEXT) {
4304  ti.set_compression(kENCODING_DICT);
4305  ti.set_comp_param(32);
4306  }
4307  ti.set_fixed_size();
4308  cd.columnType = ti;
4309  cds.push_back(cd);
4310  }
4311  // get geo column, if any
4312  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4313  if (poGeometry) {
4314  ColumnDescriptor cd;
4315  cd.columnName = geo_column_name;
4316  cd.sourceName = geo_column_name;
4317  SQLTypes geoType = ogr_to_type(wkbFlatten(poGeometry->getGeometryType()));
4318  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
4319  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4320  }
4321  SQLTypeInfo ti;
4322  ti.set_type(geoType);
4323  ti.set_subtype(copy_params.geo_coords_type);
4324  ti.set_input_srid(copy_params.geo_coords_srid);
4325  ti.set_output_srid(copy_params.geo_coords_srid);
4326  ti.set_compression(copy_params.geo_coords_encoding);
4327  ti.set_comp_param(copy_params.geo_coords_comp_param);
4328  cd.columnType = ti;
4329  cds.push_back(cd);
4330  }
4331  return cds;
4332 }
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4215
SQLTypes
Definition: sqltypes.h:40
void set_input_srid(int d)
Definition: sqltypes.h:413
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:409
void set_fixed_size()
Definition: sqltypes.h:418
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:90
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:99
void set_compression(EncodingType c)
Definition: sqltypes.h:419
std::string sourceName
void set_output_srid(int s)
Definition: sqltypes.h:415
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:410
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4121
specifies the content in-memory of a row in the column metadata table
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4088
Definition: sqltypes.h:54
void set_comp_param(int p)
Definition: sqltypes.h:420
#define CHECK(condition)
Definition: Logger.h:187
SQLTypeInfo columnType
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:141
std::string columnName
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ get_column_descs()

const std::list<const ColumnDescriptor*>& Importer_NS::Importer::get_column_descs ( ) const
inline

Definition at line 888 of file Importer.h.

Referenced by Importer_NS::DataStreamSink::archivePlumber(), Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

888  {
889  return loader->get_column_descs();
890  }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
+ Here is the caller graph for this function:

◆ get_copy_params()

const CopyParams& Importer_NS::Importer::get_copy_params ( ) const
inline

Definition at line 887 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

887 { return copy_params; }
+ Here is the caller graph for this function:

◆ get_import_buffers()

std::vector<std::unique_ptr<TypedImportBuffer> >& Importer_NS::Importer::get_import_buffers ( int  i)
inline

Definition at line 896 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

896  {
897  return import_buffers_vec[i];
898  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970
+ Here is the caller graph for this function:

◆ get_import_buffers_vec()

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& Importer_NS::Importer::get_import_buffers_vec ( )
inline

Definition at line 893 of file Importer.h.

893  {
894  return import_buffers_vec;
895  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970

◆ get_import_status()

ImportStatus Importer_NS::Importer::get_import_status ( const std::string &  id)
static

Definition at line 205 of file Importer.cpp.

Referenced by MapDHandler::import_table_status().

205  {
206  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
207  return import_status_map.at(import_id);
208 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:144
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:143
std::string import_id
Definition: Importer.h:966
+ Here is the caller graph for this function:

◆ get_is_array()

const bool* Importer_NS::Importer::get_is_array ( ) const
inline

Definition at line 899 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited().

899 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:972
+ Here is the caller graph for this function:

◆ getCatalog()

Catalog_Namespace::Catalog& Importer_NS::Importer::getCatalog ( )
inline

Definition at line 932 of file Importer.h.

Referenced by Importer_NS::Loader::checkpoint(), Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Importer_NS::Loader::getTableEpoch(), Importer_NS::import_thread_delimited(), and Importer_NS::Loader::setTableEpoch().

932 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
+ Here is the caller graph for this function:

◆ getLoader()

auto Importer_NS::Importer::getLoader ( ) const
inline

Definition at line 956 of file Importer.h.

Referenced by Importer_NS::DataStreamSink::archivePlumber().

956 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
+ Here is the caller graph for this function:

◆ hasGDALLibKML()

bool Importer_NS::Importer::hasGDALLibKML ( )
static

Definition at line 4013 of file Importer.cpp.

Referenced by anonymous_namespace{ImportTest.cpp}::TEST_F().

4013  {
4014  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4015 }
+ Here is the caller graph for this function:

◆ import()

ImportStatus Importer_NS::Importer::import ( )

Definition at line 3699 of file Importer.cpp.

References Importer_NS::DataStreamSink::archivePlumber().

3699  {
3700  return DataStreamSink::archivePlumber();
3701 }
ImportStatus archivePlumber()
Definition: Importer.cpp:3058
+ Here is the call graph for this function:

◆ importDelimited()

ImportStatus Importer_NS::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed 
)
overridevirtual

Implements Importer_NS::DataStreamSink.

Definition at line 3703 of file Importer.cpp.

References Importer_NS::CopyParams::buffer_size, CHECK, checkpoint(), Importer_NS::DataStreamSink::copy_params, anonymous_namespace{ImportTest.cpp}::d(), logger::ERROR, Importer_NS::DataStreamSink::file_offsets, Importer_NS::DataStreamSink::file_offsets_mutex, file_size, Importer_NS::find_end(), import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_delimited(), kMULTIPOLYGON, kPOLYGON, Importer_NS::CopyParams::line_delim, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, Importer_NS::CopyParams::max_reject, max_threads, Importer_NS::DataStreamSink::p_file, Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, Importer_NS::ImportStatus::rows_rejected, set_import_status(), Importer_NS::CopyParams::threads, Importer_NS::DataStreamSink::total_file_size, and VLOG.

Referenced by Importer_NS::DataStreamSink::import_compressed().

3704  {
3705  bool load_truncated = false;
3706  set_import_status(import_id, import_status);
3707 
3708  if (!p_file) {
3709  p_file = fopen(file_path.c_str(), "rb");
3710  }
3711  if (!p_file) {
3712  throw std::runtime_error("failed to open file '" + file_path +
3713  "': " + strerror(errno));
3714  }
3715 
3716  if (!decompressed) {
3717  (void)fseek(p_file, 0, SEEK_END);
3718  file_size = ftell(p_file);
3719  }
3720 
3721  if (copy_params.threads == 0) {
3722  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3723  } else {
3724  max_threads = static_cast<size_t>(copy_params.threads);
3725  }
3726 
3727  // deal with small files
3728  size_t alloc_size = copy_params.buffer_size;
3729  if (!decompressed && file_size < alloc_size) {
3730  alloc_size = file_size;
3731  }
3732 
3733  for (size_t i = 0; i < max_threads; i++) {
3734  import_buffers_vec.emplace_back();
3735  for (const auto cd : loader->get_column_descs()) {
3736  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3737  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3738  }
3739  }
3740 
3741  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3742  size_t current_pos = 0;
3743  size_t end_pos;
3744  bool eof_reached = false;
3745  size_t begin_pos = 0;
3746 
3747  (void)fseek(p_file, current_pos, SEEK_SET);
3748  size_t size =
3749  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3750 
3751  // make render group analyzers for each poly column
3752  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3753  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3754  loader->getTableDesc()->tableId, false, false, false);
3755  for (auto cd : columnDescriptors) {
3756  SQLTypes ct = cd->columnType.get_type();
3757  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3758  auto rga = std::make_shared<RenderGroupAnalyzer>();
3759  rga->seedFromExistingTableContents(loader, cd->columnName);
3760  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3761  }
3762  }
3763 
3764  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3765  loader->getTableDesc()->tableId};
3766  auto start_epoch = loader->getTableEpoch();
3767  {
3768  std::list<std::future<ImportStatus>> threads;
3769 
3770  // use a stack to track thread_ids which must not overlap among threads
3771  // because thread_id is used to index import_buffers_vec[]
3772  std::stack<size_t> stack_thread_ids;
3773  for (size_t i = 0; i < max_threads; i++) {
3774  stack_thread_ids.push(i);
3775  }
3776 
3777  size_t first_row_index_this_buffer = 0;
3778 
3779  while (size > 0) {
3780  CHECK(scratch_buffer);
3781  if (eof_reached) {
3782  end_pos = size;
3783  } else {
3784  end_pos = find_end(scratch_buffer.get(), size, copy_params);
3785  }
3786  // unput residual
3787  int nresidual = size - end_pos;
3788  std::unique_ptr<char[]> unbuf;
3789  if (nresidual > 0) {
3790  unbuf = std::make_unique<char[]>(nresidual);
3791  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3792  }
3793 
3794  // added for true row index on error
3795  unsigned int num_rows_this_buffer = 0;
3796  {
3797  // we could multi-thread this, but not worth it
3798  // additional cost here is ~1.4ms per chunk and
3799  // probably free because this thread will spend
3800  // most of its time waiting for the child threads
3801  char* p = scratch_buffer.get() + begin_pos;
3802  char* pend = scratch_buffer.get() + end_pos;
3803  char d = copy_params.line_delim;
3804  while (p < pend) {
3805  if (*p++ == d) {
3806  num_rows_this_buffer++;
3807  }
3808  }
3809  }
3810 
3811  // get a thread_id not in use
3812  auto thread_id = stack_thread_ids.top();
3813  stack_thread_ids.pop();
3814  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3815 
3816  threads.push_back(std::async(std::launch::async,
3817  import_thread_delimited,
3818  thread_id,
3819  this,
3820  std::move(scratch_buffer),
3821  begin_pos,
3822  end_pos,
3823  end_pos,
3824  columnIdToRenderGroupAnalyzerMap,
3825  first_row_index_this_buffer));
3826 
3827  first_row_index_this_buffer += num_rows_this_buffer;
3828 
3829  current_pos += end_pos;
3830  scratch_buffer = std::make_unique<char[]>(alloc_size);
3831  CHECK(scratch_buffer);
3832  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3833  size = nresidual + fread(scratch_buffer.get() + nresidual,
3834  1,
3835  copy_params.buffer_size - nresidual,
3836  p_file);
3837  if (size < copy_params.buffer_size && feof(p_file)) {
3838  eof_reached = true;
3839  }
3840 
3841  begin_pos = 0;
3842 
3843  while (threads.size() > 0) {
3844  int nready = 0;
3845  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3846  it != threads.end();) {
3847  auto& p = *it;
3848  std::chrono::milliseconds span(
3849  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3850  if (p.wait_for(span) == std::future_status::ready) {
3851  auto ret_import_status = p.get();
3852  import_status += ret_import_status;
3853  // sum up current total file offsets
3854  size_t total_file_offset{0};
3855  if (decompressed) {
3856  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3857  for (const auto file_offset : file_offsets) {
3858  total_file_offset += file_offset;
3859  }
3860  }
3861  // estimate number of rows per current total file offset
3862  if (decompressed ? total_file_offset : current_pos) {
3863  import_status.rows_estimated =
3864  (decompressed ? (float)total_file_size / total_file_offset
3865  : (float)file_size / current_pos) *
3866  import_status.rows_completed;
3867  }
3868  VLOG(3) << "rows_completed " << import_status.rows_completed
3869  << ", rows_estimated " << import_status.rows_estimated
3870  << ", total_file_size " << total_file_size << ", total_file_offset "
3871  << total_file_offset;
3872  set_import_status(import_id, import_status);
3873  // recall thread_id for reuse
3874  stack_thread_ids.push(ret_import_status.thread_id);
3875  threads.erase(it++);
3876  ++nready;
3877  } else {
3878  ++it;
3879  }
3880  }
3881 
3882  if (nready == 0) {
3883  std::this_thread::yield();
3884  }
3885 
3886  // on eof, wait all threads to finish
3887  if (0 == size) {
3888  continue;
3889  }
3890 
3891  // keep reading if any free thread slot
3892  // this is one of the major difference from old threading model !!
3893  if (threads.size() < max_threads) {
3894  break;
3895  }
3896  }
3897 
3898  if (import_status.rows_rejected > copy_params.max_reject) {
3899  load_truncated = true;
3900  load_failed = true;
3901  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
3902  break;
3903  }
3904  if (load_failed) {
3905  load_truncated = true;
3906  LOG(ERROR) << "A call to the Loader::load failed, Please review the logs for "
3907  "more details";
3908  break;
3909  }
3910  }
3911 
3912  // join dangling threads in case of LOG(ERROR) above
3913  for (auto& p : threads) {
3914  p.wait();
3915  }
3916  }
3917 
3918  checkpoint(start_epoch);
3919 
3920  // must set import_status.load_truncated before closing this end of pipe
3921  // otherwise, the thread on the other end would throw an unwanted 'write()'
3922  // exception
3923  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3924  import_status.load_truncated = load_truncated;
3925 
3926  fclose(p_file);
3927  p_file = nullptr;
3928  return import_status;
3929 }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
void d(const SQLTypes expected_type, const std::string &str)
Definition: ImportTest.cpp:268
std::vector< size_t > file_offsets
Definition: Importer.h:758
SQLTypes
Definition: sqltypes.h:40
#define LOG(tag)
Definition: Logger.h:182
static size_t find_end(const char *buffer, size_t size, const CopyParams &copy_params)
Definition: Importer.cpp:2269
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:143
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970
std::string import_id
Definition: Importer.h:966
std::map< int, std::shared_ptr< RenderGroupAnalyzer > > ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:132
std::mutex file_offsets_mutex
Definition: Importer.h:759
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3029
ImportStatus import_status
Definition: Importer.h:755
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer)
Definition: Importer.cpp:1775
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:210
#define VLOG(n)
Definition: Logger.h:277
const std::string file_path
Definition: Importer.h:753
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ importGDAL()

ImportStatus Importer_NS::Importer::importGDAL ( std::map< std::string, std::string >  colname_to_src)

Definition at line 4538 of file Importer.cpp.

References CHECK, CHECK_EQ, checkpoint(), Importer_NS::DataStreamSink::copy_params, logger::ERROR, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_shapefile(), kMULTIPOLYGON, kPOLYGON, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, Importer_NS::CopyParams::max_reject, max_threads, openGDALDataset(), Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, set_import_status(), and Importer_NS::CopyParams::threads.

Referenced by Importer_NS::ImportDriver::importGeoTable().

4539  {
4540  // initial status
4541  bool load_truncated = false;
4542  set_import_status(import_id, import_status);
4543 
4544  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4545  if (poDS == nullptr) {
4546  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4547  file_path);
4548  }
4549 
4550  OGRLayer& layer =
4551  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4552 
4553  // get the number of features in this layer
4554  size_t numFeatures = layer.GetFeatureCount();
4555 
4556  // build map of metadata field (additional columns) name to index
4557  // use shared_ptr since we need to pass it to the worker
4558  FieldNameToIndexMapType fieldNameToIndexMap;
4559  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4560  CHECK(poFDefn);
4561  size_t numFields = poFDefn->GetFieldCount();
4562  for (size_t iField = 0; iField < numFields; iField++) {
4563  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4564  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4565  }
4566 
4567  // the geographic spatial reference we want to put everything in
4568  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4569  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4570 
4571 #if GDAL_VERSION_MAJOR >= 3
4572  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4573  // this results in X and Y being transposed for angle-based
4574  // coordinate systems. This restores the previous behavior.
4575  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4576 #endif
4577 
4578 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4579  // just one "thread"
4580  size_t max_threads = 1;
4581 #else
4582  // how many threads to use
4583  size_t max_threads = 0;
4584  if (copy_params.threads == 0) {
4585  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4586  } else {
4587  max_threads = copy_params.threads;
4588  }
4589 #endif
4590 
4591  // make an import buffer for each thread
4592  CHECK_EQ(import_buffers_vec.size(), 0u);
4593  import_buffers_vec.resize(max_threads);
4594  for (size_t i = 0; i < max_threads; i++) {
4595  for (const auto cd : loader->get_column_descs()) {
4596  import_buffers_vec[i].emplace_back(
4597  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4598  }
4599  }
4600 
4601  // make render group analyzers for each poly column
4602  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4603  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4604  loader->getTableDesc()->tableId, false, false, false);
4605  for (auto cd : columnDescriptors) {
4606  SQLTypes ct = cd->columnType.get_type();
4607  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4608  auto rga = std::make_shared<RenderGroupAnalyzer>();
4609  rga->seedFromExistingTableContents(loader, cd->columnName);
4610  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4611  }
4612  }
4613 
4614 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4615  // threads
4616  std::list<std::future<ImportStatus>> threads;
4617 
4618  // use a stack to track thread_ids which must not overlap among threads
4619  // because thread_id is used to index import_buffers_vec[]
4620  std::stack<size_t> stack_thread_ids;
4621  for (size_t i = 0; i < max_threads; i++) {
4622  stack_thread_ids.push(i);
4623  }
4624 #endif
4625 
4626  // checkpoint the table
4627  auto start_epoch = loader->getTableEpoch();
4628 
4629  // reset the layer
4630  layer.ResetReading();
4631 
4632  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4633 
4634  // make a features buffer for each thread
4635  std::vector<FeaturePtrVector> features(max_threads);
4636 
4637  // for each feature...
4638  size_t firstFeatureThisChunk = 0;
4639  while (firstFeatureThisChunk < numFeatures) {
4640  // how many features this chunk
4641  size_t numFeaturesThisChunk =
4642  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4643 
4644 // get a thread_id not in use
4645 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4646  size_t thread_id = 0;
4647 #else
4648  auto thread_id = stack_thread_ids.top();
4649  stack_thread_ids.pop();
4650  CHECK(thread_id < max_threads);
4651 #endif
4652 
4653  // fill features buffer for new thread
4654  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4655  features[thread_id].emplace_back(layer.GetNextFeature());
4656  }
4657 
4658 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4659  // call worker function directly
4660  auto ret_import_status = import_thread_shapefile(0,
4661  this,
4662  poGeographicSR.get(),
4663  std::move(features[thread_id]),
4664  firstFeatureThisChunk,
4665  numFeaturesThisChunk,
4666  fieldNameToIndexMap,
4667  columnNameToSourceNameMap,
4668  columnIdToRenderGroupAnalyzerMap);
4669  import_status += ret_import_status;
4670  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4671  import_status.rows_completed;
4672  set_import_status(import_id, import_status);
4673 #else
4674  // fire up that thread to import this geometry
4675  threads.push_back(std::async(std::launch::async,
4676  import_thread_shapefile,
4677  thread_id,
4678  this,
4679  poGeographicSR.get(),
4680  std::move(features[thread_id]),
4681  firstFeatureThisChunk,
4682  numFeaturesThisChunk,
4683  fieldNameToIndexMap,
4684  columnNameToSourceNameMap,
4685  columnIdToRenderGroupAnalyzerMap));
4686 
4687  // let the threads run
4688  while (threads.size() > 0) {
4689  int nready = 0;
4690  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4691  it != threads.end();) {
4692  auto& p = *it;
4693  std::chrono::milliseconds span(
4694  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4695  if (p.wait_for(span) == std::future_status::ready) {
4696  auto ret_import_status = p.get();
4697  import_status += ret_import_status;
4698  import_status.rows_estimated =
4699  ((float)firstFeatureThisChunk / (float)numFeatures) *
4700  import_status.rows_completed;
4701  set_import_status(import_id, import_status);
4702 
4703  // recall thread_id for reuse
4704  stack_thread_ids.push(ret_import_status.thread_id);
4705 
4706  threads.erase(it++);
4707  ++nready;
4708  } else {
4709  ++it;
4710  }
4711  }
4712 
4713  if (nready == 0) {
4714  std::this_thread::yield();
4715  }
4716 
4717  // keep reading if any free thread slot
4718  // this is one of the major difference from old threading model !!
4719  if (threads.size() < max_threads) {
4720  break;
4721  }
4722  }
4723 #endif
4724 
4725  // out of rows?
4726  if (import_status.rows_rejected > copy_params.max_reject) {
4727  load_truncated = true;
4728  load_failed = true;
4729  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4730  break;
4731  }
4732 
4733  // failed?
4734  if (load_failed) {
4735  load_truncated = true;
4736  LOG(ERROR)
4737  << "A call to the Loader::load failed, Please review the logs for more details";
4738  break;
4739  }
4740 
4741  firstFeatureThisChunk += numFeaturesThisChunk;
4742  }
4743 
4744 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4745  // wait for any remaining threads
4746  if (threads.size()) {
4747  for (auto& p : threads) {
4748  // wait for the thread
4749  p.wait();
4750  // get the result and update the final import status
4751  auto ret_import_status = p.get();
4752  import_status += ret_import_status;
4753  import_status.rows_estimated = import_status.rows_completed;
4754  set_import_status(import_id, import_status);
4755  }
4756  }
4757 #endif
4758 
4759  checkpoint(start_epoch);
4760 
4761  // must set import_status.load_truncated before closing this end of pipe
4762  // otherwise, the thread on the other end would throw an unwanted 'write()'
4763  // exception
4764  import_status.load_truncated = load_truncated;
4765  return import_status;
4766 }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
#define CHECK_EQ(x, y)
Definition: Logger.h:195
SQLTypes
Definition: sqltypes.h:40
std::unique_ptr< OGRSpatialReference, OGRSpatialReferenceDeleter > OGRSpatialReferenceUqPtr
Definition: Importer.cpp:109
#define LOG(tag)
Definition: Logger.h:182
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:90
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4121
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970
std::string import_id
Definition: Importer.h:966
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap)
Definition: Importer.cpp:2021
std::map< int, std::shared_ptr< RenderGroupAnalyzer > > ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:132
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4088
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3029
std::string geo_layer_name
Definition: Importer.h:131
ImportStatus import_status
Definition: Importer.h:755
#define CHECK(condition)
Definition: Logger.h:187
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:129
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:210
const std::string file_path
Definition: Importer.h:753
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initGDAL()

void Importer_NS::Importer::initGDAL ( )
static private

Definition at line 3971 of file Importer.cpp.

References Importer_NS::GDALErrorHandler(), logger::INFO, LOG, and mapd_root_abs_path().

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

// One-time GDAL/OGR setup, serialized by Importer::init_gdal_mutex.
// The first call sets GDAL_DATA, picks an SSL CA bundle, registers the GDAL
// and OGR drivers, installs GDALErrorHandler and logs the GDAL version.
// Every later call takes the lock, sees gdal_initialized == true and returns
// without doing anything else.
3971  {
3972  // this should not be called from multiple threads, but...
3973  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
3974  // init under mutex
3975  static bool gdal_initialized = false;
3976  if (!gdal_initialized) {
3977  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
// GDAL_DATA is set with overwrite=true, so a value already present in the
// environment is replaced by <mapd root>/ThirdParty/gdal-data.
3978  setenv("GDAL_DATA",
3979  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
3980  true);
3981 
3982  // configure SSL certificate path (per S3Archive::init_for_read)
3983  // in a production build, GDAL and Curl will have been built on
3984  // CentOS, so the baked-in system path will be wrong for Ubuntu
3985  // and other Linux distros. Unless the user is deliberately
3986  // overriding it by setting SSL_CERT_FILE explicitly in the server
3987  // environment, we set it to whichever CA bundle directory exists
3988  // on the machine we're running on
3989  std::list<std::string> v_known_ca_paths({
3990  "/etc/ssl/certs/ca-certificates.crt",
3991  "/etc/pki/tls/certs/ca-bundle.crt",
3992  "/usr/share/ssl/certs/ca-bundle.crt",
3993  "/usr/local/share/certs/ca-root.crt",
3994  "/etc/ssl/cert.pem",
3995  "/etc/ssl/ca-bundle.pem",
3996  });
// The list is searched in order; only the first path that exists is used.
3997  for (const auto& known_ca_path : v_known_ca_paths) {
3998  if (boost::filesystem::exists(known_ca_path)) {
3999  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4000  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4001  break;
4002  }
4003  }
4004 
4005  GDALAllRegister();
4006  OGRRegisterAll();
4007  CPLSetErrorHandler(*GDALErrorHandler);
4008  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
// Set the flag last so the block above runs exactly once per process.
4009  gdal_initialized = true;
4010  }
4011 }
#define LOG(tag)
Definition: Logger.h:182
std::string mapd_root_abs_path()
Definition: mapdpath.h:30
static std::mutex init_gdal_mutex
Definition: Importer.h:973
void GDALErrorHandler(CPLErr eErrClass, int err_no, const char *msg)
Definition: Importer.cpp:3948
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ load()

void Importer_NS::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count 
)

Definition at line 3022 of file Importer.cpp.

References Importer_NS::DataStreamSink::load_failed, and loader.

Referenced by Importer_NS::DataStreamSink::archivePlumber(), Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

// Forwards import_buffers and row_count to loader->loadNoCheckpoint().
// A false return is neither thrown nor returned to the caller: it is only
// recorded by setting the load_failed member, which is never reset here.
3023  {
3024  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3025  load_failed = true;
3026  }
3027 }
std::unique_ptr< Loader > loader
Definition: Importer.h:971
+ Here is the caller graph for this function:

◆ openGDALDataset()

OGRDataSource * Importer_NS::Importer::openGDALDataset ( const std::string &  fileName,
const CopyParams copy_params 
)
staticprivate

Definition at line 4088 of file Importer.cpp.

References logger::ERROR, logger::INFO, initGDAL(), LOG, and setGDALAuthorizationTokens().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptors(), importGDAL(), and readMetadataSampleGDAL().

// Opens file_name as a GDAL/OGR vector data source and returns the raw
// pointer, or nullptr if every open attempt failed (the last GDAL error
// message is logged in that case; nothing is thrown here).
// Before opening it calls initGDAL() and setGDALAuthorizationTokens().
// The returned pointer is not wrapped in any owner by this function; the
// caller visible in this file, readMetadataSampleGDAL, immediately puts it
// in an OGRDataSourceUqPtr.
4089  {
4090  // lazy init GDAL
4091  initGDAL();
4092 
4093  // set authorization tokens
4094  setGDALAuthorizationTokens(copy_params);
4095 
4096  // open the file
4097  OGRDataSource* poDS;
4098 #if GDAL_VERSION_MAJOR == 1
4099  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4100 #else
// GDAL 2+: try GDAL_OF_VECTOR alone first, then retry with
// GDAL_OF_READONLY | GDAL_OF_VECTOR if that returned nullptr.
// NOTE(review): GDAL documents GDAL_OF_READONLY as 0x00, which would make
// the retry pass the same flags as the first attempt - confirm against the
// GDAL version in use.
4101  poDS = (OGRDataSource*)GDALOpenEx(
4102  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4103  if (poDS == nullptr) {
4104  poDS = (OGRDataSource*)GDALOpenEx(
4105  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4106  if (poDS) {
4107  LOG(INFO) << "openGDALDataset had to open as read-only";
4108  }
4109  }
4110 #endif
4111  if (poDS == nullptr) {
4112  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4113  }
4114  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4115  // in a memory leak if GDAL successfully opened the input dataset.
4116  return poDS;
4117 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4018
#define LOG(tag)
Definition: Logger.h:182
static void initGDAL()
Definition: Importer.cpp:3971
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ readMetadataSampleGDAL()

void Importer_NS::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams copy_params 
)
static

Definition at line 4144 of file Importer.cpp.

References CHECK, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), and openGDALDataset().

Referenced by MapDHandler::detect_column_types().

// Samples the first features of a geo file into `metadata`.
//
// Opens file_name, selects the layer named by copy_params.geo_layer_name and
// adds one entry to `metadata` per layer field plus one for geo_column_name.
// Each entry is a vector of numFeatures strings, where numFeatures is
// min(rowLimit, layer feature count) clamped to be non-negative.
// For every sampled feature with a geometry, the field vectors receive
// GetFieldAsString() and the geo column receives the geometry as WKT.
//
// Throws std::runtime_error if the file cannot be opened or if a feature's
// geometry is not a point, line string, polygon or multi-polygon.
4149  {
4150  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4151  if (poDS == nullptr) {
4152  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4153  file_name);
4154  }
4155 
4156  OGRLayer& layer =
4157  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4158 
4159  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4160  CHECK(poFDefn);
4161 
4162  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4163  auto nFeats = layer.GetFeatureCount();
// The max(0, ...) keeps a negative count or negative rowLimit from being
// converted into a huge size_t.
4164  size_t numFeatures =
4165  std::max(static_cast<decltype(nFeats)>(0),
4166  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
// emplace() does not replace an entry that already exists in `metadata`.
4167  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4168  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4169  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4170  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4171  }
4172  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4173  layer.ResetReading();
4174  size_t iFeature = 0;
4175  while (iFeature < numFeatures) {
4176  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
// The layer ran out of features before numFeatures were read: stop early and
// leave the remaining slots as empty strings.
4177  if (!poFeature) {
4178  break;
4179  }
4180 
// A feature without geometry is skipped entirely (its field values are not
// copied either), but iFeature still advances, so its row stays empty.
4181  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4182  if (poGeometry != nullptr) {
4183  // validate geom type (again?)
4184  switch (wkbFlatten(poGeometry->getGeometryType())) {
4185  case wkbPoint:
4186  case wkbLineString:
4187  case wkbPolygon:
4188  case wkbMultiPolygon:
4189  break;
4190  default:
4191  throw std::runtime_error("Unsupported geometry type: " +
4192  std::string(poGeometry->getGeometryName()));
4193  }
4194 
4195  // populate metadata for regular fields
// NOTE(review): `auto i` copies each map entry, including its whole string
// vector, once per feature; the write goes through metadata[i.first], so a
// reference here would avoid the copy without changing the result.
4196  for (auto i : metadata) {
4197  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4198  if (iField >= 0) { // geom is -1
4199  metadata[i.first].at(iFeature) =
4200  std::string(poFeature->GetFieldAsString(iField));
4201  }
4202  }
4203 
4204  // populate metadata for geo column with WKT string
// exportToWkt() fills a buffer that is released with CPLFree() after the copy.
4205  char* wkts = nullptr;
4206  poGeometry->exportToWkt(&wkts);
4207  CHECK(wkts);
4208  metadata[geo_column_name].at(iFeature) = wkts;
4209  CPLFree(wkts);
4210  }
4211  iFeature++;
4212  }
4213 }
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:90
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:99
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4121
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4088
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_geo_physical_import_buffer()

void Importer_NS::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog catalog,
const ColumnDescriptor cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1581 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Importer_NS::compress_coords(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), kLINESTRING, kMULTIPOLYGON, kPOLYGON, and ColumnDescriptor::tableId.

Referenced by Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), Importer_NS::import_thread_delimited(), and MapDHandler::load_table().

// Appends the physical-column values of ONE geo value to import_buffers.
//
// The physical columns are looked up by consecutive column ids following
// cd->columnId (each lookup pre-increments columnId). Every value written
// goes to import_buffers[col_idx], and col_idx is then advanced, so on
// return col_idx points past the last physical column written.
//
// Columns written, in this order, depending on cd's type:
//   coords        - always; the bytes returned by compress_coords()
//   ring_sizes    - kPOLYGON, kMULTIPOLYGON
//   poly_rings    - kMULTIPOLYGON
//   bounds        - kLINESTRING, kPOLYGON, kMULTIPOLYGON
//   render_group  - kPOLYGON, kMULTIPOLYGON
// replicate_count is passed through unchanged to every add_value() call.
1591  {
1592  const auto col_ti = cd->columnType;
1593  const auto col_type = col_ti.get_type();
1594  auto columnId = cd->columnId;
1595  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
// Each compressed byte becomes one TDatum in the coords array value.
1596  std::vector<TDatum> td_coords_data;
1597  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1598  for (auto cc : compressed_coords) {
1599  TDatum td_byte;
1600  td_byte.val.int_val = cc;
1601  td_coords_data.push_back(td_byte);
1602  }
1603  TDatum tdd_coords;
1604  tdd_coords.val.arr_val = td_coords_data;
1605  tdd_coords.is_null = false;
1606  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);
1607 
1608  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1609  // Create ring_sizes array value and add it to the physical column
1610  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1611  std::vector<TDatum> td_ring_sizes;
1612  for (auto ring_size : ring_sizes) {
1613  TDatum td_ring_size;
1614  td_ring_size.val.int_val = ring_size;
1615  td_ring_sizes.push_back(td_ring_size);
1616  }
1617  TDatum tdd_ring_sizes;
1618  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1619  tdd_ring_sizes.is_null = false;
1620  import_buffers[col_idx++]->add_value(
1621  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1622  }
1623 
1624  if (col_type == kMULTIPOLYGON) {
1625  // Create poly_rings array value and add it to the physical column
1626  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1627  std::vector<TDatum> td_poly_rings;
1628  for (auto num_rings : poly_rings) {
1629  TDatum td_num_rings;
1630  td_num_rings.val.int_val = num_rings;
1631  td_poly_rings.push_back(td_num_rings);
1632  }
1633  TDatum tdd_poly_rings;
1634  tdd_poly_rings.val.arr_val = td_poly_rings;
1635  tdd_poly_rings.is_null = false;
1636  import_buffers[col_idx++]->add_value(
1637  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1638  }
1639 
// Bounds are stored as an array of doubles (real_val), unlike the integer
// arrays above.
1640  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1641  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1642  std::vector<TDatum> td_bounds_data;
1643  for (auto b : bounds) {
1644  TDatum td_double;
1645  td_double.val.real_val = b;
1646  td_bounds_data.push_back(td_double);
1647  }
1648  TDatum tdd_bounds;
1649  tdd_bounds.val.arr_val = td_bounds_data;
1650  tdd_bounds.is_null = false;
1651  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1652  }
1653 
1654  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1655  // Create render_group value and add it to the physical column
1656  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1657  TDatum td_render_group;
1658  td_render_group.val.int_val = render_group;
1659  td_render_group.is_null = false;
1660  import_buffers[col_idx++]->add_value(
1661  cd_render_group, td_render_group, false, replicate_count);
1662  }
1663 }
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:319
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1546
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_geo_physical_import_buffer_columnar()

void Importer_NS::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog catalog,
const ColumnDescriptor cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1665 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Importer_NS::compress_coords(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), kLINESTRING, kMULTIPOLYGON, kPOLYGON, and ColumnDescriptor::tableId.

Referenced by MapDHandler::load_table_binary_columnar().

// Columnar counterpart of set_geo_physical_import_buffer: appends the
// physical-column values for MANY rows of one geo column.
//
// Each *_column argument holds one inner vector per row. For every physical
// column that applies to cd's type, all rows are added to
// import_buffers[col_idx] and col_idx is then advanced once. Physical column
// descriptors are looked up by consecutive ids following cd->columnId.
//
// Columns written, in this order:
//   coords        - always; compress_coords() output per row
//   ring_sizes    - kPOLYGON, kMULTIPOLYGON
//   poly_rings    - kMULTIPOLYGON
//   bounds        - kLINESTRING, kPOLYGON, kMULTIPOLYGON
//   render_group  - kPOLYGON, kMULTIPOLYGON; the single render_group value
//                   is repeated once per coords row
// The row count of coords_column is the reference: a different row count in
// ring_sizes_column, poly_rings_column or bounds_column trips CHECK(false).
1675  {
1676  const auto col_ti = cd->columnType;
1677  const auto col_type = col_ti.get_type();
1678  auto columnId = cd->columnId;
1679 
1680  auto coords_row_count = coords_column.size();
1681  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
// Each row is taken by value, so compress_coords() receives a copy of the
// caller's row rather than the row itself.
1682  for (auto coords : coords_column) {
1683  std::vector<TDatum> td_coords_data;
1684  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1685  for (auto cc : compressed_coords) {
1686  TDatum td_byte;
1687  td_byte.val.int_val = cc;
1688  td_coords_data.push_back(td_byte);
1689  }
1690  TDatum tdd_coords;
1691  tdd_coords.val.arr_val = td_coords_data;
1692  tdd_coords.is_null = false;
1693  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
1694  }
1695  col_idx++;
1696 
1697  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1698  if (ring_sizes_column.size() != coords_row_count) {
1699  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1700  }
1701  // Create ring_sizes array value and add it to the physical column
1702  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1703  for (auto ring_sizes : ring_sizes_column) {
1704  std::vector<TDatum> td_ring_sizes;
1705  for (auto ring_size : ring_sizes) {
1706  TDatum td_ring_size;
1707  td_ring_size.val.int_val = ring_size;
1708  td_ring_sizes.push_back(td_ring_size);
1709  }
1710  TDatum tdd_ring_sizes;
1711  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1712  tdd_ring_sizes.is_null = false;
1713  import_buffers[col_idx]->add_value(
1714  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1715  }
1716  col_idx++;
1717  }
1718 
1719  if (col_type == kMULTIPOLYGON) {
1720  if (poly_rings_column.size() != coords_row_count) {
1721  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1722  }
1723  // Create poly_rings array value and add it to the physical column
1724  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1725  for (auto poly_rings : poly_rings_column) {
1726  std::vector<TDatum> td_poly_rings;
1727  for (auto num_rings : poly_rings) {
1728  TDatum td_num_rings;
1729  td_num_rings.val.int_val = num_rings;
1730  td_poly_rings.push_back(td_num_rings);
1731  }
1732  TDatum tdd_poly_rings;
1733  tdd_poly_rings.val.arr_val = td_poly_rings;
1734  tdd_poly_rings.is_null = false;
1735  import_buffers[col_idx]->add_value(
1736  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1737  }
1738  col_idx++;
1739  }
1740 
1741  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1742  if (bounds_column.size() != coords_row_count) {
1743  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1744  }
1745  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1746  for (auto bounds : bounds_column) {
1747  std::vector<TDatum> td_bounds_data;
1748  for (auto b : bounds) {
1749  TDatum td_double;
1750  td_double.val.real_val = b;
1751  td_bounds_data.push_back(td_double);
1752  }
1753  TDatum tdd_bounds;
1754  tdd_bounds.val.arr_val = td_bounds_data;
1755  tdd_bounds.is_null = false;
1756  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1757  }
1758  col_idx++;
1759  }
1760 
1761  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1762  // Create render_group value and add it to the physical column
1763  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1764  TDatum td_render_group;
1765  td_render_group.val.int_val = render_group;
1766  td_render_group.is_null = false;
// The same datum is added once per row so this column gets as many values as
// the coords column.
1767  for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
1768  import_buffers[col_idx]->add_value(
1769  cd_render_group, td_render_group, false, replicate_count);
1770  }
1771  col_idx++;
1772  }
1773 }
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:319
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1546
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:187
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_import_status()

void Importer_NS::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 210 of file Importer.cpp.

References Importer_NS::ImportStatus::elapsed, Importer_NS::ImportStatus::end, import_id, and Importer_NS::ImportStatus::start.

Referenced by importDelimited(), and importGDAL().

210  {
211  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
212  is.end = std::chrono::steady_clock::now();
213  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
214  import_status_map[import_id] = is;
215 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:144
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:143
std::string import_id
Definition: Importer.h:966
+ Here is the caller graph for this function:

◆ setGDALAuthorizationTokens()

void Importer_NS::Importer::setGDALAuthorizationTokens ( const CopyParams &  copy_params)
static private

Definition at line 4018 of file Importer.cpp.

References logger::INFO, LOG, Importer_NS::CopyParams::s3_access_key, Importer_NS::CopyParams::s3_endpoint, Importer_NS::CopyParams::s3_region, and Importer_NS::CopyParams::s3_secret_key.

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4018  {
4019  // for now we only support S3
4020  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4021  // only set if non-empty to allow GDAL defaults to persist
4022  // explicitly clear if empty to revert to default and not reuse a previous session's
4023  // keys
4024  if (copy_params.s3_region.size()) {
4025 #if DEBUG_AWS_AUTHENTICATION
4026  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4027 #endif
4028  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4029  } else {
4030 #if DEBUG_AWS_AUTHENTICATION
4031  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4032 #endif
4033  CPLSetConfigOption("AWS_REGION", nullptr);
4034  }
4035  if (copy_params.s3_endpoint.size()) {
4036 #if DEBUG_AWS_AUTHENTICATION
4037  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4038 #endif
4039  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4040  } else {
4041 #if DEBUG_AWS_AUTHENTICATION
4042  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4043 #endif
4044  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4045  }
4046  if (copy_params.s3_access_key.size()) {
4047 #if DEBUG_AWS_AUTHENTICATION
4048  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4049  << "'";
4050 #endif
4051  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4052  } else {
4053 #if DEBUG_AWS_AUTHENTICATION
4054  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4055 #endif
4056  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4057  }
4058  if (copy_params.s3_secret_key.size()) {
4059 #if DEBUG_AWS_AUTHENTICATION
4060  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4061  << "'";
4062 #endif
4063  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4064  } else {
4065 #if DEBUG_AWS_AUTHENTICATION
4066  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4067 #endif
4068  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4069  }
4070 
4071 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4072  // if we haven't set keys, we need to disable signed access
4073  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4074 #if DEBUG_AWS_AUTHENTICATION
4075  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4076 #endif
4077  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4078  } else {
4079 #if DEBUG_AWS_AUTHENTICATION
4080  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4081 #endif
4082  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4083  }
4084 #endif
4085 }
#define LOG(tag)
Definition: Logger.h:182
+ Here is the caller graph for this function:

Member Data Documentation

◆ buffer

◆ file_size

size_t Importer_NS::Importer::file_size
private

Definition at line 967 of file Importer.h.

Referenced by importDelimited(), and Importer().

◆ import_buffers_vec

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > Importer_NS::Importer::import_buffers_vec
private

◆ import_id

std::string Importer_NS::Importer::import_id
private

Definition at line 966 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDAL(), and set_import_status().

◆ init_gdal_mutex

std::mutex Importer_NS::Importer::init_gdal_mutex
static private

Definition at line 973 of file Importer.h.

Referenced by Importer_NS::GDALErrorHandler().

◆ is_array_a

std::unique_ptr<bool[]> Importer_NS::Importer::is_array_a
private

Definition at line 972 of file Importer.h.

Referenced by Importer().

◆ loader

std::unique_ptr<Loader> Importer_NS::Importer::loader
private

◆ max_threads

size_t Importer_NS::Importer::max_threads
private

The documentation for this class was generated from the following files: