OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
import_export::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Importer:
+ Collaboration diagram for import_export::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static OGRDataSource * openGDALDataset (const std::string &fileName, const CopyParams &copy_params)
 
static void setGDALAuthorizationTokens (const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
mapd_shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 761 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 812 of file Importer.h.

812 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
const std::string & f,
const CopyParams & p 
)

Definition at line 171 of file Importer.cpp.

175  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:171
char * f
import_export::Importer::Importer ( Loader * providedLoader,
const std::string & f,
const CopyParams & p 
)

Definition at line 177 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, i, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

178  : DataStreamSink(p, f), loader(providedLoader) {
179  import_id = boost::filesystem::path(file_path).filename().string();
180  file_size = 0;
181  max_threads = 0;
182  p_file = nullptr;
183  buffer[0] = nullptr;
184  buffer[1] = nullptr;
185  // we may be overallocating a little more memory here due to dropping phy cols.
186  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
187  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
188  int i = 0;
189  bool has_array = false;
190  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
191  int skip_physical_cols = 0;
192  for (auto& p : loader->get_column_descs()) {
193  // phy geo columns can't be in input file
194  if (skip_physical_cols-- > 0) {
195  continue;
196  }
197  // neither are rowid or $deleted$
198  // note: columns can be added after rowid/$deleted$
199  if (p->isVirtualCol || p->isDeletedCol) {
200  continue;
201  }
202  skip_physical_cols = p->columnType.get_physical_cols();
203  if (p->columnType.get_type() == kARRAY) {
204  is_array.get()[i] = true;
205  has_array = true;
206  } else {
207  is_array.get()[i] = false;
208  }
209  ++i;
210  }
211  if (has_array) {
212  is_array_a = std::unique_ptr<bool[]>(is_array.release());
213  } else {
214  is_array_a = std::unique_ptr<bool[]>(nullptr);
215  }
216 }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:859
std::string import_id
Definition: Importer.h:853
char * f
std::unique_ptr< Loader > loader
Definition: Importer.h:858
const std::string file_path
Definition: Importer.h:675
import_export::Importer::~Importer ( )
override

Definition at line 218 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

218  {
219  if (p_file != nullptr) {
220  fclose(p_file);
221  }
222  if (buffer[0] != nullptr) {
223  free(buffer[0]);
224  }
225  if (buffer[1] != nullptr) {
226  free(buffer[1]);
227  }
228 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3379 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), and importGDAL().

3380  {
3381  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3382  mapd_lock_guard<mapd_shared_mutex> read_lock(import_mutex_);
3384  // rollback to starting epoch - undo all the added records
3385  loader->setTableEpochs(table_epochs);
3386  } else {
3387  loader->checkpoint();
3388  }
3389  }
3390 
3391  if (loader->getTableDesc()->persistenceLevel ==
3392  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3393  // tables
3394  auto ms = measure<>::execution([&]() {
3395  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3396  if (!import_status_.load_failed) {
3397  for (auto& p : import_buffers_vec[0]) {
3398  if (!p->stringDictCheckpoint()) {
3399  LOG(ERROR) << "Checkpointing Dictionary for Column "
3400  << p->getColumnDesc()->columnName << " failed.";
3401  import_status_.load_failed = true;
3402  import_status_.load_msg = "Dictionary checkpoint failed";
3403  break;
3404  }
3405  }
3406  }
3407  });
3408  if (DEBUG_TIMING) {
3409  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3410  << std::endl;
3411  }
3412  }
3413 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:194
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
#define DEBUG_TIMING
Definition: Importer.cpp:160
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
static constexpr char const * FOREIGN_TABLE
std::unique_ptr< Loader > loader
Definition: Importer.h:858

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams & copy_params 
)
static

Definition at line 4737 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

4737  {
4738  return gdalStatInternal(path, copy_params, false);
4739 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4704

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams & copy_params 
)
static

Definition at line 4742 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

4743  {
4744  return gdalStatInternal(path, copy_params, true);
4745 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4704

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams & copy_params 
)
static

Definition at line 4814 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), and setGDALAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

4816  {
4817  // lazy init GDAL
4818  Geospatial::GDAL::init();
4819 
4820  // set authorization tokens
4821  setGDALAuthorizationTokens(copy_params);
4822 
4823  // prepare to gather files
4824  std::vector<std::string> files;
4825 
4826  // gather the files recursively
4827  gdalGatherFilesInArchiveRecursive(archive_path, files);
4828 
4829  // convert to relative paths inside archive
4830  for (auto& file : files) {
4831  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4832  }
4833 
4834  // done
4835  return files;
4836 }
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4354
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:4747

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams & copy_params 
)
static

Definition at line 4839 of file Importer.cpp.

References CHECK, EMPTY, GEO, import_export::CopyParams::geo_explode_collections, Geospatial::GDAL::init(), NON_GEO, openGDALDataset(), setGDALAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

4841  {
4842  // lazy init GDAL
4843  Geospatial::GDAL::init();
4844 
4845  // set authorization tokens
4846  setGDALAuthorizationTokens(copy_params);
4847 
4848  // prepare to gather layer info
4849  std::vector<GeoFileLayerInfo> layer_info;
4850 
4851  // open the data set
4853  if (poDS == nullptr) {
4854  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4855  file_name);
4856  }
4857 
4858  // enumerate the layers
4859  for (auto&& poLayer : poDS->GetLayers()) {
4861  // prepare to read this layer
4862  poLayer->ResetReading();
4863  // skip layer if empty
4864  if (poLayer->GetFeatureCount() > 0) {
4865  // get first feature
4866  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4867  CHECK(first_feature);
4868  // check feature for geometry
4869  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4870  if (!geometry) {
4871  // layer has no geometry
4872  contents = GeoFileLayerContents::NON_GEO;
4873  } else {
4874  // check the geometry type
4875  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4876  switch (wkbFlatten(geometry_type)) {
4877  case wkbPoint:
4878  case wkbLineString:
4879  case wkbPolygon:
4880  case wkbMultiPolygon:
4881  // layer has supported geo
4882  contents = GeoFileLayerContents::GEO;
4883  break;
4884  case wkbMultiPoint:
4885  case wkbMultiLineString:
4886  // supported if geo_explode_collections is specified
4890  break;
4891  default:
4892  // layer has unsupported geometry
4894  break;
4895  }
4896  }
4897  }
4898  // store info for this layer
4899  layer_info.emplace_back(poLayer->GetName(), contents);
4900  }
4901 
4902  // done
4903  return layer_info;
4904 }
static void init()
Definition: GDAL.cpp:59
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:115
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:124
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4354
#define CHECK(condition)
Definition: Logger.h:203
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4424

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalStatInternal ( const std::string & path,
const CopyParams & copy_params,
bool also_dir 
)
static private

Definition at line 4704 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, and setGDALAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

4706  {
4707  // lazy init GDAL
4708  Geospatial::GDAL::init();
4709 
4710  // set authorization tokens
4711  setGDALAuthorizationTokens(copy_params);
4712 
4713 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4714  // clear GDAL stat cache
4715  // without this, file existence will be cached, even if authentication changes
4716  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4717  VSICurlClearCache();
4718 #endif
4719 
4720  // stat path
4721  VSIStatBufL sb;
4722  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4723  if (result < 0) {
4724  return false;
4725  }
4726 
4727  // exists?
4728  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4729  return true;
4730  } else if (VSI_ISREG(sb.st_mode)) {
4731  return true;
4732  }
4733  return false;
4734 }
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4354

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams & copy_params 
)
static

Definition at line 4614 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, import_export::ogr_to_type(), openGDALDataset(), import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by DBHandler::detect_column_types().

4617  {
4618  std::list<ColumnDescriptor> cds;
4619 
4621  if (poDS == nullptr) {
4622  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4623  file_name);
4624  }
4625 
4626  OGRLayer& layer =
4628 
4629  layer.ResetReading();
4630  // TODO(andrewseidl): support multiple features
4631  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4632  if (poFeature == nullptr) {
4633  throw std::runtime_error("No features found in " + file_name);
4634  }
4635  // get fields as regular columns
4636  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4637  CHECK(poFDefn);
4638  int iField;
4639  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4640  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4641  auto typePair = ogr_to_type(poFieldDefn->GetType());
4642  ColumnDescriptor cd;
4643  cd.columnName = poFieldDefn->GetNameRef();
4644  cd.sourceName = poFieldDefn->GetNameRef();
4645  SQLTypeInfo ti;
4646  if (typePair.second) {
4647  ti.set_type(kARRAY);
4648  ti.set_subtype(typePair.first);
4649  } else {
4650  ti.set_type(typePair.first);
4651  }
4652  if (typePair.first == kTEXT) {
4654  ti.set_comp_param(32);
4655  }
4656  ti.set_fixed_size();
4657  cd.columnType = ti;
4658  cds.push_back(cd);
4659  }
4660  // get geo column, if any
4661  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4662  if (poGeometry) {
4663  ColumnDescriptor cd;
4664  cd.columnName = geo_column_name;
4665  cd.sourceName = geo_column_name;
4666 
4667  // get GDAL type
4668  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4669 
4670  // if exploding, override any collection type to child type
4672  if (ogr_type == wkbMultiPolygon) {
4673  ogr_type = wkbPolygon;
4674  } else if (ogr_type == wkbMultiLineString) {
4675  ogr_type = wkbLineString;
4676  } else if (ogr_type == wkbMultiPoint) {
4677  ogr_type = wkbPoint;
4678  }
4679  }
4680 
4681  // convert to internal type
4682  SQLTypes geoType = ogr_to_type(ogr_type);
4683 
4684  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4686  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4687  }
4688 
4689  // build full internal type
4690  SQLTypeInfo ti;
4691  ti.set_type(geoType);
4697  cd.columnType = ti;
4698 
4699  cds.push_back(cd);
4700  }
4701  return cds;
4702 }
void set_compression(EncodingType c)
Definition: sqltypes.h:414
SQLTypes
Definition: sqltypes.h:37
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4457
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:405
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:115
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:124
std::string sourceName
void set_input_srid(int d)
Definition: sqltypes.h:408
void set_fixed_size()
Definition: sqltypes.h:413
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:166
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:410
void set_comp_param(int p)
Definition: sqltypes.h:415
std::string geo_layer_name
Definition: CopyParams.h:79
Definition: sqltypes.h:51
#define CHECK(condition)
Definition: Logger.h:203
SQLTypeInfo columnType
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4424
std::string columnName
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4559
EncodingType geo_coords_encoding
Definition: CopyParams.h:74
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:404

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 777 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

777  {
778  return loader->get_column_descs();
779  }
std::unique_ptr< Loader > loader
Definition: Importer.h:858

+ Here is the caller graph for this function:

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 776 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

776 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 786 of file Importer.h.

References i, and import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

786  {
787  return import_buffers_vec[i];
788  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 783 of file Importer.h.

References import_buffers_vec.

783  {
784  return import_buffers_vec;
785  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 230 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

230  {
231  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
232  return import_status_map.at(import_id);
233 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:169
std::string import_id
Definition: Importer.h:853
mapd_shared_lock< mapd_shared_mutex > read_lock
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:168

+ Here is the caller graph for this function:

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 789 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

789 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:859

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 822 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

822 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:858

+ Here is the caller graph for this function:

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 844 of file Importer.h.

References loader.

844 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:858
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo * session_info)

Definition at line 4110 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4110  {
4111  return DataStreamSink::archivePlumber(session_info);
4112 }
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3415

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importDelimited ( const std::string & file_path,
const bool decompressed,
const Catalog_Namespace::SessionInfo * session_info 
)
override virtual

Implements import_export::DataStreamSink.

Definition at line 4114 of file Importer.cpp.

References import_export::CopyParams::buffer_size, cat(), CHECK, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::DataStreamSink::copy_params, logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), omnisci::fopen(), g_max_import_threads, import_export::CopyParams::geo_assign_render_groups, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), i, import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, import_export::CopyParams::max_reject, max_threads, import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::CopyParams::threads, import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4117  {
4119  auto query_session = session_info ? session_info->get_session_id() : "";
4120 
4121  if (!p_file) {
4122  p_file = fopen(file_path.c_str(), "rb");
4123  }
4124  if (!p_file) {
4125  throw std::runtime_error("failed to open file '" + file_path +
4126  "': " + strerror(errno));
4127  }
4128 
4129  if (!decompressed) {
4130  (void)fseek(p_file, 0, SEEK_END);
4131  file_size = ftell(p_file);
4132  }
4133 
4134  if (copy_params.threads == 0) {
4135  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4137  } else {
4138  max_threads = static_cast<size_t>(copy_params.threads);
4139  }
4140  VLOG(1) << "Delimited import # threads: " << max_threads;
4141 
4142  // deal with small files
4143  size_t alloc_size = copy_params.buffer_size;
4144  if (!decompressed && file_size < alloc_size) {
4145  alloc_size = file_size;
4146  }
4147 
4148  for (size_t i = 0; i < max_threads; i++) {
4149  import_buffers_vec.emplace_back();
4150  for (const auto cd : loader->get_column_descs()) {
4151  import_buffers_vec[i].emplace_back(
4152  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4153  }
4154  }
4155 
4156  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4157  size_t current_pos = 0;
4158  size_t end_pos;
4159  size_t begin_pos = 0;
4160 
4161  (void)fseek(p_file, current_pos, SEEK_SET);
4162  size_t size =
4163  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4164 
4165  // make render group analyzers for each poly column
4166  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4168  auto& cat = loader->getCatalog();
4169  auto* td = loader->getTableDesc();
4170  CHECK(td);
4171  auto column_descriptors =
4172  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4173  for (auto const& cd : column_descriptors) {
4174  if (IS_GEO_POLY(cd->columnType.get_type())) {
4175  auto rga = std::make_shared<RenderGroupAnalyzer>();
4176  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4177  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4178  }
4179  }
4180  }
4181 
4182  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4183  loader->getTableDesc()->tableId};
4184  auto table_epochs = loader->getTableEpochs();
4186  {
4187  std::list<std::future<ImportStatus>> threads;
4188 
4189  // use a stack to track thread_ids which must not overlap among threads
4190  // because thread_id is used to index import_buffers_vec[]
4191  std::stack<size_t> stack_thread_ids;
4192  for (size_t i = 0; i < max_threads; i++) {
4193  stack_thread_ids.push(i);
4194  }
4195  // added for true row index on error
4196  size_t first_row_index_this_buffer = 0;
4197 
4198  while (size > 0) {
4199  unsigned int num_rows_this_buffer = 0;
4200  CHECK(scratch_buffer);
4201  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4202  scratch_buffer,
4203  size,
4204  copy_params,
4205  first_row_index_this_buffer,
4206  num_rows_this_buffer,
4207  p_file);
4208 
4209  // unput residual
4210  int nresidual = size - end_pos;
4211  std::unique_ptr<char[]> unbuf;
4212  if (nresidual > 0) {
4213  unbuf = std::make_unique<char[]>(nresidual);
4214  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4215  }
4216 
4217  // get a thread_id not in use
4218  auto thread_id = stack_thread_ids.top();
4219  stack_thread_ids.pop();
4220  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4221 
4222  threads.push_back(std::async(std::launch::async,
4224  thread_id,
4225  this,
4226  std::move(scratch_buffer),
4227  begin_pos,
4228  end_pos,
4229  end_pos,
4230  columnIdToRenderGroupAnalyzerMap,
4231  first_row_index_this_buffer,
4232  session_info,
4233  executor));
4234 
4235  first_row_index_this_buffer += num_rows_this_buffer;
4236 
4237  current_pos += end_pos;
4238  scratch_buffer = std::make_unique<char[]>(alloc_size);
4239  CHECK(scratch_buffer);
4240  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4241  size = nresidual +
4242  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4243 
4244  begin_pos = 0;
4245  while (threads.size() > 0) {
4246  int nready = 0;
4247  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4248  it != threads.end();) {
4249  auto& p = *it;
4250  std::chrono::milliseconds span(0);
4251  if (p.wait_for(span) == std::future_status::ready) {
4252  auto ret_import_status = p.get();
4253  {
4254  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4255  import_status_ += ret_import_status;
4256  if (ret_import_status.load_failed) {
4258  }
4259  }
4260  // sum up current total file offsets
4261  size_t total_file_offset{0};
4262  if (decompressed) {
4263  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4264  for (const auto file_offset : file_offsets) {
4265  total_file_offset += file_offset;
4266  }
4267  }
4268  // estimate number of rows per current total file offset
4269  if (decompressed ? total_file_offset : current_pos) {
4271  (decompressed ? (float)total_file_size / total_file_offset
4272  : (float)file_size / current_pos) *
4273  import_status_.rows_completed;
4274  }
4275  VLOG(3) << "rows_completed " << import_status_.rows_completed
4276  << ", rows_estimated " << import_status_.rows_estimated
4277  << ", total_file_size " << total_file_size << ", total_file_offset "
4278  << total_file_offset;
4280  // recall thread_id for reuse
4281  stack_thread_ids.push(ret_import_status.thread_id);
4282  threads.erase(it++);
4283  ++nready;
4284  } else {
4285  ++it;
4286  }
4287  }
4288 
4289  if (nready == 0) {
4290  std::this_thread::yield();
4291  }
4292 
4293  // on eof, wait all threads to finish
4294  if (0 == size) {
4295  continue;
4296  }
4297 
4298  // keep reading if any free thread slot
4299  // this is one of the major difference from old threading model !!
4300  if (threads.size() < max_threads) {
4301  break;
4302  }
4303  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4305  break;
4306  }
4307  }
4308  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
4310  import_status_.load_failed = true;
4311  // todo use better message
4312  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4313  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4314  break;
4315  }
4317  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4318  break;
4319  }
4320  }
4321 
4322  // join dangling threads in case of LOG(ERROR) above
4323  for (auto& p : threads) {
4324  p.wait();
4325  }
4326  }
4327 
4328  checkpoint(table_epochs);
4329 
4330  fclose(p_file);
4331  p_file = nullptr;
4332  return import_status_;
4333 }
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:1826
::FILE * fopen(const char *filename, const char *mode)
Definition: omnisci_fs.cpp:72
#define LOG(tag)
Definition: Logger.h:194
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:163
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857
std::string get_session_id() const
Definition: SessionInfo.h:77
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
std::string import_id
Definition: Importer.h:853
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3379
ThreadId thread_id()
Definition: Logger.cpp:732
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:203
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:157
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< size_t > file_offsets
Definition: Importer.h:680
size_t g_max_import_threads
Definition: Importer.cpp:84
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
#define VLOG(n)
Definition: Logger.h:297
std::unique_ptr< Loader > loader
Definition: Importer.h:858
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:366
const std::string file_path
Definition: Importer.h:675
#define IS_GEO_POLY(T)
Definition: sqltypes.h:249

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importGDAL ( std::map< std::string, std::string >  colname_to_src,
const Catalog_Namespace::SessionInfo * session_info 
)

Definition at line 4906 of file Importer.cpp.

References cat(), CHECK, CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::DataStreamSink::copy_params, logger::ERROR, g_enable_non_kernel_time_query_interrupt, g_max_import_threads, import_export::CopyParams::geo_assign_render_groups, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_layer_name, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), i, import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, import_export::CopyParams::max_reject, max_threads, openGDALDataset(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::CopyParams::threads, toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by QueryRunner::ImportDriver::importGeoTable().

4907  {
4908  // initial status
4911  if (poDS == nullptr) {
4912  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4913  file_path);
4914  }
4915 
4916  OGRLayer& layer =
4918 
4919  // get the number of features in this layer
4920  size_t numFeatures = layer.GetFeatureCount();
4921 
4922  // build map of metadata field (additional columns) name to index
4923  // use shared_ptr since we need to pass it to the worker
4924  FieldNameToIndexMapType fieldNameToIndexMap;
4925  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4926  CHECK(poFDefn);
4927  size_t numFields = poFDefn->GetFieldCount();
4928  for (size_t iField = 0; iField < numFields; iField++) {
4929  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4930  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4931  }
4932 
4933  // the geographic spatial reference we want to put everything in
4934  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4935  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4936 
4937 #if GDAL_VERSION_MAJOR >= 3
4938  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4939  // this results in X and Y being transposed for angle-based
4940  // coordinate systems. This restores the previous behavior.
4941  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4942 #endif
4943 
4944 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4945  // just one "thread"
4946  max_threads = 1;
4947 #else
4948  // how many threads to use
4949  if (copy_params.threads == 0) {
4950  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4952  } else {
4954  }
4955 #endif
4956 
4957  VLOG(1) << "GDAL import # threads: " << max_threads;
4958 
4959  // import geo table is specifically handled in both DBHandler and QueryRunner
4960  // that is separate path against a normal SQL execution
4961  // so we here explicitly enroll the import session to allow interruption
4962  // while importing geo table
4963  auto query_session = session_info ? session_info->get_session_id() : "";
4964  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
4966  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
4967  executor->enrollQuerySession(query_session,
4968  "Import Geo Table",
4969  query_submitted_time,
4971  QuerySessionStatus::QueryStatus::RUNNING);
4972  }
4973 
4974  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
4975  // reset the runtime query interrupt status
4976  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
4977  executor->clearQuerySessionStatus(query_session, query_submitted_time, false);
4978  }
4979  };
4980 
4981  // make an import buffer for each thread
4982  CHECK_EQ(import_buffers_vec.size(), 0u);
4984  for (size_t i = 0; i < max_threads; i++) {
4985  for (const auto cd : loader->get_column_descs()) {
4986  import_buffers_vec[i].emplace_back(
4987  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4988  }
4989  }
4990 
4991  // make render group analyzers for each poly column
4992  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4994  auto& cat = loader->getCatalog();
4995  auto* td = loader->getTableDesc();
4996  CHECK(td);
4997  auto column_descriptors =
4998  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4999  for (auto const& cd : column_descriptors) {
5000  if (IS_GEO_POLY(cd->columnType.get_type())) {
5001  auto rga = std::make_shared<RenderGroupAnalyzer>();
5002  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5003  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5004  }
5005  }
5006  }
5007 
5008 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5009  // threads
5010  std::list<std::future<ImportStatus>> threads;
5011 
5012  // use a stack to track thread_ids which must not overlap among threads
5013  // because thread_id is used to index import_buffers_vec[]
5014  std::stack<size_t> stack_thread_ids;
5015  for (size_t i = 0; i < max_threads; i++) {
5016  stack_thread_ids.push(i);
5017  }
5018 #endif
5019 
5020  // checkpoint the table
5021  auto table_epochs = loader->getTableEpochs();
5022 
5023  // reset the layer
5024  layer.ResetReading();
5025 
5026  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5027 
5028  // make a features buffer for each thread
5029  std::vector<FeaturePtrVector> features(max_threads);
5030 
5031  // for each feature...
5032  size_t firstFeatureThisChunk = 0;
5033  while (firstFeatureThisChunk < numFeatures) {
5034  // how many features this chunk
5035  size_t numFeaturesThisChunk =
5036  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5037 
5038 // get a thread_id not in use
5039 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5040  size_t thread_id = 0;
5041 #else
5042  auto thread_id = stack_thread_ids.top();
5043  stack_thread_ids.pop();
5044  CHECK(thread_id < max_threads);
5045 #endif
5046 
5047  // fill features buffer for new thread
5048  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5049  features[thread_id].emplace_back(layer.GetNextFeature());
5050  }
5051 
5052 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5053  // call worker function directly
5054  auto ret_import_status = import_thread_shapefile(0,
5055  this,
5056  poGeographicSR.get(),
5057  std::move(features[thread_id]),
5058  firstFeatureThisChunk,
5059  numFeaturesThisChunk,
5060  fieldNameToIndexMap,
5061  columnNameToSourceNameMap,
5062  columnIdToRenderGroupAnalyzerMap,
5063  session_info,
5064  executor.get());
5065  import_status += ret_import_status;
5066  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5067  import_status.rows_completed;
5068  set_import_status(import_id, import_status);
5069 #else
5070  // fire up that thread to import this geometry
5071  threads.push_back(std::async(std::launch::async,
5073  thread_id,
5074  this,
5075  poGeographicSR.get(),
5076  std::move(features[thread_id]),
5077  firstFeatureThisChunk,
5078  numFeaturesThisChunk,
5079  fieldNameToIndexMap,
5080  columnNameToSourceNameMap,
5081  columnIdToRenderGroupAnalyzerMap,
5082  session_info,
5083  executor.get()));
5084 
5085  // let the threads run
5086  while (threads.size() > 0) {
5087  int nready = 0;
5088  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5089  it != threads.end();) {
5090  auto& p = *it;
5091  std::chrono::milliseconds span(
5092  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5093  if (p.wait_for(span) == std::future_status::ready) {
5094  auto ret_import_status = p.get();
5095  {
5096  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
5097  import_status_ += ret_import_status;
5099  ((float)firstFeatureThisChunk / (float)numFeatures) *
5103  break;
5104  }
5105  }
5106  // recall thread_id for reuse
5107  stack_thread_ids.push(ret_import_status.thread_id);
5108 
5109  threads.erase(it++);
5110  ++nready;
5111  } else {
5112  ++it;
5113  }
5114  }
5115 
5116  if (nready == 0) {
5117  std::this_thread::yield();
5118  }
5119 
5120  // keep reading if any free thread slot
5121  // this is one of the major difference from old threading model !!
5122  if (threads.size() < max_threads) {
5123  break;
5124  }
5125  }
5126 #endif
5127 
5128  // out of rows?
5129 
5130  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
5132  import_status_.load_failed = true;
5133  // todo use better message
5134  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5135  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5136  break;
5137  }
5139  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5140  "more details";
5141  break;
5142  }
5143 
5144  firstFeatureThisChunk += numFeaturesThisChunk;
5145  }
5146 
5147 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5148  // wait for any remaining threads
5149  if (threads.size()) {
5150  for (auto& p : threads) {
5151  // wait for the thread
5152  p.wait();
5153  // get the result and update the final import status
5154  auto ret_import_status = p.get();
5155  import_status_ += ret_import_status;
5158  }
5159  }
5160 #endif
5161 
5162  checkpoint(table_epochs);
5163 
5164  return import_status_;
5165 }
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:154
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::string cat(Ts &&...args)
std::string toString(const ExtArgumentType &sig_type)
std::unique_ptr< OGRSpatialReference, OGRSpatialReferenceDeleter > OGRSpatialReferenceUqPtr
Definition: Importer.cpp:134
#define LOG(tag)
Definition: Logger.h:194
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4457
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:115
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:115
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:163
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:857
std::string get_session_id() const
Definition: SessionInfo.h:77
std::string geo_layer_name
Definition: CopyParams.h:79
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:235
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:2192
std::string import_id
Definition: Importer.h:853
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3379
ThreadId thread_id()
Definition: Logger.cpp:732
#define CHECK(condition)
Definition: Logger.h:203
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:157
mapd_unique_lock< mapd_shared_mutex > write_lock
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4424
size_t g_max_import_threads
Definition: Importer.cpp:84
#define VLOG(n)
Definition: Logger.h:297
std::unique_ptr< Loader > loader
Definition: Importer.h:858
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:366
const std::string file_path
Definition: Importer.h:675
#define IS_GEO_POLY(T)
Definition: sqltypes.h:249

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo * session_info 
)

Definition at line 3369 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

3371  {
3372  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3373  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3374  import_status_.load_failed = true;
3375  import_status_.load_msg = loader->getErrorMessage();
3376  }
3377 }
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
mapd_unique_lock< mapd_shared_mutex > write_lock
std::unique_ptr< Loader > loader
Definition: Importer.h:858

+ Here is the caller graph for this function:

OGRDataSource * import_export::Importer::openGDALDataset ( const std::string &  fileName,
const CopyParams copy_params 
)
staticprivate

Definition at line 4424 of file Importer.cpp.

References logger::ERROR, logger::INFO, Geospatial::GDAL::init(), LOG, and setGDALAuthorizationTokens().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptors(), importGDAL(), and readMetadataSampleGDAL().

4425  {
4426  // lazy init GDAL
4428 
4429  // set authorization tokens
4431 
4432  // open the file
4433  OGRDataSource* poDS;
4434 #if GDAL_VERSION_MAJOR == 1
4435  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4436 #else
4437  poDS = (OGRDataSource*)GDALOpenEx(
4438  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4439  if (poDS == nullptr) {
4440  poDS = (OGRDataSource*)GDALOpenEx(
4441  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4442  if (poDS) {
4443  LOG(INFO) << "openGDALDataset had to open as read-only";
4444  }
4445  }
4446 #endif
4447  if (poDS == nullptr) {
4448  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4449  }
4450  // NOTE(adb): If extending this function, refactor to ensure any errors will not
4451  // result in a memory leak if GDAL successfully opened the input dataset.
4452  return poDS;
4453 }
#define LOG(tag)
Definition: Logger.h:194
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4354

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams & copy_params 
)
static

Definition at line 4480 of file Importer.cpp.

References CHECK, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), i, and openGDALDataset().

Referenced by DBHandler::detect_column_types().

4485  {
4487  if (poDS == nullptr) {
4488  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4489  file_name);
4490  }
4491 
4492  OGRLayer& layer =
4494 
4495  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4496  CHECK(poFDefn);
4497 
4498  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4499  auto nFeats = layer.GetFeatureCount();
4500  size_t numFeatures =
4501  std::max(static_cast<decltype(nFeats)>(0),
4502  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4503  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4504  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4505  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4506  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4507  }
4508  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4509  layer.ResetReading();
4510  size_t iFeature = 0;
4511  while (iFeature < numFeatures) {
4512  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4513  if (!poFeature) {
4514  break;
4515  }
4516 
4517  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4518  if (poGeometry != nullptr) {
4519  // validate geom type (again?)
4520  switch (wkbFlatten(poGeometry->getGeometryType())) {
4521  case wkbPoint:
4522  case wkbLineString:
4523  case wkbPolygon:
4524  case wkbMultiPolygon:
4525  break;
4526  case wkbMultiPoint:
4527  case wkbMultiLineString:
4528  // supported if geo_explode_collections is specified
4530  throw std::runtime_error("Unsupported geometry type: " +
4531  std::string(poGeometry->getGeometryName()));
4532  }
4533  break;
4534  default:
4535  throw std::runtime_error("Unsupported geometry type: " +
4536  std::string(poGeometry->getGeometryName()));
4537  }
4538 
4539  // populate metadata for regular fields
4540  for (auto i : metadata) {
4541  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4542  if (iField >= 0) { // geom is -1
4543  metadata[i.first].at(iFeature) =
4544  std::string(poFeature->GetFieldAsString(iField));
4545  }
4546  }
4547 
4548  // populate metadata for geo column with WKT string
4549  char* wkts = nullptr;
4550  poGeometry->exportToWkt(&wkts);
4551  CHECK(wkts);
4552  metadata[geo_column_name].at(iFeature) = wkts;
4553  CPLFree(wkts);
4554  }
4555  iFeature++;
4556  }
4557 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4457
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:115
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:124
std::string geo_layer_name
Definition: CopyParams.h:79
#define CHECK(condition)
Definition: Logger.h:203
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4424

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog & catalog,
const ColumnDescriptor * cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group 
)
static

Definition at line 1459 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), DBHandler::load_table(), and foreign_storage::csv_file_buffer_parser::process_geo_column().

1468  {
1469  const auto col_ti = cd->columnType;
1470  const auto col_type = col_ti.get_type();
1471  auto columnId = cd->columnId;
1472  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1473  bool is_null_geo = false;
1474  bool is_null_point = false;
1475  if (!col_ti.get_notnull()) {
1476  // Check for NULL geo
1477  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1478  is_null_point = true;
1479  coords.clear();
1480  }
1481  is_null_geo = coords.empty();
1482  if (is_null_point) {
1483  coords.push_back(NULL_ARRAY_DOUBLE);
1484  coords.push_back(NULL_DOUBLE);
1485  // Treating POINT coords as notnull, need to store actual encoding
1486  // [un]compressed+[not]null
1487  is_null_geo = false;
1488  }
1489  }
1490  TDatum tdd_coords;
1491  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1492  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1493  // in a fixlen array, compressed and uncompressed.
1494  if (!is_null_geo) {
1495  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1496  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1497  for (auto cc : compressed_coords) {
1498  tdd_coords.val.arr_val.emplace_back();
1499  tdd_coords.val.arr_val.back().val.int_val = cc;
1500  }
1501  }
1502  tdd_coords.is_null = is_null_geo;
1503  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1504 
1505  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1506  // Create ring_sizes array value and add it to the physical column
1507  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1508  TDatum tdd_ring_sizes;
1509  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1510  if (!is_null_geo) {
1511  for (auto ring_size : ring_sizes) {
1512  tdd_ring_sizes.val.arr_val.emplace_back();
1513  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1514  }
1515  }
1516  tdd_ring_sizes.is_null = is_null_geo;
1517  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1518  }
1519 
1520  if (col_type == kMULTIPOLYGON) {
1521  // Create poly_rings array value and add it to the physical column
1522  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1523  TDatum tdd_poly_rings;
1524  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1525  if (!is_null_geo) {
1526  for (auto num_rings : poly_rings) {
1527  tdd_poly_rings.val.arr_val.emplace_back();
1528  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1529  }
1530  }
1531  tdd_poly_rings.is_null = is_null_geo;
1532  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1533  }
1534 
1535  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1536  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1537  TDatum tdd_bounds;
1538  tdd_bounds.val.arr_val.reserve(bounds.size());
1539  if (!is_null_geo) {
1540  for (auto b : bounds) {
1541  tdd_bounds.val.arr_val.emplace_back();
1542  tdd_bounds.val.arr_val.back().val.real_val = b;
1543  }
1544  }
1545  tdd_bounds.is_null = is_null_geo;
1546  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1547  }
1548 
1549  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1550  // Create render_group value and add it to the physical column
1551  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1552  TDatum td_render_group;
1553  td_render_group.val.int_val = render_group;
1554  td_render_group.is_null = is_null_geo;
1555  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1556  }
1557 }
#define NULL_DOUBLE
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog & catalog,
const ColumnDescriptor * cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
std::vector< int > &  render_groups_column 
)
static

Definition at line 1559 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::load_table_binary_columnar_internal().

1568  {
1569  const auto col_ti = cd->columnType;
1570  const auto col_type = col_ti.get_type();
1571  auto columnId = cd->columnId;
1572 
1573  auto coords_row_count = coords_column.size();
1574  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1575  for (auto& coords : coords_column) {
1576  bool is_null_geo = false;
1577  bool is_null_point = false;
1578  if (!col_ti.get_notnull()) {
1579  // Check for NULL geo
1580  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1581  is_null_point = true;
1582  coords.clear();
1583  }
1584  is_null_geo = coords.empty();
1585  if (is_null_point) {
1586  coords.push_back(NULL_ARRAY_DOUBLE);
1587  coords.push_back(NULL_DOUBLE);
1588  // Treating POINT coords as notnull, need to store actual encoding
1589  // [un]compressed+[not]null
1590  is_null_geo = false;
1591  }
1592  }
1593  std::vector<TDatum> td_coords_data;
1594  if (!is_null_geo) {
1595  std::vector<uint8_t> compressed_coords =
1596  Geospatial::compress_coords(coords, col_ti);
1597  for (auto const& cc : compressed_coords) {
1598  TDatum td_byte;
1599  td_byte.val.int_val = cc;
1600  td_coords_data.push_back(td_byte);
1601  }
1602  }
1603  TDatum tdd_coords;
1604  tdd_coords.val.arr_val = td_coords_data;
1605  tdd_coords.is_null = is_null_geo;
1606  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1607  }
1608  col_idx++;
1609 
1610  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1611  if (ring_sizes_column.size() != coords_row_count) {
1612  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1613  }
1614  // Create ring_sizes array value and add it to the physical column
1615  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1616  for (auto const& ring_sizes : ring_sizes_column) {
1617  bool is_null_geo = false;
1618  if (!col_ti.get_notnull()) {
1619  // Check for NULL geo
1620  is_null_geo = ring_sizes.empty();
1621  }
1622  std::vector<TDatum> td_ring_sizes;
1623  for (auto const& ring_size : ring_sizes) {
1624  TDatum td_ring_size;
1625  td_ring_size.val.int_val = ring_size;
1626  td_ring_sizes.push_back(td_ring_size);
1627  }
1628  TDatum tdd_ring_sizes;
1629  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1630  tdd_ring_sizes.is_null = is_null_geo;
1631  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1632  }
1633  col_idx++;
1634  }
1635 
1636  if (col_type == kMULTIPOLYGON) {
1637  if (poly_rings_column.size() != coords_row_count) {
1638  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1639  }
1640  // Create poly_rings array value and add it to the physical column
1641  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1642  for (auto const& poly_rings : poly_rings_column) {
1643  bool is_null_geo = false;
1644  if (!col_ti.get_notnull()) {
1645  // Check for NULL geo
1646  is_null_geo = poly_rings.empty();
1647  }
1648  std::vector<TDatum> td_poly_rings;
1649  for (auto const& num_rings : poly_rings) {
1650  TDatum td_num_rings;
1651  td_num_rings.val.int_val = num_rings;
1652  td_poly_rings.push_back(td_num_rings);
1653  }
1654  TDatum tdd_poly_rings;
1655  tdd_poly_rings.val.arr_val = td_poly_rings;
1656  tdd_poly_rings.is_null = is_null_geo;
1657  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1658  }
1659  col_idx++;
1660  }
1661 
1662  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1663  if (bounds_column.size() != coords_row_count) {
1664  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1665  }
1666  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1667  for (auto const& bounds : bounds_column) {
1668  bool is_null_geo = false;
1669  if (!col_ti.get_notnull()) {
1670  // Check for NULL geo
1671  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1672  }
1673  std::vector<TDatum> td_bounds_data;
1674  for (auto const& b : bounds) {
1675  TDatum td_double;
1676  td_double.val.real_val = b;
1677  td_bounds_data.push_back(td_double);
1678  }
1679  TDatum tdd_bounds;
1680  tdd_bounds.val.arr_val = td_bounds_data;
1681  tdd_bounds.is_null = is_null_geo;
1682  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1683  }
1684  col_idx++;
1685  }
1686 
1687  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1688  // Create render_group value and add it to the physical column
1689  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1690  for (auto const& render_group : render_groups_column) {
1691  TDatum td_render_group;
1692  td_render_group.val.int_val = render_group;
1693  td_render_group.is_null = false;
1694  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1695  }
1696  col_idx++;
1697  }
1698 }
#define NULL_DOUBLE
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:203
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 235 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), and importGDAL().

235  {
     // Finalizes the timing fields of the given ImportStatus: records the end time and
     // derives the elapsed duration from the status' own start time.
     // NOTE(review): the documented signature shows `const ImportStatus is`, yet the body
     // assigns to is.end and is.elapsed, so the definition presumably takes a non-const
     // by-value copy -- confirm against Importer.cpp.
     // Constructed from status_mutex; presumably a scoped lock that is held until the
     // function returns -- confirm the mapd_lock_guard alias.
236  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
     // End time is read from std::chrono::steady_clock.
237  is.end = std::chrono::steady_clock::now();
     // Elapsed time is (end - start), converted to milliseconds.
238  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
     // NOTE(review): this listing jumps from source line 238 to 240. The "References"
     // list for this function names import_id and import_status_map, so the omitted line
     // presumably stores `is` into import_status_map -- verify against Importer.cpp.
240 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:169
std::string import_id
Definition: Importer.h:853
mapd_unique_lock< mapd_shared_mutex > write_lock
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:168

+ Here is the caller graph for this function:

void import_export::Importer::setGDALAuthorizationTokens ( const CopyParams copy_params)
staticprivate

Definition at line 4354 of file Importer.cpp.

References logger::INFO, LOG, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, and import_export::CopyParams::s3_secret_key.

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4354  {
      // Transfers the S3 settings carried by copy_params into GDAL's configuration via
      // CPLSetConfigOption. Each of AWS_REGION, AWS_S3_ENDPOINT, AWS_ACCESS_KEY_ID and
      // AWS_SECRET_ACCESS_KEY is handled the same way: when the matching CopyParams
      // string is non-empty its value is set, otherwise nullptr is passed for that key.
      // The LOG(INFO) lines are compiled in only when DEBUG_AWS_AUTHENTICATION is true.
4355  // for now we only support S3
4356  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4357  // only set if non-empty to allow GDAL defaults to persist
4358  // explicitly clear if empty to revert to default and not reuse a previous session's
4359  // keys
4360  if (copy_params.s3_region.size()) {
4361 #if DEBUG_AWS_AUTHENTICATION
4362  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4363 #endif
4364  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4365  } else {
4366 #if DEBUG_AWS_AUTHENTICATION
4367  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4368 #endif
4369  CPLSetConfigOption("AWS_REGION", nullptr);
4370  }
4371  if (copy_params.s3_endpoint.size()) {
4372 #if DEBUG_AWS_AUTHENTICATION
4373  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4374 #endif
4375  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4376  } else {
4377 #if DEBUG_AWS_AUTHENTICATION
4378  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4379 #endif
4380  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4381  }
4382  if (copy_params.s3_access_key.size()) {
4383 #if DEBUG_AWS_AUTHENTICATION
4384  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4385  << "'";
4386 #endif
4387  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4388  } else {
4389 #if DEBUG_AWS_AUTHENTICATION
4390  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4391 #endif
4392  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4393  }
4394  if (copy_params.s3_secret_key.size()) {
4395 #if DEBUG_AWS_AUTHENTICATION
      // NOTE(review): this debug-only statement writes the S3 secret key to the log in
      // clear text; consider redacting it, and confirm DEBUG_AWS_AUTHENTICATION is never
      // enabled in shipped builds.
4396  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4397  << "'";
4398 #endif
4399  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4400  } else {
4401 #if DEBUG_AWS_AUTHENTICATION
4402  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4403 #endif
4404  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4405  }
4406 
      // The AWS_NO_SIGN_REQUEST handling below is compiled only for GDAL 2.3 or newer,
      // per the version check on the next line.
4407 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4408  // if we haven't set keys, we need to disable signed access
      // Supplying either key (access or secret) is enough to take the first branch, which
      // passes nullptr for AWS_NO_SIGN_REQUEST; only when both are empty is it set to "YES".
4409  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4410 #if DEBUG_AWS_AUTHENTICATION
4411  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4412 #endif
4413  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4414  } else {
4415 #if DEBUG_AWS_AUTHENTICATION
4416  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4417 #endif
4418  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4419  }
4420 #endif
4421 }
std::string s3_secret_key
Definition: CopyParams.h:63
#define LOG(tag)
Definition: Logger.h:194
std::string s3_access_key
Definition: CopyParams.h:62

+ Here is the caller graph for this function:

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 856 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 854 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private
std::mutex import_export::Importer::init_gdal_mutex
staticprivate

Definition at line 860 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 859 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private
size_t import_export::Importer::max_threads
private

Definition at line 855 of file Importer.h.

Referenced by importDelimited(), Importer(), and importGDAL().


The documentation for this class was generated from the following files: