OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::ForeignDataImporter Class Reference

#include <ForeignDataImporter.h>

+ Inheritance diagram for import_export::ForeignDataImporter:
+ Collaboration diagram for import_export::ForeignDataImporter:

Public Member Functions

 ForeignDataImporter (const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static void setDefaultImportPath (const std::string &base_path)
 

Static Public Attributes

static int32_t proxy_foreign_table_fragment_size_ = 2000000
 

Protected Attributes

std::unique_ptr
< Fragmenter_Namespace::InsertDataLoader::InsertConnector > 
connector_
 

Private Member Functions

void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
 
void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const int32_t table_id)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info, const std::string &copy_from_source, const CopyParams &copy_params)
 
ImportStatus importGeneralS3 (const Catalog_Namespace::SessionInfo *session_info)
 

Private Attributes

std::string copy_from_source_
 
CopyParams copy_params_
 
const TableDescriptor * table_
 

Static Private Attributes

static std::string default_import_path_
 

Detailed Description

Definition at line 26 of file ForeignDataImporter.h.

Constructor & Destructor Documentation

import_export::ForeignDataImporter::ForeignDataImporter ( const std::string &  file_path,
const CopyParams &  copy_params,
const TableDescriptor *  table 
)

Definition at line 284 of file ForeignDataImporter.cpp.

References connector_.

287  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
288  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
289 }
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_

Member Function Documentation

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &  string_dictionaries 
)
private

Definition at line 291 of file ForeignDataImporter.cpp.

References connector_, DEBUG_TIMER, Data_Namespace::DISK_LEVEL, logger::ERROR, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, LOG, TableDescriptor::persistenceLevel, table_, and TableDescriptor::tableId.

Referenced by finalize(), and importGeneral().

295  {
296  if (table_->persistenceLevel ==
297  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
298  // tables
299  if (!import_status.load_failed) {
300  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
301  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
302  if (!string_dictionary->checkpoint()) {
303  LOG(ERROR) << "Checkpointing Dictionary for Column "
304  << column_desciptor->columnName << " failed.";
305  import_status.load_failed = true;
306  import_status.load_msg = "Dictionary checkpoint failed";
307  break;
308  }
309  }
310  }
311  }
312  if (import_status.load_failed) {
313  connector_->rollback(parent_session_info, table_->tableId);
314  } else {
315  connector_->checkpoint(parent_session_info, table_->tableId);
316  }
317 }
#define LOG(tag)
Definition: Logger.h:216
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
Data_Namespace::MemoryLevel persistenceLevel
#define DEBUG_TIMER(name)
Definition: Logger.h:371

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const int32_t  table_id 
)
private

Definition at line 319 of file ForeignDataImporter.cpp.

References finalize(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), and Catalog_Namespace::SessionInfo::getCatalog().

322  {
323  auto& catalog = parent_session_info.getCatalog();
324 
325  auto logical_columns =
326  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
327 
328  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
329  for (const auto& column_descriptor : logical_columns) {
330  if (!column_descriptor->columnType.is_dict_encoded_string()) {
331  continue;
332  }
333  auto dict_descriptor =
334  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
335  string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
336  }
337 
338  finalize(parent_session_info, import_status, string_dictionaries);
339 }
Catalog & getCatalog() const
Definition: SessionInfo.h:65
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2228

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 743 of file ForeignDataImporter.cpp.

References importGeneral(), importGeneralS3(), and foreign_storage::is_s3_uri().

744  {
746  return importGeneralS3(session_info);
747  }
748  return importGeneral(session_info);
749 }
bool is_s3_uri(const std::string &file_path)
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 399 of file ForeignDataImporter.cpp.

References copy_from_source_, and copy_params_.

Referenced by import(), and importGeneralS3().

400  {
401  return importGeneral(session_info, copy_from_source_, copy_params_);
402 }
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the caller graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info,
const std::string &  copy_from_source,
const CopyParams &  copy_params 
)
private

Definition at line 346 of file ForeignDataImporter.cpp.

References CHECK, connector_, foreign_storage::create_proxy_fsi_objects(), foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(), finalize(), Catalog_Namespace::SessionInfo::get_currentUser(), anonymous_namespace{ForeignDataImporter.cpp}::get_data_wrapper_type(), Catalog_Namespace::SessionInfo::getCatalog(), anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data(), foreign_storage::is_valid_source_type(), anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan(), proxy_foreign_table_fragment_size_, table_, TableDescriptor::tableId, TableDescriptor::userId, and anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params().

349  {
350  auto& catalog = session_info->getCatalog();
351 
353 
354  // validate copy params before import in order to print user friendly messages
355  validate_copy_params(copy_params);
356 
357  ImportStatus import_status;
358  {
359  auto& current_user = session_info->get_currentUser();
360  auto [server, user_mapping, foreign_table] =
362  copy_params,
363  catalog.getDatabaseId(),
364  table_,
365  current_user.userId);
366 
367  // set fragment size for proxy foreign table during import
368  foreign_table->maxFragRows = proxy_foreign_table_fragment_size_;
369 
370  auto data_wrapper =
372  get_data_wrapper_type(copy_params),
373  catalog.getDatabaseId(),
374  foreign_table.get(),
375  user_mapping.get());
376 
377  ChunkMetadataVector metadata_vector =
378  metadata_scan(data_wrapper.get(), foreign_table.get());
379  if (metadata_vector.empty()) { // an empty data source
380  return {};
381  }
382 
383  import_status = import_foreign_data(metadata_vector,
384  connector_.get(),
385  catalog,
386  table_,
387  data_wrapper.get(),
388  session_info,
389  copy_params,
390  copy_from_source);
391 
392  } // this scope ensures that fsi proxy objects are destroyed proir to checkpointing
393 
394  finalize(*session_info, import_status, table_->tableId);
395 
396  return import_status;
397 }
import_export::ImportStatus import_foreign_data(const ChunkMetadataVector &metadata_vector, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source)
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
std::string get_data_wrapper_type(const import_export::CopyParams &copy_params)
Catalog & getCatalog() const
Definition: SessionInfo.h:65
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
#define CHECK(condition)
Definition: Logger.h:222
void validate_copy_params(const import_export::CopyParams &copy_params)
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:71
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneralS3 ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 409 of file ForeignDataImporter.cpp.

References threading_serial::async(), CHECK, copy_from_source_, copy_params_, foreign_storage::create_futures_for_workers(), default_import_path_, import_export::CopyParams::file_sort_order_by, import_export::CopyParams::file_sort_regex, importGeneral(), gpu_enabled::iota(), foreign_storage::is_s3_uri(), import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, import_export::ImportStatus::load_failed, import_export::CopyParams::plain_text, import_export::CopyParams::regex_path_filter, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_max_concurrent_downloads, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, import_export::Importer::set_import_status(), import_export::CopyParams::source_type, to_string(), and shared::validate_sort_options().

Referenced by import().

410  {
412 
414 #if ENABLE_IMPORT_PARQUET
416 #endif
418  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
419  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
420 #if ENABLE_IMPORT_PARQUET
421  ", 'PARQUET_FILE'"
422 #endif
423  " or 'REGEX_PARSED_FILE'");
424  }
425 
430 
431 #ifdef HAVE_AWS_S3
432 
433  auto uuid = boost::uuids::random_generator()();
434  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
435  auto import_path = std::filesystem::path(default_import_path_) / base_path;
436 
437  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
447  import_path);
448  s3_archive->init_for_read();
449 
450  const auto bucket_name = s3_archive->url_part(4);
451 
452  auto object_keys = s3_archive->get_objkeys();
453  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
454  size_t object_count = 0;
455  for (const auto& objkey : object_keys) {
456  auto& object = objects_to_process[object_count++];
457  object.object_key = objkey;
458  object.is_downloaded = false;
459  }
460 
461  // Ensure files & dirs are cleaned up, regardless of outcome
462  ScopeGuard cleanup_guard = [&] {
463  if (std::filesystem::exists(import_path)) {
464  std::filesystem::remove_all(import_path);
465  }
466  };
467 
468  ImportStatus aggregate_import_status;
469  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
470 
471  std::mutex communication_mutex;
472  bool continue_downloading = true;
473  bool download_exception_occured = false;
474 
475  std::condition_variable files_download_condition;
476 
477  auto is_downloading_finished = [&] {
478  std::unique_lock communication_lock(communication_mutex);
479  return !continue_downloading || download_exception_occured;
480  };
481 
482  std::function<void(const std::vector<size_t>&)> download_objects =
483  [&](const std::vector<size_t>& partition) {
484  for (const auto& index : partition) {
485  DownloadedObjectToProcess& object = objects_to_process[index];
486  const std::string& obj_key = object.object_key;
487  if (is_downloading_finished()) {
488  return;
489  }
490  std::exception_ptr eptr; // unused
491  std::string local_file_path;
492  std::string exception_what;
493  bool exception_occured = false;
494 
495  try {
496  local_file_path = s3_archive->land(obj_key,
497  eptr,
498  false,
499  /*allow_named_pipe_use=*/false,
500  /*track_file_path=*/false);
501  } catch (const std::exception& e) {
502  exception_what = e.what();
503  exception_occured = true;
504  }
505 
506  if (is_downloading_finished()) {
507  return;
508  }
509  if (exception_occured) {
510  {
511  std::unique_lock communication_lock(communication_mutex);
512  download_exception_occured = true;
513  }
514  files_download_condition.notify_all();
515  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
516  "': " + exception_what);
517  }
518 
519  object.download_file_path = local_file_path;
520  object.is_downloaded =
521  true; // this variable is atomic and therefore acts as a lock, it must be
522  // set last to ensure no data race
523 
524  files_download_condition.notify_all();
525  }
526  };
527 
528  std::function<void()> import_local_files = [&]() {
529  for (size_t object_index = 0; object_index < object_count;) {
530  {
531  std::unique_lock communication_lock(communication_mutex);
532  files_download_condition.wait(
533  communication_lock,
534  [&download_exception_occured, object_index, &objects_to_process]() {
535  return objects_to_process[object_index].is_downloaded ||
536  download_exception_occured;
537  });
538  if (download_exception_occured) { // do not wait for object index if a download
539  // error has occured
540  return;
541  }
542  }
543 
544  // find largest range of files to import
545  size_t end_object_index = object_count;
546  for (size_t i = object_index + 1; i < object_count; ++i) {
547  if (!objects_to_process[i].is_downloaded) {
548  end_object_index = i;
549  break;
550  }
551  }
552 
553  ImportStatus local_import_status;
554  std::string local_import_dir;
555  try {
556  CopyParams local_copy_params;
557  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
558  copy_params_, objects_to_process, object_index, end_object_index);
559  local_import_status =
560  importGeneral(session_info, local_import_dir, local_copy_params);
561  // clean up temporary files
562  std::filesystem::remove_all(local_import_dir);
563  } catch (const std::exception& except) {
564  // replace all occurences of file names with the object keys for
565  // users
566  std::string what = except.what();
567 
568  for (size_t i = object_index; i < end_object_index; ++i) {
569  auto& object = objects_to_process[i];
570  what = boost::regex_replace(what,
571  boost::regex{object.import_file_path},
572  bucket_name + "/" + object.object_key);
573  }
574  {
575  std::unique_lock communication_lock(communication_mutex);
576  continue_downloading = false;
577  }
578  // clean up temporary files
579  std::filesystem::remove_all(local_import_dir);
580  throw std::runtime_error(what);
581  }
582  aggregate_import_status += local_import_status;
584  aggregate_import_status);
585  if (aggregate_import_status.load_failed) {
586  {
587  std::unique_lock communication_lock(communication_mutex);
588  continue_downloading = false;
589  }
590  return;
591  }
592 
593  object_index =
594  end_object_index; // all objects in range [object_index,end_object_index)
595  // correctly imported at this point in excecution, move onto
596  // next range
597  }
598  };
599 
600  std::vector<size_t> partition_range(object_count);
601  std::iota(partition_range.begin(), partition_range.end(), 0);
602  auto download_futures = foreign_storage::create_futures_for_workers(
603  partition_range, num_download_threads, download_objects);
604 
605  auto import_future = std::async(std::launch::async, import_local_files);
606 
607  for (auto& future : download_futures) {
608  future.wait();
609  }
610  import_future.get(); // may throw an exception
611 
612  // get any remaining exceptions
613  for (auto& future : download_futures) {
614  future.get();
615  }
616  return aggregate_import_status;
617 
618 #else
619  throw std::runtime_error("AWS S3 support not available");
620 
621  return {};
622 #endif
623 }
std::string s3_secret_key
Definition: CopyParams.h:62
void validate_sort_options(const FilePathOptions &options)
std::string to_string(char const *&&v)
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:84
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
int32_t s3_max_concurrent_downloads
Definition: CopyParams.h:66
std::unique_lock< T > unique_lock
import_export::SourceType source_type
Definition: CopyParams.h:57
bool is_s3_uri(const std::string &file_path)
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:241
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:222
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
Definition: CopyParams.h:61
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:85
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:86

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::setDefaultImportPath ( const std::string &  base_path)
static

Definition at line 404 of file ForeignDataImporter.cpp.

References default_import_path_, and shared::kDefaultImportDirName.

Referenced by CommandLineOptions::validate().

404  {
405  auto data_dir_path = boost::filesystem::canonical(base_path);
406  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
407 }
const std::string kDefaultImportDirName

+ Here is the caller graph for this function:

Member Data Documentation

std::unique_ptr<Fragmenter_Namespace::InsertDataLoader::InsertConnector> import_export::ForeignDataImporter::connector_
protected

Definition at line 43 of file ForeignDataImporter.h.

Referenced by finalize(), ForeignDataImporter(), and importGeneral().

std::string import_export::ForeignDataImporter::copy_from_source_
private

Definition at line 66 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

CopyParams import_export::ForeignDataImporter::copy_params_
private

Definition at line 67 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

std::string import_export::ForeignDataImporter::default_import_path_
inline static private

Definition at line 69 of file ForeignDataImporter.h.

Referenced by importGeneralS3(), and setDefaultImportPath().

int32_t import_export::ForeignDataImporter::proxy_foreign_table_fragment_size_ = 2000000
static

Definition at line 40 of file ForeignDataImporter.h.

Referenced by importGeneral().

const TableDescriptor* import_export::ForeignDataImporter::table_
private

Definition at line 68 of file ForeignDataImporter.h.

Referenced by finalize(), and importGeneral().


The documentation for this class was generated from the following files: