OmniSciDB  29e35f4d58
Importer_NS::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Importer:
+ Collaboration diagram for Importer_NS::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import ()
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed) override
 
ImportStatus importGDAL (std::map< std::string, std::string > colname_to_src)
 
const CopyParams & get_copy_params () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count)
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec ()
 
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const int32_t start_epoch)
 
auto getLoader () const
 
- Public Member Functions inherited from Importer_NS::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths)
 

Static Public Member Functions

static bool hasGDALLibKML ()
 
static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptors (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static bool gdalSupportsNetworkFileAccess ()
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, int render_group, const int64_t replicate_count=0)
 

Static Private Member Functions

static void initGDAL ()
 
static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static OGRDataSource * openGDALDataset (const std::string &fileName, const CopyParams &copy_params)
 
static void setGDALAuthorizationTokens (const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from Importer_NS::DataStreamSink
ImportStatus archivePlumber ()
 
- Protected Attributes inherited from Importer_NS::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status
 
bool load_failed = false
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 736 of file Importer.h.

Member Enumeration Documentation

◆ GeoFileLayerContents

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 783 of file Importer.h.

783 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

◆ Importer() [1/2]

Importer_NS::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 149 of file Importer.cpp.

153  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:149

◆ Importer() [2/2]

Importer_NS::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 155 of file Importer.cpp.

References buffer, Importer_NS::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and Importer_NS::DataStreamSink::p_file.

156  : DataStreamSink(p, f), loader(providedLoader) {
157  import_id = boost::filesystem::path(file_path).filename().string();
158  file_size = 0;
159  max_threads = 0;
160  p_file = nullptr;
161  buffer[0] = nullptr;
162  buffer[1] = nullptr;
163  // we may be overallocating a little more memory here due to dropping phy cols.
164  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
165  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
166  int i = 0;
167  bool has_array = false;
168  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
169  int skip_physical_cols = 0;
170  for (auto& p : loader->get_column_descs()) {
171  // phy geo columns can't be in input file
172  if (skip_physical_cols-- > 0) {
173  continue;
174  }
175  // neither are rowid or $deleted$
176  // note: columns can be added after rowid/$deleted$
177  if (p->isVirtualCol || p->isDeletedCol) {
178  continue;
179  }
180  skip_physical_cols = p->columnType.get_physical_cols();
181  if (p->columnType.get_type() == kARRAY) {
182  is_array.get()[i] = true;
183  has_array = true;
184  } else {
185  is_array.get()[i] = false;
186  }
187  ++i;
188  }
189  if (has_array) {
190  is_array_a = std::unique_ptr<bool[]>(is_array.release());
191  } else {
192  is_array_a = std::unique_ptr<bool[]>(nullptr);
193  }
194 }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:834
std::string import_id
Definition: Importer.h:828
const std::string file_path
Definition: Importer.h:654

◆ ~Importer()

Importer_NS::Importer::~Importer ( )
override

Definition at line 196 of file Importer.cpp.

References buffer, and Importer_NS::DataStreamSink::p_file.

196  {
197  if (p_file != nullptr) {
198  fclose(p_file);
199  }
200  if (buffer[0] != nullptr) {
201  free(buffer[0]);
202  }
203  if (buffer[1] != nullptr) {
204  free(buffer[1]);
205  }
206 }

Member Function Documentation

◆ checkpoint()

void Importer_NS::Importer::checkpoint ( const int32_t  start_epoch)

Definition at line 3121 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), import_buffers_vec, logger::INFO, Importer_NS::DataStreamSink::load_failed, loader, and LOG.

Referenced by importDelimited(), and importGDAL().

3121  {
3122  if (load_failed) {
3123  // rollback to starting epoch - undo all the added records
3124  loader->setTableEpoch(start_epoch);
3125  } else {
3126  loader->checkpoint();
3127  }
3128 
3129  if (loader->getTableDesc()->persistenceLevel ==
3130  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3131  auto ms = measure<>::execution([&]() {
3132  if (!load_failed) {
3133  for (auto& p : import_buffers_vec[0]) {
3134  if (!p->stringDictCheckpoint()) {
3135  LOG(ERROR) << "Checkpointing Dictionary for Column "
3136  << p->getColumnDesc()->columnName << " failed.";
3137  load_failed = true;
3138  break;
3139  }
3140  }
3141  }
3142  });
3143  if (DEBUG_TIMING) {
3144  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3145  << std::endl;
3146  }
3147  }
3148 }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
#define LOG(tag)
Definition: Logger.h:188
#define DEBUG_TIMING
Definition: Importer.cpp:138
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
static TimeT::rep execution(F func, Args &&... args)
Definition: sample.cpp:29
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalFileExists()

bool Importer_NS::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4461 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::check_geospatial_files(), MapDHandler::detect_column_types(), MapDHandler::get_all_files_in_archive(), MapDHandler::get_first_geo_file_in_archive(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4461  {
4462  return gdalStatInternal(path, copy_params, false);
4463 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4428
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalFileOrDirectoryExists()

bool Importer_NS::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4466 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::detect_column_types(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4467  {
4468  return gdalStatInternal(path, copy_params, true);
4469 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4428
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalGetAllFilesInArchive()

std::vector< std::string > Importer_NS::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 4538 of file Importer.cpp.

References Importer_NS::gdalGatherFilesInArchiveRecursive(), initGDAL(), and setGDALAuthorizationTokens().

Referenced by find_first_geo_file_in_archive(), and MapDHandler::get_all_files_in_archive().

4540  {
4541  // lazy init GDAL
4542  initGDAL();
4543 
4544  // set authorization tokens
4545  setGDALAuthorizationTokens(copy_params);
4546 
4547  // prepare to gather files
4548  std::vector<std::string> files;
4549 
4550  // gather the files recursively
4551  gdalGatherFilesInArchiveRecursive(archive_path, files);
4552 
4553  // convert to relative paths inside archive
4554  for (auto& file : files) {
4555  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4556  }
4557 
4558  // done
4559  return files;
4560 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4091
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:4471
static void initGDAL()
Definition: Importer.cpp:4044
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalGetLayersInGeoFile()

std::vector< Importer::GeoFileLayerInfo > Importer_NS::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 4563 of file Importer.cpp.

References CHECK, EMPTY, GEO, initGDAL(), NON_GEO, openGDALDataset(), setGDALAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4565  {
4566  // lazy init GDAL
4567  initGDAL();
4568 
4569  // set authorization tokens
4570  setGDALAuthorizationTokens(copy_params);
4571 
4572  // prepare to gather layer info
4573  std::vector<GeoFileLayerInfo> layer_info;
4574 
4575  // open the data set
4576  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4577  if (poDS == nullptr) {
4578  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4579  file_name);
4580  }
4581 
4582  // enumerate the layers
4583  for (auto&& poLayer : poDS->GetLayers()) {
4584  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4585  // prepare to read this layer
4586  poLayer->ResetReading();
4587  // skip layer if empty
4588  if (poLayer->GetFeatureCount() > 0) {
4589  // get first feature
4590  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4591  CHECK(first_feature);
4592  // check feature for geometry
4593  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4594  if (!geometry) {
4595  // layer has no geometry
4596  contents = GeoFileLayerContents::NON_GEO;
4597  } else {
4598  // check the geometry type
4599  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4600  switch (wkbFlatten(geometry_type)) {
4601  case wkbPoint:
4602  case wkbLineString:
4603  case wkbPolygon:
4604  case wkbMultiPolygon:
4605  // layer has supported geo
4606  contents = GeoFileLayerContents::GEO;
4607  break;
4608  default:
4609  // layer has unsupported geometry
4610  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4611  break;
4612  }
4613  }
4614  }
4615  // store info for this layer
4616  layer_info.emplace_back(poLayer->GetName(), contents);
4617  }
4618 
4619  // done
4620  return layer_info;
4621 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4091
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4161
#define CHECK(condition)
Definition: Logger.h:193
static void initGDAL()
Definition: Importer.cpp:4044
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalStatInternal()

bool Importer_NS::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
staticprivate

Definition at line 4428 of file Importer.cpp.

References initGDAL(), run_benchmark_import::result, and setGDALAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

4430  {
4431  // lazy init GDAL
4432  initGDAL();
4433 
4434  // set authorization tokens
4435  setGDALAuthorizationTokens(copy_params);
4436 
4437 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4438  // clear GDAL stat cache
4439  // without this, file existence will be cached, even if authentication changes
4440  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4441  VSICurlClearCache();
4442 #endif
4443 
4444  // stat path
4445  VSIStatBufL sb;
4446  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4447  if (result < 0) {
4448  return false;
4449  }
4450 
4451  // exists?
4452  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4453  return true;
4454  } else if (VSI_ISREG(sb.st_mode)) {
4455  return true;
4456  }
4457  return false;
4458 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4091
static void initGDAL()
Definition: Importer.cpp:4044
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ gdalSupportsNetworkFileAccess()

bool Importer_NS::Importer::gdalSupportsNetworkFileAccess ( )
static

Definition at line 4624 of file Importer.cpp.

Referenced by add_vsi_network_prefix().

4624  {
4625 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4626  return true;
4627 #else
4628  return false;
4629 #endif
4630 }
+ Here is the caller graph for this function:

◆ gdalToColumnDescriptors()

const std::list< ColumnDescriptor > Importer_NS::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4338 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, Importer_NS::CopyParams::geo_coords_comp_param, Importer_NS::CopyParams::geo_coords_encoding, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_coords_type, Importer_NS::CopyParams::geo_explode_collections, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, Importer_NS::ogr_to_type(), openGDALDataset(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_fixed_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_input_srid(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_output_srid(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::set_type(), and ColumnDescriptor::sourceName.

Referenced by MapDHandler::detect_column_types(), and Importer_NS::ImportDriver::importGeoTable().

4341  {
4342  std::list<ColumnDescriptor> cds;
4343 
4344  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4345  if (poDS == nullptr) {
4346  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4347  file_name);
4348  }
4349 
4350  OGRLayer& layer =
4351  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4352 
4353  layer.ResetReading();
4354  // TODO(andrewseidl): support multiple features
4355  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4356  if (poFeature == nullptr) {
4357  throw std::runtime_error("No features found in " + file_name);
4358  }
4359  // get fields as regular columns
4360  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4361  CHECK(poFDefn);
4362  int iField;
4363  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4364  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4365  auto typePair = ogr_to_type(poFieldDefn->GetType());
4366  ColumnDescriptor cd;
4367  cd.columnName = poFieldDefn->GetNameRef();
4368  cd.sourceName = poFieldDefn->GetNameRef();
4369  SQLTypeInfo ti;
4370  if (typePair.second) {
4371  ti.set_type(kARRAY);
4372  ti.set_subtype(typePair.first);
4373  } else {
4374  ti.set_type(typePair.first);
4375  }
4376  if (typePair.first == kTEXT) {
4377  ti.set_compression(kENCODING_DICT);
4378  ti.set_comp_param(32);
4379  }
4380  ti.set_fixed_size();
4381  cd.columnType = ti;
4382  cds.push_back(cd);
4383  }
4384  // get geo column, if any
4385  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4386  if (poGeometry) {
4387  ColumnDescriptor cd;
4388  cd.columnName = geo_column_name;
4389  cd.sourceName = geo_column_name;
4390 
4391  // get GDAL type
4392  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4393 
4394  // if exploding, override any collection type to child type
4395  if (copy_params.geo_explode_collections) {
4396  if (ogr_type == wkbMultiPolygon) {
4397  ogr_type = wkbPolygon;
4398  } else if (ogr_type == wkbMultiLineString) {
4399  ogr_type = wkbLineString;
4400  } else if (ogr_type == wkbMultiPoint) {
4401  ogr_type = wkbPoint;
4402  }
4403  }
4404 
4405  // convert to internal type
4406  SQLTypes geoType = ogr_to_type(ogr_type);
4407 
4408  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4409  if (PROMOTE_POLYGON_TO_MULTIPOLYGON && !copy_params.geo_explode_collections) {
4410  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4411  }
4412 
4413  // build full internal type
4414  SQLTypeInfo ti;
4415  ti.set_type(geoType);
4421  cd.columnType = ti;
4422 
4423  cds.push_back(cd);
4424  }
4425  return cds;
4426 }
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4288
SQLTypes
Definition: sqltypes.h:41
void set_input_srid(int d)
Definition: sqltypes.h:420
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:416
void set_fixed_size()
Definition: sqltypes.h:425
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
void set_compression(EncodingType c)
Definition: sqltypes.h:426
std::string sourceName
void set_output_srid(int s)
Definition: sqltypes.h:422
EncodingType geo_coords_encoding
Definition: CopyParams.h:73
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:417
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4194
specifies the content in-memory of a row in the column metadata table
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4161
Definition: sqltypes.h:55
std::string geo_layer_name
Definition: CopyParams.h:78
void set_comp_param(int p)
Definition: sqltypes.h:427
#define CHECK(condition)
Definition: Logger.h:193
SQLTypeInfo columnType
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:144
std::string columnName
int32_t geo_coords_comp_param
Definition: CopyParams.h:74
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ get_column_descs()

const std::list<const ColumnDescriptor*>& Importer_NS::Importer::get_column_descs ( ) const
inline

Definition at line 750 of file Importer.h.

Referenced by Importer_NS::DataStreamSink::archivePlumber(), Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

750  {
751  return loader->get_column_descs();
752  }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
+ Here is the caller graph for this function:

◆ get_copy_params()

const CopyParams& Importer_NS::Importer::get_copy_params ( ) const
inline

Definition at line 749 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

749 { return copy_params; }
+ Here is the caller graph for this function:

◆ get_import_buffers()

std::vector<std::unique_ptr<TypedImportBuffer> >& Importer_NS::Importer::get_import_buffers ( int  i)
inline

Definition at line 758 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

758  {
759  return import_buffers_vec[i];
760  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
+ Here is the caller graph for this function:

◆ get_import_buffers_vec()

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& Importer_NS::Importer::get_import_buffers_vec ( )
inline

Definition at line 755 of file Importer.h.

755  {
756  return import_buffers_vec;
757  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832

◆ get_import_status()

ImportStatus Importer_NS::Importer::get_import_status ( const std::string &  id)
static

Definition at line 208 of file Importer.cpp.

Referenced by MapDHandler::import_table_status().

208  {
209  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
210  return import_status_map.at(import_id);
211 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:147
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::string import_id
Definition: Importer.h:828
+ Here is the caller graph for this function:

◆ get_is_array()

const bool* Importer_NS::Importer::get_is_array ( ) const
inline

Definition at line 761 of file Importer.h.

Referenced by Importer_NS::import_thread_delimited().

761 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:834
+ Here is the caller graph for this function:

◆ getCatalog()

Catalog_Namespace::Catalog& Importer_NS::Importer::getCatalog ( )
inline

Definition at line 794 of file Importer.h.

Referenced by Importer_NS::Loader::checkpoint(), Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Importer_NS::Loader::getTableEpoch(), Importer_NS::import_thread_delimited(), and Importer_NS::Loader::setTableEpoch().

794 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
+ Here is the caller graph for this function:

◆ getLoader()

auto Importer_NS::Importer::getLoader ( ) const
inline

Definition at line 818 of file Importer.h.

Referenced by Importer_NS::DataStreamSink::archivePlumber().

818 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
+ Here is the caller graph for this function:

◆ hasGDALLibKML()

bool Importer_NS::Importer::hasGDALLibKML ( )
static

Definition at line 4086 of file Importer.cpp.

4086  {
4087  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4088 }

◆ import()

ImportStatus Importer_NS::Importer::import ( )

Definition at line 3792 of file Importer.cpp.

References Importer_NS::DataStreamSink::archivePlumber().

3792  {
3793  return DataStreamSink::archivePlumber();
3794 }
ImportStatus archivePlumber()
Definition: Importer.cpp:3150
+ Here is the call graph for this function:

◆ importDelimited()

ImportStatus Importer_NS::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed 
)
overridevirtual

Implements Importer_NS::DataStreamSink.

Definition at line 3796 of file Importer.cpp.

References Importer_NS::CopyParams::buffer_size, CHECK, checkpoint(), Importer_NS::DataStreamSink::copy_params, logger::ERROR, Importer_NS::DataStreamSink::file_offsets, Importer_NS::DataStreamSink::file_offsets_mutex, file_size, Importer_NS::DelimitedParserUtils::find_end(), Importer_NS::CopyParams::geo_assign_render_groups, import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_delimited(), kMULTIPOLYGON, kPOLYGON, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, max_threads, Importer_NS::DataStreamSink::p_file, Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, Importer_NS::ImportStatus::rows_rejected, set_import_status(), Importer_NS::CopyParams::threads, Importer_NS::DataStreamSink::total_file_size, and VLOG.

Referenced by Importer_NS::DataStreamSink::import_compressed().

3797  {
3798  bool load_truncated = false;
3799  set_import_status(import_id, import_status);
3800 
3801  if (!p_file) {
3802  p_file = fopen(file_path.c_str(), "rb");
3803  }
3804  if (!p_file) {
3805  throw std::runtime_error("failed to open file '" + file_path +
3806  "': " + strerror(errno));
3807  }
3808 
3809  if (!decompressed) {
3810  (void)fseek(p_file, 0, SEEK_END);
3811  file_size = ftell(p_file);
3812  }
3813 
3814  if (copy_params.threads == 0) {
3815  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3816  } else {
3817  max_threads = static_cast<size_t>(copy_params.threads);
3818  }
3819 
3820  // deal with small files
3821  size_t alloc_size = copy_params.buffer_size;
3822  if (!decompressed && file_size < alloc_size) {
3823  alloc_size = file_size;
3824  }
3825 
3826  for (size_t i = 0; i < max_threads; i++) {
3827  import_buffers_vec.emplace_back();
3828  for (const auto cd : loader->get_column_descs()) {
3829  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3830  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3831  }
3832  }
3833 
3834  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3835  size_t current_pos = 0;
3836  size_t end_pos;
3837  size_t begin_pos = 0;
3838 
3839  (void)fseek(p_file, current_pos, SEEK_SET);
3840  size_t size =
3841  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3842 
3843  // make render group analyzers for each poly column
3844  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3845  if (copy_params.geo_assign_render_groups) {
3846  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3847  loader->getTableDesc()->tableId, false, false, false);
3848  for (auto cd : columnDescriptors) {
3849  SQLTypes ct = cd->columnType.get_type();
3850  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3851  auto rga = std::make_shared<RenderGroupAnalyzer>();
3852  rga->seedFromExistingTableContents(loader, cd->columnName);
3853  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3854  }
3855  }
3856  }
3857 
3858  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3859  loader->getTableDesc()->tableId};
3860  auto start_epoch = loader->getTableEpoch();
3861  {
3862  std::list<std::future<ImportStatus>> threads;
3863 
3864  // use a stack to track thread_ids which must not overlap among threads
3865  // because thread_id is used to index import_buffers_vec[]
3866  std::stack<size_t> stack_thread_ids;
3867  for (size_t i = 0; i < max_threads; i++) {
3868  stack_thread_ids.push(i);
3869  }
3870  // added for true row index on error
3871  size_t first_row_index_this_buffer = 0;
3872 
3873  while (size > 0) {
3874  unsigned int num_rows_this_buffer = 0;
3875  CHECK(scratch_buffer);
3876  end_pos = DelimitedParserUtils::find_end(
3877  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
3878 
3879  // unput residual
3880  int nresidual = size - end_pos;
3881  std::unique_ptr<char[]> unbuf;
3882  if (nresidual > 0) {
3883  unbuf = std::make_unique<char[]>(nresidual);
3884  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3885  }
3886 
3887  // get a thread_id not in use
3888  auto thread_id = stack_thread_ids.top();
3889  stack_thread_ids.pop();
3890  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3891 
3892  threads.push_back(std::async(std::launch::async,
3893  import_thread_delimited,
3894  thread_id,
3895  this,
3896  std::move(scratch_buffer),
3897  begin_pos,
3898  end_pos,
3899  end_pos,
3900  columnIdToRenderGroupAnalyzerMap,
3901  first_row_index_this_buffer));
3902 
3903  first_row_index_this_buffer += num_rows_this_buffer;
3904 
3905  current_pos += end_pos;
3906  scratch_buffer = std::make_unique<char[]>(alloc_size);
3907  CHECK(scratch_buffer);
3908  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3909  size = nresidual + fread(scratch_buffer.get() + nresidual,
3910  1,
3911  copy_params.buffer_size - nresidual,
3912  p_file);
3913 
3914  begin_pos = 0;
3915 
3916  while (threads.size() > 0) {
3917  int nready = 0;
3918  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3919  it != threads.end();) {
3920  auto& p = *it;
3921  std::chrono::milliseconds span(
3922  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3923  if (p.wait_for(span) == std::future_status::ready) {
3924  auto ret_import_status = p.get();
3925  import_status += ret_import_status;
3926  // sum up current total file offsets
3927  size_t total_file_offset{0};
3928  if (decompressed) {
3929  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3930  for (const auto file_offset : file_offsets) {
3931  total_file_offset += file_offset;
3932  }
3933  }
3934  // estimate number of rows per current total file offset
3935  if (decompressed ? total_file_offset : current_pos) {
3936  import_status.rows_estimated =
3937  (decompressed ? (float)total_file_size / total_file_offset
3938  : (float)file_size / current_pos) *
3939  import_status.rows_completed;
3940  }
3941  VLOG(3) << "rows_completed " << import_status.rows_completed
3942  << ", rows_estimated " << import_status.rows_estimated
3943  << ", total_file_size " << total_file_size << ", total_file_offset "
3944  << total_file_offset;
3945  set_import_status(import_id, import_status);
3946  // recall thread_id for reuse
3947  stack_thread_ids.push(ret_import_status.thread_id);
3948  threads.erase(it++);
3949  ++nready;
3950  } else {
3951  ++it;
3952  }
3953  }
3954 
3955  if (nready == 0) {
3956  std::this_thread::yield();
3957  }
3958 
3959  // on eof, wait all threads to finish
3960  if (0 == size) {
3961  continue;
3962  }
3963 
3964  // keep reading if any free thread slot
3965  // this is one of the major difference from old threading model !!
3966  if (threads.size() < max_threads) {
3967  break;
3968  }
3969  }
3970 
3971  if (import_status.rows_rejected > copy_params.max_reject) {
3972  load_truncated = true;
3973  load_failed = true;
3974  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
3975  break;
3976  }
3977  if (load_failed) {
3978  load_truncated = true;
3979  LOG(ERROR) << "A call to the Loader::load failed, Please review the logs for "
3980  "more details";
3981  break;
3982  }
3983  }
3984 
3985  // join dangling threads in case of LOG(ERROR) above
3986  for (auto& p : threads) {
3987  p.wait();
3988  }
3989  }
3990 
3991  checkpoint(start_epoch);
3992 
3993  // must set import_status.load_truncated before closing this end of pipe
3994  // otherwise, the thread on the other end would throw an unwanted 'write()'
3995  // exception
3996  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3997  import_status.load_truncated = load_truncated;
3998 
3999  fclose(p_file);
4000  p_file = nullptr;
4001  return import_status;
4002 }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
static size_t find_end(const char *buffer, size_t size, const CopyParams &copy_params, unsigned int &num_rows_this_buffer)
Finds the closest possible row ending to the end of the given buffer.
std::vector< size_t > file_offsets
Definition: Importer.h:659
SQLTypes
Definition: sqltypes.h:41
#define LOG(tag)
Definition: Logger.h:188
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
std::string import_id
Definition: Importer.h:828
std::map< int, std::shared_ptr< RenderGroupAnalyzer > > ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:135
std::mutex file_offsets_mutex
Definition: Importer.h:660
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3121
ImportStatus import_status
Definition: Importer.h:656
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer)
Definition: Importer.cpp:1777
#define CHECK(condition)
Definition: Logger.h:193
std::vector< int > ChunkKey
Definition: types.h:35
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:213
#define VLOG(n)
Definition: Logger.h:283
const std::string file_path
Definition: Importer.h:654
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ importGDAL()

ImportStatus Importer_NS::Importer::importGDAL ( std::map< std::string, std::string >  colname_to_src)

Definition at line 4632 of file Importer.cpp.

References CHECK, CHECK_EQ, checkpoint(), Importer_NS::DataStreamSink::copy_params, logger::ERROR, Importer_NS::CopyParams::geo_assign_render_groups, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_shapefile(), kMULTIPOLYGON, kPOLYGON, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, Importer_NS::CopyParams::max_reject, max_threads, openGDALDataset(), Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, set_import_status(), and Importer_NS::CopyParams::threads.

Referenced by Importer_NS::ImportDriver::importGeoTable().

4633  {
4634  // initial status
4635  bool load_truncated = false;
4637 
4639  if (poDS == nullptr) {
4640  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4641  file_path);
4642  }
4643 
4644  OGRLayer& layer =
4646 
4647  // get the number of features in this layer
4648  size_t numFeatures = layer.GetFeatureCount();
4649 
4650  // build map of metadata field (additional columns) name to index
4651  // use shared_ptr since we need to pass it to the worker
4652  FieldNameToIndexMapType fieldNameToIndexMap;
4653  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4654  CHECK(poFDefn);
4655  size_t numFields = poFDefn->GetFieldCount();
4656  for (size_t iField = 0; iField < numFields; iField++) {
4657  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4658  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4659  }
4660 
4661  // the geographic spatial reference we want to put everything in
4662  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4663  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4664 
4665 #if GDAL_VERSION_MAJOR >= 3
4666  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4667  // this results in X and Y being transposed for angle-based
4668  // coordinate systems. This restores the previous behavior.
4669  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4670 #endif
4671 
4672 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4673  // just one "thread"
4674  size_t max_threads = 1;
4675 #else
4676  // how many threads to use
4677  size_t max_threads = 0;
4678  if (copy_params.threads == 0) {
4679  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4680  } else {
4681  max_threads = copy_params.threads;
4682  }
4683 #endif
4684 
4685  // make an import buffer for each thread
4686  CHECK_EQ(import_buffers_vec.size(), 0u);
4687  import_buffers_vec.resize(max_threads);
4688  for (size_t i = 0; i < max_threads; i++) {
4689  for (const auto cd : loader->get_column_descs()) {
4690  import_buffers_vec[i].emplace_back(
4691  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4692  }
4693  }
4694 
4695  // make render group analyzers for each poly column
4696  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4698  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4699  loader->getTableDesc()->tableId, false, false, false);
4700  for (auto cd : columnDescriptors) {
4701  SQLTypes ct = cd->columnType.get_type();
4702  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4703  auto rga = std::make_shared<RenderGroupAnalyzer>();
4704  rga->seedFromExistingTableContents(loader, cd->columnName);
4705  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4706  }
4707  }
4708  }
4709 
4710 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4711  // threads
4712  std::list<std::future<ImportStatus>> threads;
4713 
4714  // use a stack to track thread_ids which must not overlap among threads
4715  // because thread_id is used to index import_buffers_vec[]
4716  std::stack<size_t> stack_thread_ids;
4717  for (size_t i = 0; i < max_threads; i++) {
4718  stack_thread_ids.push(i);
4719  }
4720 #endif
4721 
4722  // checkpoint the table
4723  auto start_epoch = loader->getTableEpoch();
4724 
4725  // reset the layer
4726  layer.ResetReading();
4727 
4728  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4729 
4730  // make a features buffer for each thread
4731  std::vector<FeaturePtrVector> features(max_threads);
4732 
4733  // for each feature...
4734  size_t firstFeatureThisChunk = 0;
4735  while (firstFeatureThisChunk < numFeatures) {
4736  // how many features this chunk
4737  size_t numFeaturesThisChunk =
4738  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4739 
4740 // get a thread_id not in use
4741 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4742  size_t thread_id = 0;
4743 #else
4744  auto thread_id = stack_thread_ids.top();
4745  stack_thread_ids.pop();
4746  CHECK(thread_id < max_threads);
4747 #endif
4748 
4749  // fill features buffer for new thread
4750  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4751  features[thread_id].emplace_back(layer.GetNextFeature());
4752  }
4753 
4754 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4755  // call worker function directly
4756  auto ret_import_status = import_thread_shapefile(0,
4757  this,
4758  poGeographicSR.get(),
4759  std::move(features[thread_id]),
4760  firstFeatureThisChunk,
4761  numFeaturesThisChunk,
4762  fieldNameToIndexMap,
4763  columnNameToSourceNameMap,
4764  columnIdToRenderGroupAnalyzerMap);
4765  import_status += ret_import_status;
4766  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4767  import_status.rows_completed;
4768  set_import_status(import_id, import_status);
4769 #else
4770  // fire up that thread to import this geometry
4771  threads.push_back(std::async(std::launch::async,
4773  thread_id,
4774  this,
4775  poGeographicSR.get(),
4776  std::move(features[thread_id]),
4777  firstFeatureThisChunk,
4778  numFeaturesThisChunk,
4779  fieldNameToIndexMap,
4780  columnNameToSourceNameMap,
4781  columnIdToRenderGroupAnalyzerMap));
4782 
4783  // let the threads run
4784  while (threads.size() > 0) {
4785  int nready = 0;
4786  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4787  it != threads.end();) {
4788  auto& p = *it;
4789  std::chrono::milliseconds span(
4790  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4791  if (p.wait_for(span) == std::future_status::ready) {
4792  auto ret_import_status = p.get();
4793  import_status += ret_import_status;
4794  import_status.rows_estimated =
4795  ((float)firstFeatureThisChunk / (float)numFeatures) *
4796  import_status.rows_completed;
4797  set_import_status(import_id, import_status);
4798 
4799  // recall thread_id for reuse
4800  stack_thread_ids.push(ret_import_status.thread_id);
4801 
4802  threads.erase(it++);
4803  ++nready;
4804  } else {
4805  ++it;
4806  }
4807  }
4808 
4809  if (nready == 0) {
4810  std::this_thread::yield();
4811  }
4812 
4813  // keep reading if any free thread slot
4814  // this is one of the major difference from old threading model !!
4815  if (threads.size() < max_threads) {
4816  break;
4817  }
4818  }
4819 #endif
4820 
4821  // out of rows?
4822  if (import_status.rows_rejected > copy_params.max_reject) {
4823  load_truncated = true;
4824  load_failed = true;
4825  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4826  break;
4827  }
4828 
4829  // failed?
4830  if (load_failed) {
4831  load_truncated = true;
4832  LOG(ERROR)
4833  << "A call to the Loader::load failed, Please review the logs for more details";
4834  break;
4835  }
4836 
4837  firstFeatureThisChunk += numFeaturesThisChunk;
4838  }
4839 
4840 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4841  // wait for any remaining threads
4842  if (threads.size()) {
4843  for (auto& p : threads) {
4844  // wait for the thread
4845  p.wait();
4846  // get the result and update the final import status
4847  auto ret_import_status = p.get();
4848  import_status += ret_import_status;
4849  import_status.rows_estimated = import_status.rows_completed;
4850  set_import_status(import_id, import_status);
4851  }
4852  }
4853 #endif
4854 
4855  checkpoint(start_epoch);
4856 
4857  // must set import_status.load_truncated before closing this end of pipe
4858  // otherwise, the thread on the other end would throw an unwanted 'write()'
4859  // exception
4860  import_status.load_truncated = load_truncated;
4861  return import_status;
4862 }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
#define CHECK_EQ(x, y)
Definition: Logger.h:201
SQLTypes
Definition: sqltypes.h:41
std::unique_ptr< OGRSpatialReference, OGRSpatialReferenceDeleter > OGRSpatialReferenceUqPtr
Definition: Importer.cpp:112
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4194
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
std::string import_id
Definition: Importer.h:828
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap)
Definition: Importer.cpp:2084
std::map< int, std::shared_ptr< RenderGroupAnalyzer > > ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:135
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4161
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3121
std::string geo_layer_name
Definition: CopyParams.h:78
ImportStatus import_status
Definition: Importer.h:656
#define CHECK(condition)
Definition: Logger.h:193
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:132
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:213
const std::string file_path
Definition: Importer.h:654
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initGDAL()

void Importer_NS::Importer::initGDAL ( )
static private

Definition at line 4044 of file Importer.cpp.

References Importer_NS::GDALErrorHandler(), logger::INFO, LOG, and mapd_root_abs_path().

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4044  {
4045  // this should not be called from multiple threads, but...
4046  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
4047  // init under mutex
4048  static bool gdal_initialized = false;
4049  if (!gdal_initialized) {
4050  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
4051  setenv("GDAL_DATA",
4052  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
4053  true);
4054 
4055  // configure SSL certificate path (per S3Archive::init_for_read)
4056  // in a production build, GDAL and Curl will have been built on
4057  // CentOS, so the baked-in system path will be wrong for Ubuntu
4058  // and other Linux distros. Unless the user is deliberately
4059  // overriding it by setting SSL_CERT_FILE explicitly in the server
4060  // environment, we set it to whichever CA bundle directory exists
4061  // on the machine we're running on
4062  std::list<std::string> v_known_ca_paths({
4063  "/etc/ssl/certs/ca-certificates.crt",
4064  "/etc/pki/tls/certs/ca-bundle.crt",
4065  "/usr/share/ssl/certs/ca-bundle.crt",
4066  "/usr/local/share/certs/ca-root.crt",
4067  "/etc/ssl/cert.pem",
4068  "/etc/ssl/ca-bundle.pem",
4069  });
4070  for (const auto& known_ca_path : v_known_ca_paths) {
4071  if (boost::filesystem::exists(known_ca_path)) {
4072  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4073  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4074  break;
4075  }
4076  }
4077 
4078  GDALAllRegister();
4079  OGRRegisterAll();
4080  CPLSetErrorHandler(*GDALErrorHandler);
4081  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4082  gdal_initialized = true;
4083  }
4084 }
#define LOG(tag)
Definition: Logger.h:188
std::string mapd_root_abs_path()
Definition: mapdpath.h:30
static std::mutex init_gdal_mutex
Definition: Importer.h:835
void GDALErrorHandler(CPLErr eErrClass, int err_no, const char *msg)
Definition: Importer.cpp:4021
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ load()

void Importer_NS::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count 
)

Definition at line 3114 of file Importer.cpp.

References Importer_NS::DataStreamSink::load_failed, and loader.

Referenced by Importer_NS::DataStreamSink::archivePlumber(), Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

3115  {
3116  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3117  load_failed = true;
3118  }
3119 }
std::unique_ptr< Loader > loader
Definition: Importer.h:833
+ Here is the caller graph for this function:

◆ openGDALDataset()

OGRDataSource * Importer_NS::Importer::openGDALDataset ( const std::string &  fileName,
const CopyParams &  copy_params 
)
static private

Definition at line 4161 of file Importer.cpp.

References logger::ERROR, logger::INFO, initGDAL(), LOG, and setGDALAuthorizationTokens().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptors(), importGDAL(), and readMetadataSampleGDAL().

4162  {
4163  // lazy init GDAL
4164  initGDAL();
4165 
4166  // set authorization tokens
4168 
4169  // open the file
4170  OGRDataSource* poDS;
4171 #if GDAL_VERSION_MAJOR == 1
4172  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4173 #else
4174  poDS = (OGRDataSource*)GDALOpenEx(
4175  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4176  if (poDS == nullptr) {
4177  poDS = (OGRDataSource*)GDALOpenEx(
4178  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4179  if (poDS) {
4180  LOG(INFO) << "openGDALDataset had to open as read-only";
4181  }
4182  }
4183 #endif
4184  if (poDS == nullptr) {
4185  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4186  }
4187  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4188  // in a memory leak if GDAL successfully opened the input dataset.
4189  return poDS;
4190 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4091
#define LOG(tag)
Definition: Logger.h:188
static void initGDAL()
Definition: Importer.cpp:4044
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ readMetadataSampleGDAL()

void Importer_NS::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4217 of file Importer.cpp.

References CHECK, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), and openGDALDataset().

Referenced by MapDHandler::detect_column_types().

4222  {
4224  if (poDS == nullptr) {
4225  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4226  file_name);
4227  }
4228 
4229  OGRLayer& layer =
4231 
4232  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4233  CHECK(poFDefn);
4234 
4235  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4236  auto nFeats = layer.GetFeatureCount();
4237  size_t numFeatures =
4238  std::max(static_cast<decltype(nFeats)>(0),
4239  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4240  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4241  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4242  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4243  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4244  }
4245  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4246  layer.ResetReading();
4247  size_t iFeature = 0;
4248  while (iFeature < numFeatures) {
4249  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4250  if (!poFeature) {
4251  break;
4252  }
4253 
4254  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4255  if (poGeometry != nullptr) {
4256  // validate geom type (again?)
4257  switch (wkbFlatten(poGeometry->getGeometryType())) {
4258  case wkbPoint:
4259  case wkbLineString:
4260  case wkbPolygon:
4261  case wkbMultiPolygon:
4262  break;
4263  default:
4264  throw std::runtime_error("Unsupported geometry type: " +
4265  std::string(poGeometry->getGeometryName()));
4266  }
4267 
4268  // populate metadata for regular fields
4269  for (auto i : metadata) {
4270  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4271  if (iField >= 0) { // geom is -1
4272  metadata[i.first].at(iFeature) =
4273  std::string(poFeature->GetFieldAsString(iField));
4274  }
4275  }
4276 
4277  // populate metadata for geo column with WKT string
4278  char* wkts = nullptr;
4279  poGeometry->exportToWkt(&wkts);
4280  CHECK(wkts);
4281  metadata[geo_column_name].at(iFeature) = wkts;
4282  CPLFree(wkts);
4283  }
4284  iFeature++;
4285  }
4286 }
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4194
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4161
std::string geo_layer_name
Definition: CopyParams.h:78
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_geo_physical_import_buffer()

void Importer_NS::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1457 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Importer_NS::compress_coords(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), kLINESTRING, kMULTIPOLYGON, kPOLYGON, and ColumnDescriptor::tableId.

Referenced by Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), Importer_NS::import_thread_delimited(), and MapDHandler::load_table().

1467  {
1468  const auto col_ti = cd->columnType;
1469  const auto col_type = col_ti.get_type();
1470  auto columnId = cd->columnId;
1471  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1472  std::vector<TDatum> td_coords_data;
1473  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1474  for (auto cc : compressed_coords) {
1475  TDatum td_byte;
1476  td_byte.val.int_val = cc;
1477  td_coords_data.push_back(td_byte);
1478  }
1479  TDatum tdd_coords;
1480  tdd_coords.val.arr_val = td_coords_data;
1481  tdd_coords.is_null = false;
1482  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);
1483 
1484  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1485  // Create ring_sizes array value and add it to the physical column
1486  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1487  std::vector<TDatum> td_ring_sizes;
1488  for (auto ring_size : ring_sizes) {
1489  TDatum td_ring_size;
1490  td_ring_size.val.int_val = ring_size;
1491  td_ring_sizes.push_back(td_ring_size);
1492  }
1493  TDatum tdd_ring_sizes;
1494  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1495  tdd_ring_sizes.is_null = false;
1496  import_buffers[col_idx++]->add_value(
1497  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1498  }
1499 
1500  if (col_type == kMULTIPOLYGON) {
1501  // Create poly_rings array value and add it to the physical column
1502  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1503  std::vector<TDatum> td_poly_rings;
1504  for (auto num_rings : poly_rings) {
1505  TDatum td_num_rings;
1506  td_num_rings.val.int_val = num_rings;
1507  td_poly_rings.push_back(td_num_rings);
1508  }
1509  TDatum tdd_poly_rings;
1510  tdd_poly_rings.val.arr_val = td_poly_rings;
1511  tdd_poly_rings.is_null = false;
1512  import_buffers[col_idx++]->add_value(
1513  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1514  }
1515 
1516  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1517  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1518  std::vector<TDatum> td_bounds_data;
1519  for (auto b : bounds) {
1520  TDatum td_double;
1521  td_double.val.real_val = b;
1522  td_bounds_data.push_back(td_double);
1523  }
1524  TDatum tdd_bounds;
1525  tdd_bounds.val.arr_val = td_bounds_data;
1526  tdd_bounds.is_null = false;
1527  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1528  }
1529 
1530  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1531  // Create render_group value and add it to the physical column
1532  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1533  TDatum td_render_group;
1534  td_render_group.val.int_val = render_group;
1535  td_render_group.is_null = false;
1536  import_buffers[col_idx++]->add_value(
1537  cd_render_group, td_render_group, false, replicate_count);
1538  }
1539 }
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1422
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_geo_physical_import_buffer_columnar()

void Importer_NS::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1541 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Importer_NS::compress_coords(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), kLINESTRING, kMULTIPOLYGON, kPOLYGON, and ColumnDescriptor::tableId.

Referenced by MapDHandler::load_table_binary_columnar().

1551  {
1552  const auto col_ti = cd->columnType;
1553  const auto col_type = col_ti.get_type();
1554  auto columnId = cd->columnId;
1555 
1556  auto coords_row_count = coords_column.size();
1557  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1558  for (auto coords : coords_column) {
1559  std::vector<TDatum> td_coords_data;
1560  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1561  for (auto cc : compressed_coords) {
1562  TDatum td_byte;
1563  td_byte.val.int_val = cc;
1564  td_coords_data.push_back(td_byte);
1565  }
1566  TDatum tdd_coords;
1567  tdd_coords.val.arr_val = td_coords_data;
1568  tdd_coords.is_null = false;
1569  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
1570  }
1571  col_idx++;
1572 
1573  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1574  if (ring_sizes_column.size() != coords_row_count) {
1575  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1576  }
1577  // Create ring_sizes array value and add it to the physical column
1578  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1579  for (auto ring_sizes : ring_sizes_column) {
1580  std::vector<TDatum> td_ring_sizes;
1581  for (auto ring_size : ring_sizes) {
1582  TDatum td_ring_size;
1583  td_ring_size.val.int_val = ring_size;
1584  td_ring_sizes.push_back(td_ring_size);
1585  }
1586  TDatum tdd_ring_sizes;
1587  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1588  tdd_ring_sizes.is_null = false;
1589  import_buffers[col_idx]->add_value(
1590  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1591  }
1592  col_idx++;
1593  }
1594 
1595  if (col_type == kMULTIPOLYGON) {
1596  if (poly_rings_column.size() != coords_row_count) {
1597  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1598  }
1599  // Create poly_rings array value and add it to the physical column
1600  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1601  for (auto poly_rings : poly_rings_column) {
1602  std::vector<TDatum> td_poly_rings;
1603  for (auto num_rings : poly_rings) {
1604  TDatum td_num_rings;
1605  td_num_rings.val.int_val = num_rings;
1606  td_poly_rings.push_back(td_num_rings);
1607  }
1608  TDatum tdd_poly_rings;
1609  tdd_poly_rings.val.arr_val = td_poly_rings;
1610  tdd_poly_rings.is_null = false;
1611  import_buffers[col_idx]->add_value(
1612  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1613  }
1614  col_idx++;
1615  }
1616 
1617  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1618  if (bounds_column.size() != coords_row_count) {
1619  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1620  }
1621  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1622  for (auto bounds : bounds_column) {
1623  std::vector<TDatum> td_bounds_data;
1624  for (auto b : bounds) {
1625  TDatum td_double;
1626  td_double.val.real_val = b;
1627  td_bounds_data.push_back(td_double);
1628  }
1629  TDatum tdd_bounds;
1630  tdd_bounds.val.arr_val = td_bounds_data;
1631  tdd_bounds.is_null = false;
1632  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1633  }
1634  col_idx++;
1635  }
1636 
1637  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1638  // Create render_group value and add it to the physical column
1639  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1640  TDatum td_render_group;
1641  td_render_group.val.int_val = render_group;
1642  td_render_group.is_null = false;
1643  for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
1644  import_buffers[col_idx]->add_value(
1645  cd_render_group, td_render_group, false, replicate_count);
1646  }
1647  col_idx++;
1648  }
1649 }
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1422
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:193
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ set_import_status()

void Importer_NS::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 213 of file Importer.cpp.

References Importer_NS::ImportStatus::elapsed, Importer_NS::ImportStatus::end, import_id, and Importer_NS::ImportStatus::start.

Referenced by importDelimited(), and importGDAL().

213  {
214  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
215  is.end = std::chrono::steady_clock::now();
216  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
218 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:147
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::string import_id
Definition: Importer.h:828
+ Here is the caller graph for this function:

◆ setGDALAuthorizationTokens()

void Importer_NS::Importer::setGDALAuthorizationTokens ( const CopyParams &  copy_params)
static private

Definition at line 4091 of file Importer.cpp.

References logger::INFO, LOG, Importer_NS::CopyParams::s3_access_key, Importer_NS::CopyParams::s3_endpoint, Importer_NS::CopyParams::s3_region, and Importer_NS::CopyParams::s3_secret_key.

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4091  {
4092  // for now we only support S3
4093  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4094  // only set if non-empty to allow GDAL defaults to persist
4095  // explicitly clear if empty to revert to default and not reuse a previous session's
4096  // keys
4097  if (copy_params.s3_region.size()) {
4098 #if DEBUG_AWS_AUTHENTICATION
4099  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4100 #endif
4101  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4102  } else {
4103 #if DEBUG_AWS_AUTHENTICATION
4104  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4105 #endif
4106  CPLSetConfigOption("AWS_REGION", nullptr);
4107  }
4108  if (copy_params.s3_endpoint.size()) {
4109 #if DEBUG_AWS_AUTHENTICATION
4110  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4111 #endif
4112  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4113  } else {
4114 #if DEBUG_AWS_AUTHENTICATION
4115  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4116 #endif
4117  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4118  }
4119  if (copy_params.s3_access_key.size()) {
4120 #if DEBUG_AWS_AUTHENTICATION
4121  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4122  << "'";
4123 #endif
4124  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4125  } else {
4126 #if DEBUG_AWS_AUTHENTICATION
4127  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4128 #endif
4129  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4130  }
4131  if (copy_params.s3_secret_key.size()) {
4132 #if DEBUG_AWS_AUTHENTICATION
4133  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4134  << "'";
4135 #endif
4136  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4137  } else {
4138 #if DEBUG_AWS_AUTHENTICATION
4139  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4140 #endif
4141  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4142  }
4143 
4144 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4145  // if we haven't set keys, we need to disable signed access
4146  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4147 #if DEBUG_AWS_AUTHENTICATION
4148  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4149 #endif
4150  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4151  } else {
4152 #if DEBUG_AWS_AUTHENTICATION
4153  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4154 #endif
4155  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4156  }
4157 #endif
4158 }
#define LOG(tag)
Definition: Logger.h:188
std::string s3_access_key
Definition: CopyParams.h:62
std::string s3_endpoint
Definition: CopyParams.h:65
std::string s3_region
Definition: CopyParams.h:64
std::string s3_secret_key
Definition: CopyParams.h:63
+ Here is the caller graph for this function:

Member Data Documentation

◆ buffer

◆ file_size

size_t Importer_NS::Importer::file_size
private

Definition at line 829 of file Importer.h.

Referenced by importDelimited(), and Importer().

◆ import_buffers_vec

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > Importer_NS::Importer::import_buffers_vec
private

◆ import_id

std::string Importer_NS::Importer::import_id
private

Definition at line 828 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDAL(), and set_import_status().

◆ init_gdal_mutex

std::mutex Importer_NS::Importer::init_gdal_mutex
static private

Definition at line 835 of file Importer.h.

Referenced by Importer_NS::GDALErrorHandler().

◆ is_array_a

std::unique_ptr<bool[]> Importer_NS::Importer::is_array_a
private

Definition at line 834 of file Importer.h.

Referenced by Importer().

◆ loader

std::unique_ptr<Loader> Importer_NS::Importer::loader
private

◆ max_threads

size_t Importer_NS::Importer::max_threads
private

The documentation for this class was generated from the following files: