OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Importer_NS::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Importer:
+ Collaboration diagram for Importer_NS::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import ()
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed) override
 
ImportStatus importGDAL (std::map< std::string, std::string > colname_to_src)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const int32_t start_epoch)
 
auto getLoader () const
 
- Public Member Functions inherited from Importer_NS::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths)
 

Static Public Member Functions

static bool hasGDALLibKML ()
 
static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static bool gdalSupportsNetworkFileAccess ()
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, int render_group, const int64_t replicate_count=0)
 

Static Private Member Functions

static void initGDAL ()
 
static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static OGRDataSource * openGDALDataset (const std::string &fileName, const CopyParams &copy_params)
 
static void setGDALAuthorizationTokens (const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from Importer_NS::DataStreamSink
ImportStatus archivePlumber ()
 
- Protected Attributes inherited from Importer_NS::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status
 
bool load_failed = false
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 711 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 758 of file Importer.h.

758 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

Importer_NS::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 149 of file Importer.cpp.

153  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:149
Importer_NS::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 155 of file Importer.cpp.

References buffer, Importer_NS::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and Importer_NS::DataStreamSink::p_file.

156  : DataStreamSink(p, f), loader(providedLoader) {
157  import_id = boost::filesystem::path(file_path).filename().string();
158  file_size = 0;
159  max_threads = 0;
160  p_file = nullptr;
161  buffer[0] = nullptr;
162  buffer[1] = nullptr;
163  // we may be overallocating a little more memory here due to dropping phy cols.
164  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
165  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
166  int i = 0;
167  bool has_array = false;
168  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
169  int skip_physical_cols = 0;
170  for (auto& p : loader->get_column_descs()) {
171  // phy geo columns can't be in input file
172  if (skip_physical_cols-- > 0) {
173  continue;
174  }
175  // neither are rowid or $deleted$
176  // note: columns can be added after rowid/$deleted$
177  if (p->isVirtualCol || p->isDeletedCol) {
178  continue;
179  }
180  skip_physical_cols = p->columnType.get_physical_cols();
181  if (p->columnType.get_type() == kARRAY) {
182  is_array.get()[i] = true;
183  has_array = true;
184  } else {
185  is_array.get()[i] = false;
186  }
187  ++i;
188  }
189  if (has_array) {
190  is_array_a = std::unique_ptr<bool[]>(is_array.release());
191  } else {
192  is_array_a = std::unique_ptr<bool[]>(nullptr);
193  }
194 }
std::unique_ptr< Loader > loader
Definition: Importer.h:808
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:809
std::string import_id
Definition: Importer.h:803
const std::string file_path
Definition: Importer.h:629
Importer_NS::Importer::~Importer ( )
override

Definition at line 196 of file Importer.cpp.

References buffer, and Importer_NS::DataStreamSink::p_file.

196  {
197  if (p_file != nullptr) {
198  fclose(p_file);
199  }
200  if (buffer[0] != nullptr) {
201  free(buffer[0]);
202  }
203  if (buffer[1] != nullptr) {
204  free(buffer[1]);
205  }
206 }

Member Function Documentation

void Importer_NS::Importer::checkpoint ( const int32_t  start_epoch)

Definition at line 3175 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), import_buffers_vec, logger::INFO, Importer_NS::DataStreamSink::load_failed, loader, and LOG.

Referenced by importDelimited(), and importGDAL().

3175  {
3176  if (load_failed) {
3177  // rollback to starting epoch - undo all the added records
3178  loader->setTableEpoch(start_epoch);
3179  } else {
3180  loader->checkpoint();
3181  }
3182 
3183  if (loader->getTableDesc()->persistenceLevel ==
3184  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3185  auto ms = measure<>::execution([&]() {
3186  if (!load_failed) {
3187  for (auto& p : import_buffers_vec[0]) {
3188  if (!p->stringDictCheckpoint()) {
3189  LOG(ERROR) << "Checkpointing Dictionary for Column "
3190  << p->getColumnDesc()->columnName << " failed.";
3191  load_failed = true;
3192  break;
3193  }
3194  }
3195  }
3196  });
3197  if (DEBUG_TIMING) {
3198  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3199  << std::endl;
3200  }
3201  }
3202 }
std::unique_ptr< Loader > loader
Definition: Importer.h:808
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:188
#define DEBUG_TIMING
Definition: Importer.cpp:138
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:807

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4521 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::check_geospatial_files(), MapDHandler::detect_column_types(), MapDHandler::get_all_files_in_archive(), MapDHandler::get_first_geo_file_in_archive(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4521  {
4522  return gdalStatInternal(path, copy_params, false);
4523 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4488

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 4526 of file Importer.cpp.

References gdalStatInternal().

Referenced by MapDHandler::detect_column_types(), MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4527  {
4528  return gdalStatInternal(path, copy_params, true);
4529 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4488

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > Importer_NS::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 4598 of file Importer.cpp.

References Importer_NS::gdalGatherFilesInArchiveRecursive(), initGDAL(), and setGDALAuthorizationTokens().

Referenced by find_first_geo_file_in_archive(), and MapDHandler::get_all_files_in_archive().

4600  {
4601  // lazy init GDAL
4602  initGDAL();
4603 
4604  // set authorization tokens
4606 
4607  // prepare to gather files
4608  std::vector<std::string> files;
4609 
4610  // gather the files recursively
4611  gdalGatherFilesInArchiveRecursive(archive_path, files);
4612 
4613  // convert to relative paths inside archive
4614  for (auto& file : files) {
4615  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4616  }
4617 
4618  // done
4619  return files;
4620 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4146
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:4531
static void initGDAL()
Definition: Importer.cpp:4099

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > Importer_NS::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 4623 of file Importer.cpp.

References CHECK(), EMPTY, GEO, initGDAL(), NON_GEO, openGDALDataset(), setGDALAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by MapDHandler::get_layers_in_geo_file(), and MapDHandler::import_geo_table().

4625  {
4626  // lazy init GDAL
4627  initGDAL();
4628 
4629  // set authorization tokens
4631 
4632  // prepare to gather layer info
4633  std::vector<GeoFileLayerInfo> layer_info;
4634 
4635  // open the data set
4637  if (poDS == nullptr) {
4638  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4639  file_name);
4640  }
4641 
4642  // enumerate the layers
4643  for (auto&& poLayer : poDS->GetLayers()) {
4645  // prepare to read this layer
4646  poLayer->ResetReading();
4647  // skip layer if empty
4648  if (poLayer->GetFeatureCount() > 0) {
4649  // get first feature
4650  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4651  CHECK(first_feature);
4652  // check feature for geometry
4653  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4654  if (!geometry) {
4655  // layer has no geometry
4656  contents = GeoFileLayerContents::NON_GEO;
4657  } else {
4658  // check the geometry type
4659  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4660  switch (wkbFlatten(geometry_type)) {
4661  case wkbPoint:
4662  case wkbLineString:
4663  case wkbPolygon:
4664  case wkbMultiPolygon:
4665  // layer has supported geo
4666  contents = GeoFileLayerContents::GEO;
4667  break;
4668  default:
4669  // layer has unsupported geometry
4671  break;
4672  }
4673  }
4674  }
4675  // store info for this layer
4676  layer_info.emplace_back(poLayer->GetName(), contents);
4677  }
4678 
4679  // done
4680  return layer_info;
4681 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4146
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
CHECK(cgen_state)
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4216
static void initGDAL()
Definition: Importer.cpp:4099

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
staticprivate

Definition at line 4488 of file Importer.cpp.

References initGDAL(), run_benchmark_import::result, and setGDALAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

4490  {
4491  // lazy init GDAL
4492  initGDAL();
4493 
4494  // set authorization tokens
4496 
4497 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4498  // clear GDAL stat cache
4499  // without this, file existence will be cached, even if authentication changes
4500  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4501  VSICurlClearCache();
4502 #endif
4503 
4504  // stat path
4505  VSIStatBufL sb;
4506  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4507  if (result < 0) {
4508  return false;
4509  }
4510 
4511  // exists?
4512  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4513  return true;
4514  } else if (VSI_ISREG(sb.st_mode)) {
4515  return true;
4516  }
4517  return false;
4518 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4146
static void initGDAL()
Definition: Importer.cpp:4099

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Importer::gdalSupportsNetworkFileAccess ( )
static

Definition at line 4684 of file Importer.cpp.

Referenced by add_vsi_network_prefix().

4684  {
4685 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4686  return true;
4687 #else
4688  return false;
4689 #endif
4690 }

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > Importer_NS::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static

Definition at line 4398 of file Importer.cpp.

References CHECK(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Importer_NS::CopyParams::geo_coords_comp_param, Importer_NS::CopyParams::geo_coords_encoding, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_coords_type, Importer_NS::CopyParams::geo_explode_collections, Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, Importer_NS::ogr_to_type(), openGDALDataset(), Importer_NS::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by MapDHandler::detect_column_types().

4401  {
4402  std::list<ColumnDescriptor> cds;
4403 
4405  if (poDS == nullptr) {
4406  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4407  file_name);
4408  }
4409 
4410  OGRLayer& layer =
4412 
4413  layer.ResetReading();
4414  // TODO(andrewseidl): support multiple features
4415  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4416  if (poFeature == nullptr) {
4417  throw std::runtime_error("No features found in " + file_name);
4418  }
4419  // get fields as regular columns
4420  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4421  CHECK(poFDefn);
4422  int iField;
4423  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4424  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4425  auto typePair = ogr_to_type(poFieldDefn->GetType());
4426  ColumnDescriptor cd;
4427  cd.columnName = poFieldDefn->GetNameRef();
4428  cd.sourceName = poFieldDefn->GetNameRef();
4429  SQLTypeInfo ti;
4430  if (typePair.second) {
4431  ti.set_type(kARRAY);
4432  ti.set_subtype(typePair.first);
4433  } else {
4434  ti.set_type(typePair.first);
4435  }
4436  if (typePair.first == kTEXT) {
4438  ti.set_comp_param(32);
4439  }
4440  ti.set_fixed_size();
4441  cd.columnType = ti;
4442  cds.push_back(cd);
4443  }
4444  // get geo column, if any
4445  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4446  if (poGeometry) {
4447  ColumnDescriptor cd;
4448  cd.columnName = geo_column_name;
4449  cd.sourceName = geo_column_name;
4450 
4451  // get GDAL type
4452  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4453 
4454  // if exploding, override any collection type to child type
4456  if (ogr_type == wkbMultiPolygon) {
4457  ogr_type = wkbPolygon;
4458  } else if (ogr_type == wkbMultiLineString) {
4459  ogr_type = wkbLineString;
4460  } else if (ogr_type == wkbMultiPoint) {
4461  ogr_type = wkbPoint;
4462  }
4463  }
4464 
4465  // convert to internal type
4466  SQLTypes geoType = ogr_to_type(ogr_type);
4467 
4468  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4470  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4471  }
4472 
4473  // build full internal type
4474  SQLTypeInfo ti;
4475  ti.set_type(geoType);
4481  cd.columnType = ti;
4482 
4483  cds.push_back(cd);
4484  }
4485  return cds;
4486 }
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4343
void set_compression(EncodingType c)
Definition: sqltypes.h:348
SQLTypes
Definition: sqltypes.h:39
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:339
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
std::string sourceName
EncodingType geo_coords_encoding
Definition: CopyParams.h:73
void set_input_srid(int d)
Definition: sqltypes.h:342
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4249
CHECK(cgen_state)
void set_fixed_size()
Definition: sqltypes.h:347
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:344
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4216
void set_comp_param(int p)
Definition: sqltypes.h:349
Definition: sqltypes.h:53
std::string geo_layer_name
Definition: CopyParams.h:78
SQLTypeInfo columnType
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:144
std::string columnName
int32_t geo_coords_comp_param
Definition: CopyParams.h:74
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:338

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& Importer_NS::Importer::get_column_descs ( ) const
inline

Definition at line 725 of file Importer.h.

References loader.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

725  {
726  return loader->get_column_descs();
727  }
std::unique_ptr< Loader > loader
Definition: Importer.h:808

+ Here is the caller graph for this function:

const CopyParams& Importer_NS::Importer::get_copy_params ( ) const
inline

Definition at line 724 of file Importer.h.

References Importer_NS::DataStreamSink::copy_params.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

724 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& Importer_NS::Importer::get_import_buffers ( int  i)
inline

Definition at line 733 of file Importer.h.

References import_buffers_vec.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

733  {
734  return import_buffers_vec[i];
735  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:807

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& Importer_NS::Importer::get_import_buffers_vec ( )
inline

Definition at line 730 of file Importer.h.

References import_buffers_vec.

730  {
731  return import_buffers_vec;
732  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:807
ImportStatus Importer_NS::Importer::get_import_status ( const std::string &  id)
static

Definition at line 208 of file Importer.cpp.

References Importer_NS::import_status_map, and Importer_NS::status_mutex.

Referenced by MapDHandler::import_table_status().

208  {
209  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
210  return import_status_map.at(import_id);
211 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:147
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::string import_id
Definition: Importer.h:803

+ Here is the caller graph for this function:

const bool* Importer_NS::Importer::get_is_array ( ) const
inline

Definition at line 736 of file Importer.h.

References is_array_a.

Referenced by Importer_NS::import_thread_delimited().

736 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:809

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& Importer_NS::Importer::getCatalog ( )
inline

Definition at line 769 of file Importer.h.

References loader.

Referenced by Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and Importer_NS::import_thread_delimited().

769 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:808

+ Here is the caller graph for this function:

auto Importer_NS::Importer::getLoader ( ) const
inline

Definition at line 793 of file Importer.h.

References loader.

793 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:808
bool Importer_NS::Importer::hasGDALLibKML ( )
static

Definition at line 4141 of file Importer.cpp.

4141  {
4142  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4143 }
ImportStatus Importer_NS::Importer::import ( )

Definition at line 3847 of file Importer.cpp.

References Importer_NS::DataStreamSink::archivePlumber().

3847  {
3849 }
ImportStatus archivePlumber()
Definition: Importer.cpp:3204

+ Here is the call graph for this function:

ImportStatus Importer_NS::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed 
)
overridevirtual

Implements Importer_NS::DataStreamSink.

Definition at line 3851 of file Importer.cpp.

References Importer_NS::CopyParams::buffer_size, CHECK(), checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, Importer_NS::DataStreamSink::copy_params, logger::ERROR, Importer_NS::DataStreamSink::file_offsets, Importer_NS::DataStreamSink::file_offsets_mutex, file_size, Importer_NS::DelimitedParserUtils::find_end(), Importer_NS::CopyParams::geo_assign_render_groups, SQLTypeInfo::get_type(), import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_delimited(), kMULTIPOLYGON, kPOLYGON, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, max_threads, Importer_NS::DataStreamSink::p_file, Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, Importer_NS::ImportStatus::rows_rejected, set_import_status(), Importer_NS::status_mutex, logger::thread_id(), Importer_NS::CopyParams::threads, Importer_NS::DataStreamSink::total_file_size, and VLOG.

3852  {
3853  bool load_truncated = false;
3855 
3856  if (!p_file) {
3857  p_file = fopen(file_path.c_str(), "rb");
3858  }
3859  if (!p_file) {
3860  throw std::runtime_error("failed to open file '" + file_path +
3861  "': " + strerror(errno));
3862  }
3863 
3864  if (!decompressed) {
3865  (void)fseek(p_file, 0, SEEK_END);
3866  file_size = ftell(p_file);
3867  }
3868 
3869  if (copy_params.threads == 0) {
3870  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3871  } else {
3872  max_threads = static_cast<size_t>(copy_params.threads);
3873  }
3874 
3875  // deal with small files
3876  size_t alloc_size = copy_params.buffer_size;
3877  if (!decompressed && file_size < alloc_size) {
3878  alloc_size = file_size;
3879  }
3880 
3881  for (size_t i = 0; i < max_threads; i++) {
3882  import_buffers_vec.emplace_back();
3883  for (const auto cd : loader->get_column_descs()) {
3884  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3885  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3886  }
3887  }
3888 
3889  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3890  size_t current_pos = 0;
3891  size_t end_pos;
3892  size_t begin_pos = 0;
3893 
3894  (void)fseek(p_file, current_pos, SEEK_SET);
3895  size_t size =
3896  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3897 
3898  // make render group analyzers for each poly column
3899  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3901  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3902  loader->getTableDesc()->tableId, false, false, false);
3903  for (auto cd : columnDescriptors) {
3904  SQLTypes ct = cd->columnType.get_type();
3905  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3906  auto rga = std::make_shared<RenderGroupAnalyzer>();
3907  rga->seedFromExistingTableContents(loader, cd->columnName);
3908  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3909  }
3910  }
3911  }
3912 
3913  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3914  loader->getTableDesc()->tableId};
3915  auto start_epoch = loader->getTableEpoch();
3916  {
3917  std::list<std::future<ImportStatus>> threads;
3918 
3919  // use a stack to track thread_ids which must not overlap among threads
3920  // because thread_id is used to index import_buffers_vec[]
3921  std::stack<size_t> stack_thread_ids;
3922  for (size_t i = 0; i < max_threads; i++) {
3923  stack_thread_ids.push(i);
3924  }
3925  // added for true row index on error
3926  size_t first_row_index_this_buffer = 0;
3927 
3928  while (size > 0) {
3929  unsigned int num_rows_this_buffer = 0;
3930  CHECK(scratch_buffer);
3932  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
3933 
3934  // unput residual
3935  int nresidual = size - end_pos;
3936  std::unique_ptr<char[]> unbuf;
3937  if (nresidual > 0) {
3938  unbuf = std::make_unique<char[]>(nresidual);
3939  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3940  }
3941 
3942  // get a thread_id not in use
3943  auto thread_id = stack_thread_ids.top();
3944  stack_thread_ids.pop();
3945  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3946 
3947  threads.push_back(std::async(std::launch::async,
3949  thread_id,
3950  this,
3951  std::move(scratch_buffer),
3952  begin_pos,
3953  end_pos,
3954  end_pos,
3955  columnIdToRenderGroupAnalyzerMap,
3956  first_row_index_this_buffer));
3957 
3958  first_row_index_this_buffer += num_rows_this_buffer;
3959 
3960  current_pos += end_pos;
3961  scratch_buffer = std::make_unique<char[]>(alloc_size);
3962  CHECK(scratch_buffer);
3963  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3964  size = nresidual + fread(scratch_buffer.get() + nresidual,
3965  1,
3966  copy_params.buffer_size - nresidual,
3967  p_file);
3968 
3969  begin_pos = 0;
3970 
3971  while (threads.size() > 0) {
3972  int nready = 0;
3973  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3974  it != threads.end();) {
3975  auto& p = *it;
3976  std::chrono::milliseconds span(
3977  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3978  if (p.wait_for(span) == std::future_status::ready) {
3979  auto ret_import_status = p.get();
3980  import_status += ret_import_status;
3981  // sum up current total file offsets
3982  size_t total_file_offset{0};
3983  if (decompressed) {
3984  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3985  for (const auto file_offset : file_offsets) {
3986  total_file_offset += file_offset;
3987  }
3988  }
3989  // estimate number of rows per current total file offset
3990  if (decompressed ? total_file_offset : current_pos) {
3992  (decompressed ? (float)total_file_size / total_file_offset
3993  : (float)file_size / current_pos) *
3994  import_status.rows_completed;
3995  }
3996  VLOG(3) << "rows_completed " << import_status.rows_completed
3997  << ", rows_estimated " << import_status.rows_estimated
3998  << ", total_file_size " << total_file_size << ", total_file_offset "
3999  << total_file_offset;
4001  // recall thread_id for reuse
4002  stack_thread_ids.push(ret_import_status.thread_id);
4003  threads.erase(it++);
4004  ++nready;
4005  } else {
4006  ++it;
4007  }
4008  }
4009 
4010  if (nready == 0) {
4011  std::this_thread::yield();
4012  }
4013 
4014  // on eof, wait all threads to finish
4015  if (0 == size) {
4016  continue;
4017  }
4018 
4019  // keep reading if any free thread slot
4020  // this is one of the major difference from old threading model !!
4021  if (threads.size() < max_threads) {
4022  break;
4023  }
4024  }
4025 
4026  if (import_status.rows_rejected > copy_params.max_reject) {
4027  load_truncated = true;
4028  load_failed = true;
4029  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4030  break;
4031  }
4032  if (load_failed) {
4033  load_truncated = true;
4034  LOG(ERROR) << "A call to the Loader::load failed, Please review the logs for "
4035  "more details";
4036  break;
4037  }
4038  }
4039 
4040  // join dangling threads in case of LOG(ERROR) above
4041  for (auto& p : threads) {
4042  p.wait();
4043  }
4044  }
4045 
4046  checkpoint(start_epoch);
4047 
4048  // must set import_status.load_truncated before closing this end of pipe
4049  // otherwise, the thread on the other end would throw an unwanted 'write()'
4050  // exception
4051  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
4052  import_status.load_truncated = load_truncated;
4053 
4054  fclose(p_file);
4055  p_file = nullptr;
4056  return import_status;
4057 }
std::unique_ptr< Loader > loader
Definition: Importer.h:808
static size_t find_end(const char *buffer, size_t size, const CopyParams &copy_params, unsigned int &num_rows_this_buffer)
Finds the closest possible row ending to the end of the given buffer.
std::vector< int > ChunkKey
Definition: types.h:35
std::vector< size_t > file_offsets
Definition: Importer.h:634
SQLTypes
Definition: sqltypes.h:39
#define LOG(tag)
Definition: Logger.h:188
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:807
CHECK(cgen_state)
std::string import_id
Definition: Importer.h:803
std::mutex file_offsets_mutex
Definition: Importer.h:635
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:135
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3175
ImportStatus import_status
Definition: Importer.h:631
ThreadId thread_id()
Definition: Logger.cpp:715
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer)
Definition: Importer.cpp:1794
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:213
#define VLOG(n)
Definition: Logger.h:291
const std::string file_path
Definition: Importer.h:629

+ Here is the call graph for this function:

ImportStatus Importer_NS::Importer::importGDAL ( std::map< std::string, std::string >  colname_to_src)

Definition at line 4692 of file Importer.cpp.

References CHECK(), CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, Importer_NS::DataStreamSink::copy_params, logger::ERROR, Importer_NS::CopyParams::geo_assign_render_groups, Importer_NS::CopyParams::geo_coords_srid, Importer_NS::CopyParams::geo_layer_name, SQLTypeInfo::get_type(), Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, Importer_NS::DataStreamSink::import_status, Importer_NS::import_thread_shapefile(), kMULTIPOLYGON, kPOLYGON, Importer_NS::DataStreamSink::load_failed, Importer_NS::ImportStatus::load_truncated, loader, LOG, Importer_NS::CopyParams::max_reject, max_threads, openGDALDataset(), Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, set_import_status(), logger::thread_id(), and Importer_NS::CopyParams::threads.

Referenced by QueryRunner::ImportDriver::importGeoTable().

4693  {
4694  // initial status
4695  bool load_truncated = false;
4697 
4699  if (poDS == nullptr) {
4700  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4701  file_path);
4702  }
4703 
4704  OGRLayer& layer =
4706 
4707  // get the number of features in this layer
4708  size_t numFeatures = layer.GetFeatureCount();
4709 
4710  // build map of metadata field (additional columns) name to index
4711  // use shared_ptr since we need to pass it to the worker
4712  FieldNameToIndexMapType fieldNameToIndexMap;
4713  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4714  CHECK(poFDefn);
4715  size_t numFields = poFDefn->GetFieldCount();
4716  for (size_t iField = 0; iField < numFields; iField++) {
4717  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4718  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4719  }
4720 
4721  // the geographic spatial reference we want to put everything in
4722  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4723  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4724 
4725 #if GDAL_VERSION_MAJOR >= 3
4726  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4727  // this results in X and Y being transposed for angle-based
4728  // coordinate systems. This restores the previous behavior.
4729  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4730 #endif
4731 
4732 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4733  // just one "thread"
4734  size_t max_threads = 1;
4735 #else
4736  // how many threads to use
4737  size_t max_threads = 0;
4738  if (copy_params.threads == 0) {
4739  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4740  } else {
4741  max_threads = copy_params.threads;
4742  }
4743 #endif
4744 
4745  // make an import buffer for each thread
4746  CHECK_EQ(import_buffers_vec.size(), 0u);
4747  import_buffers_vec.resize(max_threads);
4748  for (size_t i = 0; i < max_threads; i++) {
4749  for (const auto cd : loader->get_column_descs()) {
4750  import_buffers_vec[i].emplace_back(
4751  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4752  }
4753  }
4754 
4755  // make render group analyzers for each poly column
4756  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4758  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4759  loader->getTableDesc()->tableId, false, false, false);
4760  for (auto cd : columnDescriptors) {
4761  SQLTypes ct = cd->columnType.get_type();
4762  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4763  auto rga = std::make_shared<RenderGroupAnalyzer>();
4764  rga->seedFromExistingTableContents(loader, cd->columnName);
4765  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4766  }
4767  }
4768  }
4769 
4770 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4771  // threads
4772  std::list<std::future<ImportStatus>> threads;
4773 
4774  // use a stack to track thread_ids which must not overlap among threads
4775  // because thread_id is used to index import_buffers_vec[]
4776  std::stack<size_t> stack_thread_ids;
4777  for (size_t i = 0; i < max_threads; i++) {
4778  stack_thread_ids.push(i);
4779  }
4780 #endif
4781 
4782  // checkpoint the table
4783  auto start_epoch = loader->getTableEpoch();
4784 
4785  // reset the layer
4786  layer.ResetReading();
4787 
4788  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4789 
4790  // make a features buffer for each thread
4791  std::vector<FeaturePtrVector> features(max_threads);
4792 
4793  // for each feature...
4794  size_t firstFeatureThisChunk = 0;
4795  while (firstFeatureThisChunk < numFeatures) {
4796  // how many features this chunk
4797  size_t numFeaturesThisChunk =
4798  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4799 
4800 // get a thread_id not in use
4801 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4802  size_t thread_id = 0;
4803 #else
4804  auto thread_id = stack_thread_ids.top();
4805  stack_thread_ids.pop();
4806  CHECK(thread_id < max_threads);
4807 #endif
4808 
4809  // fill features buffer for new thread
4810  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4811  features[thread_id].emplace_back(layer.GetNextFeature());
4812  }
4813 
4814 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4815  // call worker function directly
4816  auto ret_import_status = import_thread_shapefile(0,
4817  this,
4818  poGeographicSR.get(),
4819  std::move(features[thread_id]),
4820  firstFeatureThisChunk,
4821  numFeaturesThisChunk,
4822  fieldNameToIndexMap,
4823  columnNameToSourceNameMap,
4824  columnIdToRenderGroupAnalyzerMap);
4825  import_status += ret_import_status;
4826  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4827  import_status.rows_completed;
4828  set_import_status(import_id, import_status);
4829 #else
4830  // fire up that thread to import this geometry
4831  threads.push_back(std::async(std::launch::async,
4833  thread_id,
4834  this,
4835  poGeographicSR.get(),
4836  std::move(features[thread_id]),
4837  firstFeatureThisChunk,
4838  numFeaturesThisChunk,
4839  fieldNameToIndexMap,
4840  columnNameToSourceNameMap,
4841  columnIdToRenderGroupAnalyzerMap));
4842 
4843  // let the threads run
4844  while (threads.size() > 0) {
4845  int nready = 0;
4846  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4847  it != threads.end();) {
4848  auto& p = *it;
4849  std::chrono::milliseconds span(
4850  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4851  if (p.wait_for(span) == std::future_status::ready) {
4852  auto ret_import_status = p.get();
4853  import_status += ret_import_status;
4854  import_status.rows_estimated =
4855  ((float)firstFeatureThisChunk / (float)numFeatures) *
4856  import_status.rows_completed;
4857  set_import_status(import_id, import_status);
4858 
4859  // recall thread_id for reuse
4860  stack_thread_ids.push(ret_import_status.thread_id);
4861 
4862  threads.erase(it++);
4863  ++nready;
4864  } else {
4865  ++it;
4866  }
4867  }
4868 
4869  if (nready == 0) {
4870  std::this_thread::yield();
4871  }
4872 
4873  // keep reading if any free thread slot
4874  // this is one of the major difference from old threading model !!
4875  if (threads.size() < max_threads) {
4876  break;
4877  }
4878  }
4879 #endif
4880 
4881  // out of rows?
4882  if (import_status.rows_rejected > copy_params.max_reject) {
4883  load_truncated = true;
4884  load_failed = true;
4885  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4886  break;
4887  }
4888 
4889  // failed?
4890  if (load_failed) {
4891  load_truncated = true;
4892  LOG(ERROR)
4893  << "A call to the Loader::load failed, Please review the logs for more details";
4894  break;
4895  }
4896 
4897  firstFeatureThisChunk += numFeaturesThisChunk;
4898  }
4899 
4900 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4901  // wait for any remaining threads
4902  if (threads.size()) {
4903  for (auto& p : threads) {
4904  // wait for the thread
4905  p.wait();
4906  // get the result and update the final import status
4907  auto ret_import_status = p.get();
4908  import_status += ret_import_status;
4909  import_status.rows_estimated = import_status.rows_completed;
4910  set_import_status(import_id, import_status);
4911  }
4912  }
4913 #endif
4914 
4915  checkpoint(start_epoch);
4916 
4917  // must set import_status.load_truncated before closing this end of pipe
4918  // otherwise, the thread on the other end would throw an unwanted 'write()'
4919  // exception
4920  import_status.load_truncated = load_truncated;
4921  return import_status;
4922 }
std::unique_ptr< Loader > loader
Definition: Importer.h:808
#define CHECK_EQ(x, y)
Definition: Logger.h:205
SQLTypes
Definition: sqltypes.h:39
std::unique_ptr< OGRSpatialReference, OGRSpatialReferenceDeleter > OGRSpatialReferenceUqPtr
Definition: Importer.cpp:112
#define LOG(tag)
Definition: Logger.h:188
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4249
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:807
CHECK(cgen_state)
std::string import_id
Definition: Importer.h:803
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap)
Definition: Importer.cpp:2118
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4216
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:135
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3175
std::string geo_layer_name
Definition: CopyParams.h:78
ImportStatus import_status
Definition: Importer.h:631
ThreadId thread_id()
Definition: Logger.cpp:715
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:132
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:213
const std::string file_path
Definition: Importer.h:629

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::initGDAL ( )
static private

Definition at line 4099 of file Importer.cpp.

References Importer_NS::GDALErrorHandler(), logger::INFO, LOG, and mapd_root_abs_path().

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4099  {
4100  // this should not be called from multiple threads, but...
4101  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
4102  // init under mutex
4103  static bool gdal_initialized = false;
4104  if (!gdal_initialized) {
4105  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
4106  setenv("GDAL_DATA",
4107  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
4108  true);
4109 
4110  // configure SSL certificate path (per S3Archive::init_for_read)
4111  // in a production build, GDAL and Curl will have been built on
4112  // CentOS, so the baked-in system path will be wrong for Ubuntu
4113  // and other Linux distros. Unless the user is deliberately
4114  // overriding it by setting SSL_CERT_FILE explicitly in the server
4115  // environment, we set it to whichever CA bundle directory exists
4116  // on the machine we're running on
4117  std::list<std::string> v_known_ca_paths({
4118  "/etc/ssl/certs/ca-certificates.crt",
4119  "/etc/pki/tls/certs/ca-bundle.crt",
4120  "/usr/share/ssl/certs/ca-bundle.crt",
4121  "/usr/local/share/certs/ca-root.crt",
4122  "/etc/ssl/cert.pem",
4123  "/etc/ssl/ca-bundle.pem",
4124  });
4125  for (const auto& known_ca_path : v_known_ca_paths) {
4126  if (boost::filesystem::exists(known_ca_path)) {
4127  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4128  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4129  break;
4130  }
4131  }
4132 
4133  GDALAllRegister();
4134  OGRRegisterAll();
4135  CPLSetErrorHandler(*GDALErrorHandler);
4136  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4137  gdal_initialized = true;
4138  }
4139 }
#define LOG(tag)
Definition: Logger.h:188
std::string mapd_root_abs_path()
Definition: mapdpath.h:30
static std::mutex init_gdal_mutex
Definition: Importer.h:810
void GDALErrorHandler(CPLErr eErrClass, int err_no, const char *msg)
Definition: Importer.cpp:4076

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count 
)

Definition at line 3168 of file Importer.cpp.

References Importer_NS::DataStreamSink::load_failed, and loader.

Referenced by Importer_NS::import_thread_delimited(), and Importer_NS::import_thread_shapefile().

3169  {
3170  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3171  load_failed = true;
3172  }
3173 }
std::unique_ptr< Loader > loader
Definition: Importer.h:808

+ Here is the caller graph for this function:

OGRDataSource * Importer_NS::Importer::openGDALDataset ( const std::string &  fileName,
const CopyParams &  copy_params 
)
static private

Definition at line 4216 of file Importer.cpp.

References logger::ERROR, logger::INFO, initGDAL(), LOG, and setGDALAuthorizationTokens().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptors(), importGDAL(), and readMetadataSampleGDAL().

4217  {
4218  // lazy init GDAL
4219  initGDAL();
4220 
4221  // set authorization tokens
4223 
4224  // open the file
4225  OGRDataSource* poDS;
4226 #if GDAL_VERSION_MAJOR == 1
4227  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4228 #else
4229  poDS = (OGRDataSource*)GDALOpenEx(
4230  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4231  if (poDS == nullptr) {
4232  poDS = (OGRDataSource*)GDALOpenEx(
4233  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4234  if (poDS) {
4235  LOG(INFO) << "openGDALDataset had to open as read-only";
4236  }
4237  }
4238 #endif
4239  if (poDS == nullptr) {
4240  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4241  }
4242  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4243  // in a memory leak if GDAL successfully opened the input dataset.
4244  return poDS;
4245 }
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4146
#define LOG(tag)
Definition: Logger.h:188
static void initGDAL()
Definition: Importer.cpp:4099

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4272 of file Importer.cpp.

References CHECK(), Importer_NS::CopyParams::geo_layer_name, Importer_NS::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), and openGDALDataset().

Referenced by MapDHandler::detect_column_types().

4277  {
4279  if (poDS == nullptr) {
4280  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4281  file_name);
4282  }
4283 
4284  OGRLayer& layer =
4286 
4287  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4288  CHECK(poFDefn);
4289 
4290  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4291  auto nFeats = layer.GetFeatureCount();
4292  size_t numFeatures =
4293  std::max(static_cast<decltype(nFeats)>(0),
4294  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4295  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4296  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4297  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4298  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4299  }
4300  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4301  layer.ResetReading();
4302  size_t iFeature = 0;
4303  while (iFeature < numFeatures) {
4304  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4305  if (!poFeature) {
4306  break;
4307  }
4308 
4309  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4310  if (poGeometry != nullptr) {
4311  // validate geom type (again?)
4312  switch (wkbFlatten(poGeometry->getGeometryType())) {
4313  case wkbPoint:
4314  case wkbLineString:
4315  case wkbPolygon:
4316  case wkbMultiPolygon:
4317  break;
4318  default:
4319  throw std::runtime_error("Unsupported geometry type: " +
4320  std::string(poGeometry->getGeometryName()));
4321  }
4322 
4323  // populate metadata for regular fields
4324  for (auto i : metadata) {
4325  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4326  if (iField >= 0) { // geom is -1
4327  metadata[i.first].at(iFeature) =
4328  std::string(poFeature->GetFieldAsString(iField));
4329  }
4330  }
4331 
4332  // populate metadata for geo column with WKT string
4333  char* wkts = nullptr;
4334  poGeometry->exportToWkt(&wkts);
4335  CHECK(wkts);
4336  metadata[geo_column_name].at(iFeature) = wkts;
4337  CPLFree(wkts);
4338  }
4339  iFeature++;
4340  }
4341 }
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4249
CHECK(cgen_state)
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4216
std::string geo_layer_name
Definition: CopyParams.h:78

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1411 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), anonymous_namespace{ResultSetGeoSerialization.h}::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by Importer_NS::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), Importer_NS::import_thread_delimited(), and MapDHandler::load_table().

1421  {
1422  const auto col_ti = cd->columnType;
1423  const auto col_type = col_ti.get_type();
1424  auto columnId = cd->columnId;
1425  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1426  bool is_null_geo = false;
1427  bool is_null_point = false;
1428  if (!col_ti.get_notnull()) {
1429  // Check for NULL geo
1430  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1431  is_null_point = true;
1432  coords.clear();
1433  }
1434  is_null_geo = coords.empty();
1435  if (is_null_point) {
1436  coords.push_back(NULL_ARRAY_DOUBLE);
1437  coords.push_back(NULL_DOUBLE);
1438  // Treating POINT coords as notnull, need to store actual encoding
1439  // [un]compressed+[not]null
1440  is_null_geo = false;
1441  }
1442  }
1443  std::vector<TDatum> td_coords_data;
1444  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1445  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1446  // in a fixlen array, compressed and uncompressed.
1447  if (!is_null_geo) {
1448  std::vector<uint8_t> compressed_coords = geospatial::compress_coords(coords, col_ti);
1449  for (auto cc : compressed_coords) {
1450  TDatum td_byte;
1451  td_byte.val.int_val = cc;
1452  td_coords_data.push_back(td_byte);
1453  }
1454  }
1455  TDatum tdd_coords;
1456  tdd_coords.val.arr_val = td_coords_data;
1457  tdd_coords.is_null = is_null_geo;
1458  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);
1459 
1460  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1461  // Create ring_sizes array value and add it to the physical column
1462  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1463  std::vector<TDatum> td_ring_sizes;
1464  if (!is_null_geo) {
1465  for (auto ring_size : ring_sizes) {
1466  TDatum td_ring_size;
1467  td_ring_size.val.int_val = ring_size;
1468  td_ring_sizes.push_back(td_ring_size);
1469  }
1470  }
1471  TDatum tdd_ring_sizes;
1472  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1473  tdd_ring_sizes.is_null = is_null_geo;
1474  import_buffers[col_idx++]->add_value(
1475  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1476  }
1477 
1478  if (col_type == kMULTIPOLYGON) {
1479  // Create poly_rings array value and add it to the physical column
1480  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1481  std::vector<TDatum> td_poly_rings;
1482  if (!is_null_geo) {
1483  for (auto num_rings : poly_rings) {
1484  TDatum td_num_rings;
1485  td_num_rings.val.int_val = num_rings;
1486  td_poly_rings.push_back(td_num_rings);
1487  }
1488  }
1489  TDatum tdd_poly_rings;
1490  tdd_poly_rings.val.arr_val = td_poly_rings;
1491  tdd_poly_rings.is_null = is_null_geo;
1492  import_buffers[col_idx++]->add_value(
1493  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1494  }
1495 
1496  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1497  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1498  std::vector<TDatum> td_bounds_data;
1499  if (!is_null_geo) {
1500  for (auto b : bounds) {
1501  TDatum td_double;
1502  td_double.val.real_val = b;
1503  td_bounds_data.push_back(td_double);
1504  }
1505  }
1506  TDatum tdd_bounds;
1507  tdd_bounds.val.arr_val = td_bounds_data;
1508  tdd_bounds.is_null = is_null_geo;
1509  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1510  }
1511 
1512  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1513  // Create render_group value and add it to the physical column
1514  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1515  TDatum td_render_group;
1516  td_render_group.val.int_val = render_group;
1517  td_render_group.is_null = is_null_geo;
1518  import_buffers[col_idx++]->add_value(
1519  cd_render_group, td_render_group, false, replicate_count);
1520  }
1521 }
#define NULL_DOUBLE
Definition: sqltypes.h:175
#define NULL_ARRAY_DOUBLE
Definition: sqltypes.h:183
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:248
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
int  render_group,
const int64_t  replicate_count = 0 
)
static

Definition at line 1523 of file Importer.cpp.

References CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnType, geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), anonymous_namespace{ResultSetGeoSerialization.h}::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by MapDHandler::load_table_binary_columnar().

1533  {
1534  const auto col_ti = cd->columnType;
1535  const auto col_type = col_ti.get_type();
1536  auto columnId = cd->columnId;
1537 
1538  auto coords_row_count = coords_column.size();
1539  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1540  for (auto coords : coords_column) {
1541  bool is_null_geo = false;
1542  bool is_null_point = false;
1543  if (!col_ti.get_notnull()) {
1544  // Check for NULL geo
1545  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1546  is_null_point = true;
1547  coords.clear();
1548  }
1549  is_null_geo = coords.empty();
1550  if (is_null_point) {
1551  coords.push_back(NULL_ARRAY_DOUBLE);
1552  coords.push_back(NULL_DOUBLE);
1553  // Treating POINT coords as notnull, need to store actual encoding
1554  // [un]compressed+[not]null
1555  is_null_geo = false;
1556  }
1557  }
1558  std::vector<TDatum> td_coords_data;
1559  if (!is_null_geo) {
1560  std::vector<uint8_t> compressed_coords =
1561  geospatial::compress_coords(coords, col_ti);
1562  for (auto cc : compressed_coords) {
1563  TDatum td_byte;
1564  td_byte.val.int_val = cc;
1565  td_coords_data.push_back(td_byte);
1566  }
1567  }
1568  TDatum tdd_coords;
1569  tdd_coords.val.arr_val = td_coords_data;
1570  tdd_coords.is_null = is_null_geo;
1571  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
1572  }
1573  col_idx++;
1574 
1575  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1576  if (ring_sizes_column.size() != coords_row_count) {
1577  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1578  }
1579  // Create ring_sizes array value and add it to the physical column
1580  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1581  for (auto ring_sizes : ring_sizes_column) {
1582  bool is_null_geo = false;
1583  if (!col_ti.get_notnull()) {
1584  // Check for NULL geo
1585  is_null_geo = ring_sizes.empty();
1586  }
1587  std::vector<TDatum> td_ring_sizes;
1588  for (auto ring_size : ring_sizes) {
1589  TDatum td_ring_size;
1590  td_ring_size.val.int_val = ring_size;
1591  td_ring_sizes.push_back(td_ring_size);
1592  }
1593  TDatum tdd_ring_sizes;
1594  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1595  tdd_ring_sizes.is_null = is_null_geo;
1596  import_buffers[col_idx]->add_value(
1597  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1598  }
1599  col_idx++;
1600  }
1601 
1602  if (col_type == kMULTIPOLYGON) {
1603  if (poly_rings_column.size() != coords_row_count) {
1604  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1605  }
1606  // Create poly_rings array value and add it to the physical column
1607  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1608  for (auto poly_rings : poly_rings_column) {
1609  bool is_null_geo = false;
1610  if (!col_ti.get_notnull()) {
1611  // Check for NULL geo
1612  is_null_geo = poly_rings.empty();
1613  }
1614  std::vector<TDatum> td_poly_rings;
1615  for (auto num_rings : poly_rings) {
1616  TDatum td_num_rings;
1617  td_num_rings.val.int_val = num_rings;
1618  td_poly_rings.push_back(td_num_rings);
1619  }
1620  TDatum tdd_poly_rings;
1621  tdd_poly_rings.val.arr_val = td_poly_rings;
1622  tdd_poly_rings.is_null = is_null_geo;
1623  import_buffers[col_idx]->add_value(
1624  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1625  }
1626  col_idx++;
1627  }
1628 
1629  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1630  if (bounds_column.size() != coords_row_count) {
1631  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1632  }
1633  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1634  for (auto bounds : bounds_column) {
1635  bool is_null_geo = false;
1636  if (!col_ti.get_notnull()) {
1637  // Check for NULL geo
1638  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1639  }
1640  std::vector<TDatum> td_bounds_data;
1641  for (auto b : bounds) {
1642  TDatum td_double;
1643  td_double.val.real_val = b;
1644  td_bounds_data.push_back(td_double);
1645  }
1646  TDatum tdd_bounds;
1647  tdd_bounds.val.arr_val = td_bounds_data;
1648  tdd_bounds.is_null = is_null_geo;
1649  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1650  }
1651  col_idx++;
1652  }
1653 
1654  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1655  // Create render_group value and add it to the physical column
1656  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1657  TDatum td_render_group;
1658  td_render_group.val.int_val = render_group;
1659  td_render_group.is_null = false;
1660  for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
1661  import_buffers[col_idx]->add_value(
1662  cd_render_group, td_render_group, false, replicate_count);
1663  }
1664  col_idx++;
1665  }
1666 }
#define NULL_DOUBLE
Definition: sqltypes.h:175
#define NULL_ARRAY_DOUBLE
Definition: sqltypes.h:183
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:248
CHECK(cgen_state)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 213 of file Importer.cpp.

References Importer_NS::ImportStatus::elapsed, Importer_NS::ImportStatus::end, import_id, Importer_NS::import_status_map, Importer_NS::ImportStatus::start, and Importer_NS::status_mutex.

Referenced by importDelimited(), and importGDAL().

213  {
214  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
215  is.end = std::chrono::steady_clock::now();
216  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
217  import_status_map[import_id] = is;
218 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:147
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
std::string import_id
Definition: Importer.h:803

+ Here is the caller graph for this function:

void Importer_NS::Importer::setGDALAuthorizationTokens ( const CopyParams & copy_params)
static private

Definition at line 4146 of file Importer.cpp.

References logger::INFO, LOG, Importer_NS::CopyParams::s3_access_key, Importer_NS::CopyParams::s3_endpoint, Importer_NS::CopyParams::s3_region, and Importer_NS::CopyParams::s3_secret_key.

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4146  {
4147  // for now we only support S3
4148  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4149  // only set if non-empty to allow GDAL defaults to persist
4150  // explicitly clear if empty to revert to default and not reuse a previous session's
4151  // keys
4152  if (copy_params.s3_region.size()) {
4153 #if DEBUG_AWS_AUTHENTICATION
4154  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4155 #endif
4156  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4157  } else {
4158 #if DEBUG_AWS_AUTHENTICATION
4159  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4160 #endif
4161  CPLSetConfigOption("AWS_REGION", nullptr);
4162  }
4163  if (copy_params.s3_endpoint.size()) {
4164 #if DEBUG_AWS_AUTHENTICATION
4165  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4166 #endif
4167  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4168  } else {
4169 #if DEBUG_AWS_AUTHENTICATION
4170  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4171 #endif
4172  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4173  }
4174  if (copy_params.s3_access_key.size()) {
4175 #if DEBUG_AWS_AUTHENTICATION
4176  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4177  << "'";
4178 #endif
4179  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4180  } else {
4181 #if DEBUG_AWS_AUTHENTICATION
4182  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4183 #endif
4184  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4185  }
4186  if (copy_params.s3_secret_key.size()) {
4187 #if DEBUG_AWS_AUTHENTICATION
4188  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4189  << "'";
4190 #endif
4191  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4192  } else {
4193 #if DEBUG_AWS_AUTHENTICATION
4194  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4195 #endif
4196  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4197  }
4198 
4199 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4200  // if we haven't set keys, we need to disable signed access
4201  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4202 #if DEBUG_AWS_AUTHENTICATION
4203  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4204 #endif
4205  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4206  } else {
4207 #if DEBUG_AWS_AUTHENTICATION
4208  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4209 #endif
4210  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4211  }
4212 #endif
4213 }
#define LOG(tag)
Definition: Logger.h:188
std::string s3_access_key
Definition: CopyParams.h:62
std::string s3_endpoint
Definition: CopyParams.h:65
std::string s3_region
Definition: CopyParams.h:64
std::string s3_secret_key
Definition: CopyParams.h:63

+ Here is the caller graph for this function:

Member Data Documentation

char* Importer_NS::Importer::buffer[2]
private

Definition at line 806 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t Importer_NS::Importer::file_size
private

Definition at line 804 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > Importer_NS::Importer::import_buffers_vec
private
std::string Importer_NS::Importer::import_id
private

Definition at line 803 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDAL(), and set_import_status().

std::mutex Importer_NS::Importer::init_gdal_mutex
static private

Definition at line 810 of file Importer.h.

std::unique_ptr<bool[]> Importer_NS::Importer::is_array_a
private

Definition at line 809 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> Importer_NS::Importer::loader
private
size_t Importer_NS::Importer::max_threads
private

Definition at line 805 of file Importer.h.

Referenced by importDelimited(), Importer(), and importGDAL().


The documentation for this class was generated from the following files: