OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Importer Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Importer:
+ Collaboration diagram for import_export::Importer:

Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
const CopyParams & get_copy_params () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > & 
get_import_buffers_vec ()
 
std::vector< std::unique_ptr
< TypedImportBuffer > > & 
get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list
< ColumnDescriptor > 
gdalToColumnDescriptors (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector
< GeoFileLayerInfo > 
gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, std::vector< int > &render_groups_column)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static OGRDataSource * openGDALDataset (const std::string &fileName, const CopyParams &copy_params)
 
static void setGDALAuthorizationTokens (const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector
< std::unique_ptr
< TypedImportBuffer > > > 
import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
mapd_shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 765 of file Importer.h.

Member Enumeration Documentation

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 816 of file Importer.h.

816 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
const std::string &  f,
const CopyParams & p 
)

Definition at line 170 of file Importer.cpp.

174  : Importer(new Loader(c, t), f, p) {}
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:170
char * f
import_export::Importer::Importer ( Loader * providedLoader,
const std::string &  f,
const CopyParams & p 
)

Definition at line 176 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, i, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

177  : DataStreamSink(p, f), loader(providedLoader) {
178  import_id = boost::filesystem::path(file_path).filename().string();
179  file_size = 0;
180  max_threads = 0;
181  p_file = nullptr;
182  buffer[0] = nullptr;
183  buffer[1] = nullptr;
184  // we may be overallocating a little more memory here due to dropping phy cols.
185  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
186  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
187  int i = 0;
188  bool has_array = false;
189  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
190  int skip_physical_cols = 0;
191  for (auto& p : loader->get_column_descs()) {
192  // phy geo columns can't be in input file
193  if (skip_physical_cols-- > 0) {
194  continue;
195  }
196  // neither are rowid or $deleted$
197  // note: columns can be added after rowid/$deleted$
198  if (p->isVirtualCol || p->isDeletedCol) {
199  continue;
200  }
201  skip_physical_cols = p->columnType.get_physical_cols();
202  if (p->columnType.get_type() == kARRAY) {
203  is_array.get()[i] = true;
204  has_array = true;
205  } else {
206  is_array.get()[i] = false;
207  }
208  ++i;
209  }
210  if (has_array) {
211  is_array_a = std::unique_ptr<bool[]>(is_array.release());
212  } else {
213  is_array_a = std::unique_ptr<bool[]>(nullptr);
214  }
215 }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:863
std::string import_id
Definition: Importer.h:857
char * f
std::unique_ptr< Loader > loader
Definition: Importer.h:862
const std::string file_path
Definition: Importer.h:703
import_export::Importer::~Importer ( )
override

Definition at line 217 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

217  {
218  if (p_file != nullptr) {
219  fclose(p_file);
220  }
221  if (buffer[0] != nullptr) {
222  free(buffer[0]);
223  }
224  if (buffer[1] != nullptr) {
225  free(buffer[1]);
226  }
227 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3580 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), and importGDAL().

3581  {
3582  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3583  mapd_lock_guard<mapd_shared_mutex> read_lock(import_mutex_);
3584  if (import_status_.load_failed) {
3585  // rollback to starting epoch - undo all the added records
3586  loader->setTableEpochs(table_epochs);
3587  } else {
3588  loader->checkpoint();
3589  }
3590  }
3591 
3592  if (loader->getTableDesc()->persistenceLevel ==
3593  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3594  // tables
3595  auto ms = measure<>::execution([&]() {
3596  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3597  if (!import_status_.load_failed) {
3598  for (auto& p : import_buffers_vec[0]) {
3599  if (!p->stringDictCheckpoint()) {
3600  LOG(ERROR) << "Checkpointing Dictionary for Column "
3601  << p->getColumnDesc()->columnName << " failed.";
3602  import_status_.load_failed = true;
3603  import_status_.load_msg = "Dictionary checkpoint failed";
3604  break;
3605  }
3606  }
3607  }
3608  });
3609  if (DEBUG_TIMING) {
3610  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3611  << std::endl;
3612  }
3613  }
3614 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:203
mapd_shared_mutex import_mutex_
Definition: Importer.h:706
#define DEBUG_TIMING
Definition: Importer.cpp:159
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
static constexpr char const * FOREIGN_TABLE
std::unique_ptr< Loader > loader
Definition: Importer.h:862

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams & copy_params 
)
static

Definition at line 4958 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

4958  {
4959  return gdalStatInternal(path, copy_params, false);
4960 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4925

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams & copy_params 
)
static

Definition at line 4963 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

4964  {
4965  return gdalStatInternal(path, copy_params, true);
4966 }
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4925

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams & copy_params 
)
static

Definition at line 5035 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), and setGDALAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5037  {
5038  // lazy init GDAL
5039  Geospatial::GDAL::init();
5040 
5041  // set authorization tokens
5042  setGDALAuthorizationTokens(copy_params);
5043 
5044  // prepare to gather files
5045  std::vector<std::string> files;
5046 
5047  // gather the files recursively
5048  gdalGatherFilesInArchiveRecursive(archive_path, files);
5049 
5050  // convert to relative paths inside archive
5051  for (auto& file : files) {
5052  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5053  }
5054 
5055  // done
5056  return files;
5057 }
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4575
void gdalGatherFilesInArchiveRecursive(const std::string &archive_path, std::vector< std::string > &files)
Definition: Importer.cpp:4968

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams & copy_params 
)
static

Definition at line 5060 of file Importer.cpp.

References CHECK, EMPTY, GEO, import_export::CopyParams::geo_explode_collections, Geospatial::GDAL::init(), NON_GEO, openGDALDataset(), setGDALAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::import_geo_table().

5062  {
5063  // lazy init GDAL
5064  Geospatial::GDAL::init();
5065 
5066  // set authorization tokens
5067  setGDALAuthorizationTokens(copy_params);
5068 
5069  // prepare to gather layer info
5070  std::vector<GeoFileLayerInfo> layer_info;
5071 
5072  // open the data set
5073  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
5074  if (poDS == nullptr) {
5075  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
5076  file_name);
5077  }
5078 
5079  // enumerate the layers
5080  for (auto&& poLayer : poDS->GetLayers()) {
5081  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5082  // prepare to read this layer
5083  poLayer->ResetReading();
5084  // skip layer if empty
5085  if (poLayer->GetFeatureCount() > 0) {
5086  // get first feature
5087  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
5088  CHECK(first_feature);
5089  // check feature for geometry
5090  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5091  if (!geometry) {
5092  // layer has no geometry
5093  contents = GeoFileLayerContents::NON_GEO;
5094  } else {
5095  // check the geometry type
5096  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5097  switch (wkbFlatten(geometry_type)) {
5098  case wkbPoint:
5099  case wkbLineString:
5100  case wkbPolygon:
5101  case wkbMultiPolygon:
5102  // layer has supported geo
5103  contents = GeoFileLayerContents::GEO;
5104  break;
5105  case wkbMultiPoint:
5106  case wkbMultiLineString:
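5107  // supported if geo_explode_collections is specified
5108  contents = copy_params.geo_explode_collections
5109  ? GeoFileLayerContents::GEO
5110  : GeoFileLayerContents::UNSUPPORTED_GEO;
5111  break;
5112  default:
5113  // layer has unsupported geometry
5114  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5115  break;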
5116  }
5117  }
5118  }
5119  // store info for this layer
5120  layer_info.emplace_back(poLayer->GetName(), contents);
5121  }
5122 
5123  // done
5124  return layer_info;
5125 }
static void init()
Definition: GDAL.cpp:59
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:114
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:123
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4575
#define CHECK(condition)
Definition: Logger.h:209
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4645

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams & copy_params,
bool  also_dir 
)
staticprivate

Definition at line 4925 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, and setGDALAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

4927  {
4928  // lazy init GDAL
4929  Geospatial::GDAL::init();
4930 
4931  // set authorization tokens
4932  setGDALAuthorizationTokens(copy_params);
4933 
4934 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4935  // clear GDAL stat cache
4936  // without this, file existence will be cached, even if authentication changes
4937  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4938  VSICurlClearCache();
4939 #endif
4940 
4941  // stat path
4942  VSIStatBufL sb;
4943  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4944  if (result < 0) {
4945  return false;
4946  }
4947 
4948  // exists?
4949  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4950  return true;
4951  } else if (VSI_ISREG(sb.st_mode)) {
4952  return true;
4953  }
4954  return false;
4955 }
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4575

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams & copy_params 
)
static

Definition at line 4835 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kMULTIPOLYGON, kPOLYGON, kTEXT, import_export::ogr_to_type(), openGDALDataset(), import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by DBHandler::detect_column_types().

4838  {
4839  std::list<ColumnDescriptor> cds;
4840 
4841  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4842  if (poDS == nullptr) {
4843  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4844  file_name);
4845  }
4846 
4847  OGRLayer& layer =
4848  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4849 
4850  layer.ResetReading();
4851  // TODO(andrewseidl): support multiple features
4852  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4853  if (poFeature == nullptr) {
4854  throw std::runtime_error("No features found in " + file_name);
4855  }
4856  // get fields as regular columns
4857  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4858  CHECK(poFDefn);
4859  int iField;
4860  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4861  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4862  auto typePair = ogr_to_type(poFieldDefn->GetType());
4863  ColumnDescriptor cd;
4864  cd.columnName = poFieldDefn->GetNameRef();
4865  cd.sourceName = poFieldDefn->GetNameRef();
4866  SQLTypeInfo ti;
4867  if (typePair.second) {
4868  ti.set_type(kARRAY);
4869  ti.set_subtype(typePair.first);
4870  } else {
4871  ti.set_type(typePair.first);
4872  }
4873  if (typePair.first == kTEXT) {
4874  ti.set_compression(kENCODING_DICT);
4875  ti.set_comp_param(32);
4876  }
4877  ti.set_fixed_size();
4878  cd.columnType = ti;
4879  cds.push_back(cd);
4880  }
4881  // get geo column, if any
4882  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4883  if (poGeometry) {
4884  ColumnDescriptor cd;
4885  cd.columnName = geo_column_name;
4886  cd.sourceName = geo_column_name;
4887 
4888  // get GDAL type
4889  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4890 
4891  // if exploding, override any collection type to child type
4892  if (copy_params.geo_explode_collections) {
4893  if (ogr_type == wkbMultiPolygon) {
4894  ogr_type = wkbPolygon;
4895  } else if (ogr_type == wkbMultiLineString) {
4896  ogr_type = wkbLineString;
4897  } else if (ogr_type == wkbMultiPoint) {
4898  ogr_type = wkbPoint;
4899  }
4900  }
4901 
4902  // convert to internal type
4903  SQLTypes geoType = ogr_to_type(ogr_type);
4904 
4905  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4906  if (PROMOTE_POLYGON_TO_MULTIPOLYGON && !copy_params.geo_explode_collections) {
4907  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4908  }
4909 
4910  // build full internal type
4911  SQLTypeInfo ti;
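4912  ti.set_type(geoType);
4913  ti.set_subtype(copy_params.geo_coords_type);
4914  ti.set_input_srid(copy_params.geo_coords_srid);
4915  ti.set_output_srid(copy_params.geo_coords_srid);
4916  ti.set_compression(copy_params.geo_coords_encoding);
4917  ti.set_comp_param(copy_params.geo_coords_comp_param);
4918  cd.columnType = ti;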
4919 
4920  cds.push_back(cd);
4921  }
4922  return cds;
4923 }
void set_compression(EncodingType c)
Definition: sqltypes.h:429
SQLTypes
Definition: sqltypes.h:38
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4678
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:420
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:114
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:123
std::string sourceName
void set_input_srid(int d)
Definition: sqltypes.h:423
void set_fixed_size()
Definition: sqltypes.h:428
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Definition: Importer.cpp:165
specifies the content in-memory of a row in the column metadata table
void set_output_srid(int s)
Definition: sqltypes.h:425
void set_comp_param(int p)
Definition: sqltypes.h:430
std::string geo_layer_name
Definition: CopyParams.h:82
Definition: sqltypes.h:52
#define CHECK(condition)
Definition: Logger.h:209
SQLTypeInfo columnType
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4645
std::string columnName
std::pair< SQLTypes, bool > ogr_to_type(const OGRFieldType &ogr_type)
Definition: Importer.cpp:4780
EncodingType geo_coords_encoding
Definition: CopyParams.h:77
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:419

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 781 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

781  {
782  return loader->get_column_descs();
783  }
std::unique_ptr< Loader > loader
Definition: Importer.h:862

+ Here is the caller graph for this function:

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 780 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

780 { return copy_params; }

+ Here is the caller graph for this function:

std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 790 of file Importer.h.

References i, and import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

790  {
791  return import_buffers_vec[i];
792  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861

+ Here is the caller graph for this function:

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 787 of file Importer.h.

References import_buffers_vec.

787  {
788  return import_buffers_vec;
789  }
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 229 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

229  {
230  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
231  return import_status_map.at(import_id);
232 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:168
std::string import_id
Definition: Importer.h:857
mapd_shared_lock< mapd_shared_mutex > read_lock
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:167

+ Here is the caller graph for this function:

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 793 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

793 { return is_array_a.get(); }
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:863

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 826 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

826 { return loader->getCatalog(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:862

+ Here is the caller graph for this function:

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 848 of file Importer.h.

References loader.

848 { return loader.get(); }
std::unique_ptr< Loader > loader
Definition: Importer.h:862
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo * session_info)
overridevirtual

Implements import_export::AbstractImporter.

Definition at line 4331 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4331  {
4332  return DataStreamSink::archivePlumber(session_info);
4333 }
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3616

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo * session_info 
)
overridevirtual

Implements import_export::DataStreamSink.

Definition at line 4335 of file Importer.cpp.

References threading_serial::async(), import_export::CopyParams::buffer_size, cat(), CHECK, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::DataStreamSink::copy_params, logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), omnisci::fopen(), g_max_import_threads, import_export::CopyParams::geo_assign_render_groups, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), i, import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, import_export::CopyParams::max_reject, max_threads, import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::CopyParams::threads, import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4338  {
4340  auto query_session = session_info ? session_info->get_session_id() : "";
4341 
4342  if (!p_file) {
4343  p_file = fopen(file_path.c_str(), "rb");
4344  }
4345  if (!p_file) {
4346  throw std::runtime_error("failed to open file '" + file_path +
4347  "': " + strerror(errno));
4348  }
4349 
4350  if (!decompressed) {
4351  (void)fseek(p_file, 0, SEEK_END);
4352  file_size = ftell(p_file);
4353  }
4354 
4355  if (copy_params.threads == 0) {
4356  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
4357  static_cast<size_t>(g_max_import_threads));
4358  } else {
4359  max_threads = static_cast<size_t>(copy_params.threads);
4360  }
4361  VLOG(1) << "Delimited import # threads: " << max_threads;
4362 
4363  // deal with small files
4364  size_t alloc_size = copy_params.buffer_size;
4365  if (!decompressed && file_size < alloc_size) {
4366  alloc_size = file_size;
4367  }
4368 
4369  for (size_t i = 0; i < max_threads; i++) {
4370  import_buffers_vec.emplace_back();
4371  for (const auto cd : loader->get_column_descs()) {
4372  import_buffers_vec[i].emplace_back(
4373  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4374  }
4375  }
4376 
4377  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4378  size_t current_pos = 0;
4379  size_t end_pos;
4380  size_t begin_pos = 0;
4381 
4382  (void)fseek(p_file, current_pos, SEEK_SET);
4383  size_t size =
4384  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4385 
4386  // make render group analyzers for each poly column
4387  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4388  if (copy_params.geo_assign_render_groups) {
4389  auto& cat = loader->getCatalog();
4390  auto* td = loader->getTableDesc();
4391  CHECK(td);
4392  auto column_descriptors =
4393  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4394  for (auto const& cd : column_descriptors) {
4395  if (IS_GEO_POLY(cd->columnType.get_type())) {
4396  auto rga = std::make_shared<RenderGroupAnalyzer>();
4397  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4398  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4399  }
4400  }
4401  }
4402 
4403  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4404  loader->getTableDesc()->tableId};
4405  auto table_epochs = loader->getTableEpochs();
4407  {
4408  std::list<std::future<ImportStatus>> threads;
4409 
4410  // use a stack to track thread_ids which must not overlap among threads
4411  // because thread_id is used to index import_buffers_vec[]
4412  std::stack<size_t> stack_thread_ids;
4413  for (size_t i = 0; i < max_threads; i++) {
4414  stack_thread_ids.push(i);
4415  }
4416  // added for true row index on error
4417  size_t first_row_index_this_buffer = 0;
4418 
4419  while (size > 0) {
4420  unsigned int num_rows_this_buffer = 0;
4421  CHECK(scratch_buffer);
4422  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4423  scratch_buffer,
4424  size,
4425  copy_params,
4426  first_row_index_this_buffer,
4427  num_rows_this_buffer,
4428  p_file);
4429 
4430  // unput residual
4431  int nresidual = size - end_pos;
4432  std::unique_ptr<char[]> unbuf;
4433  if (nresidual > 0) {
4434  unbuf = std::make_unique<char[]>(nresidual);
4435  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4436  }
4437 
4438  // get a thread_id not in use
4439  auto thread_id = stack_thread_ids.top();
4440  stack_thread_ids.pop();
4441  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4442 
4443  threads.push_back(std::async(std::launch::async,
4444  import_thread_delimited,
4445  thread_id,
4446  this,
4447  std::move(scratch_buffer),
4448  begin_pos,
4449  end_pos,
4450  end_pos,
4451  columnIdToRenderGroupAnalyzerMap,
4452  first_row_index_this_buffer,
4453  session_info,
4454  executor));
4455 
4456  first_row_index_this_buffer += num_rows_this_buffer;
4457 
4458  current_pos += end_pos;
4459  scratch_buffer = std::make_unique<char[]>(alloc_size);
4460  CHECK(scratch_buffer);
4461  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4462  size = nresidual +
4463  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4464 
4465  begin_pos = 0;
4466  while (threads.size() > 0) {
4467  int nready = 0;
4468  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4469  it != threads.end();) {
4470  auto& p = *it;
4471  std::chrono::milliseconds span(0);
4472  if (p.wait_for(span) == std::future_status::ready) {
4473  auto ret_import_status = p.get();
4474  {
4475  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4476  import_status_ += ret_import_status;
4477  if (ret_import_status.load_failed) {
4478  set_import_status(import_id, import_status_);
4479  }
4480  }
4481  // sum up current total file offsets
4482  size_t total_file_offset{0};
4483  if (decompressed) {
4484  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4485  for (const auto file_offset : file_offsets) {
4486  total_file_offset += file_offset;
4487  }
4488  }
4489  // estimate number of rows per current total file offset
4490  if (decompressed ? total_file_offset : current_pos) {
4491  import_status_.rows_estimated =
4492  (decompressed ? (float)total_file_size / total_file_offset
4493  : (float)file_size / current_pos) *
4494  import_status_.rows_completed;
4495  }
4496  VLOG(3) << "rows_completed " << import_status_.rows_completed
4497  << ", rows_estimated " << import_status_.rows_estimated
4498  << ", total_file_size " << total_file_size << ", total_file_offset "
4499  << total_file_offset;
4500  set_import_status(import_id, import_status_);
4501  // recall thread_id for reuse
4502  stack_thread_ids.push(ret_import_status.thread_id);
4503  threads.erase(it++);
4504  ++nready;
4505  } else {
4506  ++it;
4507  }
4508  }
4509 
4510  if (nready == 0) {
4511  std::this_thread::yield();
4512  }
4513 
4514  // on eof, wait all threads to finish
4515  if (0 == size) {
4516  continue;
4517  }
4518 
4519  // keep reading if any free thread slot
4520  // this is one of the major difference from old threading model !!
4521  if (threads.size() < max_threads) {
4522  break;
4523  }
4524  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4525  if (import_status_.load_failed) {
4526  break;
4527  }
4528  }
4529  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
4530  if (import_status_.rows_rejected > copy_params.max_reject) {
4531  import_status_.load_failed = true;
4532  // todo use better message
4533  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4534  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4535  break;
4536  }
4537  if (import_status_.load_failed) {
4538  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4539  break;
4540  }
4541  }
4542 
4543  // join dangling threads in case of LOG(ERROR) above
4544  for (auto& p : threads) {
4545  p.wait();
4546  }
4547  }
4548 
4549  checkpoint(table_epochs);
4550 
4551  fclose(p_file);
4552  p_file = nullptr;
4553  return import_status_;
4554 }
std::vector< int > ChunkKey
Definition: types.h:37
std::string cat(Ts &&...args)
static ImportStatus import_thread_delimited(int thread_id, Importer *importer, std::unique_ptr< char[]> scratch_buffer, size_t begin_pos, size_t end_pos, size_t total_size, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, size_t first_row_index_this_buffer, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:1997
::FILE * fopen(const char *filename, const char *mode)
Definition: omnisci_fs.cpp:72
#define LOG(tag)
Definition: Logger.h:203
mapd_shared_mutex import_mutex_
Definition: Importer.h:706
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:176
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861
std::string get_session_id() const
Definition: SessionInfo.h:78
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:234
std::string import_id
Definition: Importer.h:857
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3580
ThreadId thread_id()
Definition: Logger.cpp:791
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:156
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
mapd_unique_lock< mapd_shared_mutex > write_lock
std::vector< size_t > file_offsets
Definition: Importer.h:708
size_t g_max_import_threads
Definition: Importer.cpp:85
#define VLOG(n)
Definition: Logger.h:303
std::unique_ptr< Loader > loader
Definition: Importer.h:862
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:384
const std::string file_path
Definition: Importer.h:703
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

ImportStatus import_export::Importer::importGDAL ( std::map< std::string, std::string >  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 5127 of file Importer.cpp.

References threading_serial::async(), cat(), CHECK, CHECK_EQ, checkpoint(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::DataStreamSink::copy_params, logger::ERROR, g_enable_non_kernel_time_query_interrupt, g_max_import_threads, import_export::CopyParams::geo_assign_render_groups, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_layer_name, Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_type(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), i, import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), IS_GEO_POLY, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, import_export::CopyParams::max_reject, max_threads, openGDALDataset(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::CopyParams::threads, toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by QueryRunner::ImportDriver::importGeoTable().

5128  {
5129  // initial status
5132  if (poDS == nullptr) {
5133  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
5134  file_path);
5135  }
5136 
5137  OGRLayer& layer =
5139 
5140  // get the number of features in this layer
5141  size_t numFeatures = layer.GetFeatureCount();
5142 
5143  // build map of metadata field (additional columns) name to index
5144  // use shared_ptr since we need to pass it to the worker
5145  FieldNameToIndexMapType fieldNameToIndexMap;
5146  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5147  CHECK(poFDefn);
5148  size_t numFields = poFDefn->GetFieldCount();
5149  for (size_t iField = 0; iField < numFields; iField++) {
5150  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5151  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5152  }
5153 
5154  // the geographic spatial reference we want to put everything in
5155  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5156  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5157 
5158 #if GDAL_VERSION_MAJOR >= 3
5159  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5160  // this results in X and Y being transposed for angle-based
5161  // coordinate systems. This restores the previous behavior.
5162  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5163 #endif
5164 
5165 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5166  // just one "thread"
5167  max_threads = 1;
5168 #else
5169  // how many threads to use
5170  if (copy_params.threads == 0) {
5171  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
5173  } else {
5175  }
5176 #endif
5177 
5178  VLOG(1) << "GDAL import # threads: " << max_threads;
5179 
5180  // import geo table is specifically handled in both DBHandler and QueryRunner
5181  // that is separate path against a normal SQL execution
5182  // so we here explicitly enroll the import session to allow interruption
5183  // while importing geo table
5184  auto query_session = session_info ? session_info->get_session_id() : "";
5185  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5187  auto is_session_already_registered = false;
5188  {
5189  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
5190  is_session_already_registered =
5191  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5192  }
5193  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5194  !is_session_already_registered) {
5195  executor->enrollQuerySession(query_session,
5196  "IMPORT_GEO_TABLE",
5197  query_submitted_time,
5199  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5200  }
5201  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5202  // reset the runtime query interrupt status
5203  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5204  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5205  }
5206  };
5207 
5208  // make an import buffer for each thread
5209  CHECK_EQ(import_buffers_vec.size(), 0u);
5211  for (size_t i = 0; i < max_threads; i++) {
5212  for (const auto cd : loader->get_column_descs()) {
5213  import_buffers_vec[i].emplace_back(
5214  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5215  }
5216  }
5217 
5218  // make render group analyzers for each poly column
5219  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5221  auto& cat = loader->getCatalog();
5222  auto* td = loader->getTableDesc();
5223  CHECK(td);
5224  auto column_descriptors =
5225  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5226  for (auto const& cd : column_descriptors) {
5227  if (IS_GEO_POLY(cd->columnType.get_type())) {
5228  auto rga = std::make_shared<RenderGroupAnalyzer>();
5229  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5230  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5231  }
5232  }
5233  }
5234 
5235 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5236  // threads
5237  std::list<std::future<ImportStatus>> threads;
5238 
5239  // use a stack to track thread_ids which must not overlap among threads
5240  // because thread_id is used to index import_buffers_vec[]
5241  std::stack<size_t> stack_thread_ids;
5242  for (size_t i = 0; i < max_threads; i++) {
5243  stack_thread_ids.push(i);
5244  }
5245 #endif
5246 
5247  // checkpoint the table
5248  auto table_epochs = loader->getTableEpochs();
5249 
5250  // reset the layer
5251  layer.ResetReading();
5252 
5253  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5254 
5255  // make a features buffer for each thread
5256  std::vector<FeaturePtrVector> features(max_threads);
5257 
5258  // for each feature...
5259  size_t firstFeatureThisChunk = 0;
5260  while (firstFeatureThisChunk < numFeatures) {
5261  // how many features this chunk
5262  size_t numFeaturesThisChunk =
5263  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5264 
5265 // get a thread_id not in use
5266 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5267  size_t thread_id = 0;
5268 #else
5269  auto thread_id = stack_thread_ids.top();
5270  stack_thread_ids.pop();
5271  CHECK(thread_id < max_threads);
5272 #endif
5273 
5274  // fill features buffer for new thread
5275  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5276  features[thread_id].emplace_back(layer.GetNextFeature());
5277  }
5278 
5279 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5280  // call worker function directly
5281  auto ret_import_status = import_thread_shapefile(0,
5282  this,
5283  poGeographicSR.get(),
5284  std::move(features[thread_id]),
5285  firstFeatureThisChunk,
5286  numFeaturesThisChunk,
5287  fieldNameToIndexMap,
5288  columnNameToSourceNameMap,
5289  columnIdToRenderGroupAnalyzerMap,
5290  session_info,
5291  executor.get());
5292  import_status += ret_import_status;
5293  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5294  import_status.rows_completed;
5295  set_import_status(import_id, import_status);
5296 #else
5297  // fire up that thread to import this geometry
5298  threads.push_back(std::async(std::launch::async,
5300  thread_id,
5301  this,
5302  poGeographicSR.get(),
5303  std::move(features[thread_id]),
5304  firstFeatureThisChunk,
5305  numFeaturesThisChunk,
5306  fieldNameToIndexMap,
5307  columnNameToSourceNameMap,
5308  columnIdToRenderGroupAnalyzerMap,
5309  session_info,
5310  executor.get()));
5311 
5312  // let the threads run
5313  while (threads.size() > 0) {
5314  int nready = 0;
5315  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5316  it != threads.end();) {
5317  auto& p = *it;
5318  std::chrono::milliseconds span(
5319  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5320  if (p.wait_for(span) == std::future_status::ready) {
5321  auto ret_import_status = p.get();
5322  {
5323  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
5324  import_status_ += ret_import_status;
5326  ((float)firstFeatureThisChunk / (float)numFeatures) *
5330  break;
5331  }
5332  }
5333  // recall thread_id for reuse
5334  stack_thread_ids.push(ret_import_status.thread_id);
5335 
5336  threads.erase(it++);
5337  ++nready;
5338  } else {
5339  ++it;
5340  }
5341  }
5342 
5343  if (nready == 0) {
5344  std::this_thread::yield();
5345  }
5346 
5347  // keep reading if any free thread slot
5348  // this is one of the major difference from old threading model !!
5349  if (threads.size() < max_threads) {
5350  break;
5351  }
5352  }
5353 #endif
5354 
5355  // out of rows?
5356 
5357  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
5359  import_status_.load_failed = true;
5360  // todo use better message
5361  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5362  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5363  break;
5364  }
5366  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5367  "more details";
5368  break;
5369  }
5370 
5371  firstFeatureThisChunk += numFeaturesThisChunk;
5372  }
5373 
5374 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5375  // wait for any remaining threads
5376  if (threads.size()) {
5377  for (auto& p : threads) {
5378  // wait for the thread
5379  p.wait();
5380  // get the result and update the final import status
5381  auto ret_import_status = p.get();
5382  import_status_ += ret_import_status;
5385  }
5386  }
5387 #endif
5388 
5389  checkpoint(table_epochs);
5390 
5391  return import_status_;
5392 }
std::map< std::string, size_t > FieldNameToIndexMapType
Definition: Importer.cpp:153
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::string cat(Ts &&...args)
std::string toString(const ExtArgumentType &sig_type)
std::unique_ptr< OGRSpatialReference, OGRSpatialReferenceDeleter > OGRSpatialReferenceUqPtr
Definition: Importer.cpp:133
#define LOG(tag)
Definition: Logger.h:203
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4678
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:114
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
mapd_shared_mutex import_mutex_
Definition: Importer.h:706
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:176
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:861
std::string get_session_id() const
Definition: SessionInfo.h:78
std::string geo_layer_name
Definition: CopyParams.h:82
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:234
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap, const Catalog_Namespace::SessionInfo *session_info, Executor *executor)
Definition: Importer.cpp:2369
std::string import_id
Definition: Importer.h:857
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3580
ThreadId thread_id()
Definition: Logger.cpp:791
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, std::shared_ptr< RenderGroupAnalyzer >> ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:156
mapd_unique_lock< mapd_shared_mutex > write_lock
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4645
size_t g_max_import_threads
Definition: Importer.cpp:85
#define VLOG(n)
Definition: Logger.h:303
std::unique_ptr< Loader > loader
Definition: Importer.h:862
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:384
const std::string file_path
Definition: Importer.h:703
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3570 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

3572  {
3573  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3574  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3575  import_status_.load_failed = true;
3576  import_status_.load_msg = loader->getErrorMessage();
3577  }
3578 }
mapd_shared_mutex import_mutex_
Definition: Importer.h:706
mapd_unique_lock< mapd_shared_mutex > write_lock
std::unique_ptr< Loader > loader
Definition: Importer.h:862

+ Here is the caller graph for this function:

OGRDataSource * import_export::Importer::openGDALDataset ( const std::string &  fileName,
const CopyParams &  copy_params 
)
static private

Definition at line 4645 of file Importer.cpp.

References logger::ERROR, logger::INFO, Geospatial::GDAL::init(), LOG, and setGDALAuthorizationTokens().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptors(), importGDAL(), and readMetadataSampleGDAL().

4646  {
4647  // lazy init GDAL
4649 
4650  // set authorization tokens
4652 
4653  // open the file
4654  OGRDataSource* poDS;
4655 #if GDAL_VERSION_MAJOR == 1
4656  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4657 #else
4658  poDS = (OGRDataSource*)GDALOpenEx(
4659  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4660  if (poDS == nullptr) {
4661  poDS = (OGRDataSource*)GDALOpenEx(
4662  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4663  if (poDS) {
4664  LOG(INFO) << "openGDALDataset had to open as read-only";
4665  }
4666  }
4667 #endif
4668  if (poDS == nullptr) {
4669  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4670  }
4671  // NOTE(adb): If extending this function, refactor to ensure any errors will not
4672  // result in a memory leak if GDAL successfully opened the input dataset.
4673  return poDS;
4674 }
#define LOG(tag)
Definition: Logger.h:203
static void init()
Definition: GDAL.cpp:59
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4575

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4701 of file Importer.cpp.

References CHECK, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), i, and openGDALDataset().

Referenced by DBHandler::detect_column_types().

4706  {
4708  if (poDS == nullptr) {
4709  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4710  file_name);
4711  }
4712 
4713  OGRLayer& layer =
4715 
4716  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4717  CHECK(poFDefn);
4718 
4719  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4720  auto nFeats = layer.GetFeatureCount();
4721  size_t numFeatures =
4722  std::max(static_cast<decltype(nFeats)>(0),
4723  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4724  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4725  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4726  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4727  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4728  }
4729  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4730  layer.ResetReading();
4731  size_t iFeature = 0;
4732  while (iFeature < numFeatures) {
4733  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4734  if (!poFeature) {
4735  break;
4736  }
4737 
4738  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4739  if (poGeometry != nullptr) {
4740  // validate geom type (again?)
4741  switch (wkbFlatten(poGeometry->getGeometryType())) {
4742  case wkbPoint:
4743  case wkbLineString:
4744  case wkbPolygon:
4745  case wkbMultiPolygon:
4746  break;
4747  case wkbMultiPoint:
4748  case wkbMultiLineString:
4749  // supported if geo_explode_collections is specified
4751  throw std::runtime_error("Unsupported geometry type: " +
4752  std::string(poGeometry->getGeometryName()));
4753  }
4754  break;
4755  default:
4756  throw std::runtime_error("Unsupported geometry type: " +
4757  std::string(poGeometry->getGeometryName()));
4758  }
4759 
4760  // populate metadata for regular fields
4761  for (auto i : metadata) {
4762  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4763  if (iField >= 0) { // geom is -1
4764  metadata[i.first].at(iFeature) =
4765  std::string(poFeature->GetFieldAsString(iField));
4766  }
4767  }
4768 
4769  // populate metadata for geo column with WKT string
4770  char* wkts = nullptr;
4771  poGeometry->exportToWkt(&wkts);
4772  CHECK(wkts);
4773  metadata[geo_column_name].at(iFeature) = wkts;
4774  CPLFree(wkts);
4775  }
4776  iFeature++;
4777  }
4778 }
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4678
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:114
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:123
std::string geo_layer_name
Definition: CopyParams.h:82
#define CHECK(condition)
Definition: Logger.h:209
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4645

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
int  render_group 
)
static

Definition at line 1630 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), and foreign_storage::TextFileBufferParser::processGeoColumn().

1639  {
1640  const auto col_ti = cd->columnType;
1641  const auto col_type = col_ti.get_type();
1642  auto columnId = cd->columnId;
1643  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1644  bool is_null_geo = false;
1645  bool is_null_point = false;
1646  if (!col_ti.get_notnull()) {
1647  // Check for NULL geo
1648  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1649  is_null_point = true;
1650  coords.clear();
1651  }
1652  is_null_geo = coords.empty();
1653  if (is_null_point) {
1654  coords.push_back(NULL_ARRAY_DOUBLE);
1655  coords.push_back(NULL_DOUBLE);
1656  // Treating POINT coords as notnull, need to store actual encoding
1657  // [un]compressed+[not]null
1658  is_null_geo = false;
1659  }
1660  }
1661  TDatum tdd_coords;
1662  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1663  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1664  // in a fixlen array, compressed and uncompressed.
1665  if (!is_null_geo) {
1666  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1667  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1668  for (auto cc : compressed_coords) {
1669  tdd_coords.val.arr_val.emplace_back();
1670  tdd_coords.val.arr_val.back().val.int_val = cc;
1671  }
1672  }
1673  tdd_coords.is_null = is_null_geo;
1674  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1675 
1676  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1677  // Create ring_sizes array value and add it to the physical column
1678  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1679  TDatum tdd_ring_sizes;
1680  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1681  if (!is_null_geo) {
1682  for (auto ring_size : ring_sizes) {
1683  tdd_ring_sizes.val.arr_val.emplace_back();
1684  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1685  }
1686  }
1687  tdd_ring_sizes.is_null = is_null_geo;
1688  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1689  }
1690 
1691  if (col_type == kMULTIPOLYGON) {
1692  // Create poly_rings array value and add it to the physical column
1693  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1694  TDatum tdd_poly_rings;
1695  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1696  if (!is_null_geo) {
1697  for (auto num_rings : poly_rings) {
1698  tdd_poly_rings.val.arr_val.emplace_back();
1699  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1700  }
1701  }
1702  tdd_poly_rings.is_null = is_null_geo;
1703  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1704  }
1705 
1706  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1707  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1708  TDatum tdd_bounds;
1709  tdd_bounds.val.arr_val.reserve(bounds.size());
1710  if (!is_null_geo) {
1711  for (auto b : bounds) {
1712  tdd_bounds.val.arr_val.emplace_back();
1713  tdd_bounds.val.arr_val.back().val.real_val = b;
1714  }
1715  }
1716  tdd_bounds.is_null = is_null_geo;
1717  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1718  }
1719 
1720  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1721  // Create render_group value and add it to the physical column
1722  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1723  TDatum td_render_group;
1724  td_render_group.val.int_val = render_group;
1725  td_render_group.is_null = is_null_geo;
1726  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1727  }
1728 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column,
std::vector< int > &  render_groups_column 
)
static

Definition at line 1730 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

1739  {
1740  const auto col_ti = cd->columnType;
1741  const auto col_type = col_ti.get_type();
1742  auto columnId = cd->columnId;
1743 
1744  auto coords_row_count = coords_column.size();
1745  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1746  for (auto& coords : coords_column) {
1747  bool is_null_geo = false;
1748  bool is_null_point = false;
1749  if (!col_ti.get_notnull()) {
1750  // Check for NULL geo
1751  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1752  is_null_point = true;
1753  coords.clear();
1754  }
1755  is_null_geo = coords.empty();
1756  if (is_null_point) {
1757  coords.push_back(NULL_ARRAY_DOUBLE);
1758  coords.push_back(NULL_DOUBLE);
1759  // Treating POINT coords as notnull, need to store actual encoding
1760  // [un]compressed+[not]null
1761  is_null_geo = false;
1762  }
1763  }
1764  std::vector<TDatum> td_coords_data;
1765  if (!is_null_geo) {
1766  std::vector<uint8_t> compressed_coords =
1767  Geospatial::compress_coords(coords, col_ti);
1768  for (auto const& cc : compressed_coords) {
1769  TDatum td_byte;
1770  td_byte.val.int_val = cc;
1771  td_coords_data.push_back(td_byte);
1772  }
1773  }
1774  TDatum tdd_coords;
1775  tdd_coords.val.arr_val = td_coords_data;
1776  tdd_coords.is_null = is_null_geo;
1777  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1778  }
1779  col_idx++;
1780 
1781  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1782  if (ring_sizes_column.size() != coords_row_count) {
1783  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1784  }
1785  // Create ring_sizes array value and add it to the physical column
1786  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1787  for (auto const& ring_sizes : ring_sizes_column) {
1788  bool is_null_geo = false;
1789  if (!col_ti.get_notnull()) {
1790  // Check for NULL geo
1791  is_null_geo = ring_sizes.empty();
1792  }
1793  std::vector<TDatum> td_ring_sizes;
1794  for (auto const& ring_size : ring_sizes) {
1795  TDatum td_ring_size;
1796  td_ring_size.val.int_val = ring_size;
1797  td_ring_sizes.push_back(td_ring_size);
1798  }
1799  TDatum tdd_ring_sizes;
1800  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1801  tdd_ring_sizes.is_null = is_null_geo;
1802  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1803  }
1804  col_idx++;
1805  }
1806 
1807  if (col_type == kMULTIPOLYGON) {
1808  if (poly_rings_column.size() != coords_row_count) {
1809  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1810  }
1811  // Create poly_rings array value and add it to the physical column
1812  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1813  for (auto const& poly_rings : poly_rings_column) {
1814  bool is_null_geo = false;
1815  if (!col_ti.get_notnull()) {
1816  // Check for NULL geo
1817  is_null_geo = poly_rings.empty();
1818  }
1819  std::vector<TDatum> td_poly_rings;
1820  for (auto const& num_rings : poly_rings) {
1821  TDatum td_num_rings;
1822  td_num_rings.val.int_val = num_rings;
1823  td_poly_rings.push_back(td_num_rings);
1824  }
1825  TDatum tdd_poly_rings;
1826  tdd_poly_rings.val.arr_val = td_poly_rings;
1827  tdd_poly_rings.is_null = is_null_geo;
1828  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1829  }
1830  col_idx++;
1831  }
1832 
1833  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1834  if (bounds_column.size() != coords_row_count) {
1835  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1836  }
1837  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1838  for (auto const& bounds : bounds_column) {
1839  bool is_null_geo = false;
1840  if (!col_ti.get_notnull()) {
1841  // Check for NULL geo
1842  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1843  }
1844  std::vector<TDatum> td_bounds_data;
1845  for (auto const& b : bounds) {
1846  TDatum td_double;
1847  td_double.val.real_val = b;
1848  td_bounds_data.push_back(td_double);
1849  }
1850  TDatum tdd_bounds;
1851  tdd_bounds.val.arr_val = td_bounds_data;
1852  tdd_bounds.is_null = is_null_geo;
1853  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1854  }
1855  col_idx++;
1856  }
1857 
1858  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1859  // Create render_group value and add it to the physical column
1860  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1861  for (auto const& render_group : render_groups_column) {
1862  TDatum td_render_group;
1863  td_render_group.val.int_val = render_group;
1864  td_render_group.is_null = false;
1865  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1866  }
1867  col_idx++;
1868  }
1869 }
#define NULL_DOUBLE
bool is_null_point(const SQLTypeInfo &geo_ti, const int8_t *coords, const size_t coords_sz)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::vector< uint8_t > compress_coords(const std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define NULL_ARRAY_DOUBLE
#define CHECK(condition)
Definition: Logger.h:209
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 234 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), and importGDAL().

234  {
235  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
236  is.end = std::chrono::steady_clock::now();
237  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
238  import_status_map[import_id] = is;
239 }
static std::map< std::string, ImportStatus > import_status_map
Definition: Importer.cpp:168
std::string import_id
Definition: Importer.h:857
mapd_unique_lock< mapd_shared_mutex > write_lock
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:167

+ Here is the caller graph for this function:

void import_export::Importer::setGDALAuthorizationTokens ( const CopyParams & copy_params)
static private

Definition at line 4575 of file Importer.cpp.

References logger::INFO, LOG, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, and import_export::CopyParams::s3_secret_key.

Referenced by gdalGetAllFilesInArchive(), gdalGetLayersInGeoFile(), gdalStatInternal(), and openGDALDataset().

4575  {
4576  // for now we only support S3
4577  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4578  // only set if non-empty to allow GDAL defaults to persist
4579  // explicitly clear if empty to revert to default and not reuse a previous session's
4580  // keys
4581  if (copy_params.s3_region.size()) {
4582 #if DEBUG_AWS_AUTHENTICATION
4583  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4584 #endif
4585  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4586  } else {
4587 #if DEBUG_AWS_AUTHENTICATION
4588  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4589 #endif
4590  CPLSetConfigOption("AWS_REGION", nullptr);
4591  }
4592  if (copy_params.s3_endpoint.size()) {
4593 #if DEBUG_AWS_AUTHENTICATION
4594  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4595 #endif
4596  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4597  } else {
4598 #if DEBUG_AWS_AUTHENTICATION
4599  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4600 #endif
4601  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4602  }
4603  if (copy_params.s3_access_key.size()) {
4604 #if DEBUG_AWS_AUTHENTICATION
4605  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4606  << "'";
4607 #endif
4608  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4609  } else {
4610 #if DEBUG_AWS_AUTHENTICATION
4611  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4612 #endif
4613  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4614  }
4615  if (copy_params.s3_secret_key.size()) {
4616 #if DEBUG_AWS_AUTHENTICATION
4617  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4618  << "'";
4619 #endif
4620  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4621  } else {
4622 #if DEBUG_AWS_AUTHENTICATION
4623  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4624 #endif
4625  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4626  }
4627 
4628 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4629  // if we haven't set keys, we need to disable signed access
4630  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4631 #if DEBUG_AWS_AUTHENTICATION
4632  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4633 #endif
4634  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4635  } else {
4636 #if DEBUG_AWS_AUTHENTICATION
4637  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4638 #endif
4639  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4640  }
4641 #endif
4642 }
std::string s3_secret_key
Definition: CopyParams.h:66
#define LOG(tag)
Definition: Logger.h:203
std::string s3_access_key
Definition: CopyParams.h:65

+ Here is the caller graph for this function:

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 860 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 858 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private
std::mutex import_export::Importer::init_gdal_mutex
static private

Definition at line 864 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 863 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private
size_t import_export::Importer::max_threads
private

Definition at line 859 of file Importer.h.

Referenced by importDelimited(), Importer(), and importGDAL().


The documentation for this class was generated from the following files: