OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::ForeignDataImporter Class Reference

#include <ForeignDataImporter.h>

+ Inheritance diagram for import_export::ForeignDataImporter:
+ Collaboration diagram for import_export::ForeignDataImporter:

Public Member Functions

 ForeignDataImporter (const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static void setDefaultImportPath (const std::string &base_path)
 

Static Public Attributes

static int32_t proxy_foreign_table_fragment_size_ = 0
 

Protected Attributes

std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
 

Private Member Functions

void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
 
void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const int32_t table_id)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info, const std::string &copy_from_source, const CopyParams &copy_params)
 
ImportStatus importGeneralS3 (const Catalog_Namespace::SessionInfo *session_info)
 

Private Attributes

std::string copy_from_source_
 
CopyParams copy_params_
 
const TableDescriptor * table_
 

Static Private Attributes

static std::string default_import_path_
 

Detailed Description

Definition at line 26 of file ForeignDataImporter.h.

Constructor & Destructor Documentation

import_export::ForeignDataImporter::ForeignDataImporter ( const std::string &  file_path,
const CopyParams &  copy_params,
const TableDescriptor *  table
)

Definition at line 415 of file ForeignDataImporter.cpp.

References connector_.

418  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
419  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
420 }
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_

Member Function Documentation

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &  string_dictionaries 
)
private

Definition at line 422 of file ForeignDataImporter.cpp.

References connector_, DEBUG_TIMER, Data_Namespace::DISK_LEVEL, logger::ERROR, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, LOG, TableDescriptor::persistenceLevel, table_, and TableDescriptor::tableId.

Referenced by finalize(), and importGeneral().

426  {
427  if (table_->persistenceLevel ==
428  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
429  // tables
430  if (!import_status.load_failed) {
431  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
432  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
433  if (!string_dictionary->checkpoint()) {
434  LOG(ERROR) << "Checkpointing Dictionary for Column "
435  << column_desciptor->columnName << " failed.";
436  import_status.load_failed = true;
437  import_status.load_msg = "Dictionary checkpoint failed";
438  break;
439  }
440  }
441  }
442  }
443  if (import_status.load_failed) {
444  connector_->rollback(parent_session_info, table_->tableId);
445  } else {
446  connector_->checkpoint(parent_session_info, table_->tableId);
447  }
448 }
#define LOG(tag)
Definition: Logger.h:285
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
Data_Namespace::MemoryLevel persistenceLevel
#define DEBUG_TIMER(name)
Definition: Logger.h:411

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const int32_t  table_id 
)
private

Definition at line 450 of file ForeignDataImporter.cpp.

References finalize(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), and Catalog_Namespace::SessionInfo::getCatalog().

453  {
454  auto& catalog = parent_session_info.getCatalog();
455 
456  auto logical_columns =
457  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
458 
459  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
460  for (const auto& column_descriptor : logical_columns) {
461  if (!column_descriptor->columnType.is_dict_encoded_string()) {
462  continue;
463  }
464  auto dict_descriptor =
465  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
466  string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
467  }
468 
469  finalize(parent_session_info, import_status, string_dictionaries);
470 }
Catalog & getCatalog() const
Definition: SessionInfo.h:75
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2267

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 938 of file ForeignDataImporter.cpp.

References importGeneral(), importGeneralS3(), and shared::is_s3_uri().

939  {
941  return importGeneralS3(session_info);
942  }
943  return importGeneral(session_info);
944 }
bool is_s3_uri(const std::string &file_path)
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 594 of file ForeignDataImporter.cpp.

References copy_from_source_, and copy_params_.

Referenced by import(), and importGeneralS3().

595  {
596  return importGeneral(session_info, copy_from_source_, copy_params_);
597 }
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the caller graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info,
const std::string &  copy_from_source,
const CopyParams &  copy_params
)
private

Definition at line 521 of file ForeignDataImporter.cpp.

References CHECK, CHECK_GE, CHUNK_KEY_FRAGMENT_IDX, connector_, foreign_storage::create_proxy_fsi_objects(), foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(), finalize(), Catalog_Namespace::SessionInfo::get_currentUser(), import_export::anonymous_namespace{ForeignDataImporter.cpp}::get_proxy_foreign_table_fragment_size(), Catalog_Namespace::SessionInfo::getCatalog(), anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data(), logger::INFO, foreign_storage::is_valid_source_type(), LOG, import_export::CopyParams::max_import_batch_row_count, anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan(), table_, TableDescriptor::tableId, TableDescriptor::tableName, TableDescriptor::userId, and anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params().

524  {
525  auto& catalog = session_info->getCatalog();
526 
528 
529  // validate copy params before import in order to print user friendly messages
530  validate_copy_params(copy_params);
531 
532  ImportStatus import_status;
533  {
534  auto& current_user = session_info->get_currentUser();
535  auto [server, user_mapping, foreign_table] =
537  copy_params,
538  catalog.getDatabaseId(),
539  table_,
540  current_user.userId);
541 
542  // maximum number of fragments buffered in memory at any one time, affects
543  // `maxFragRows` heuristic below
544  const size_t maximum_num_fragments_buffered = 3;
545  // set fragment size for proxy foreign table during import
546  foreign_table->maxFragRows =
547  get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
548  copy_params.max_import_batch_row_count,
549  *session_info,
550  table_->tableId);
551 
552  // log for debugging purposes
553  LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
554  << " for table " << table_->tableName;
555 
556  auto data_wrapper =
558  copy_params,
559  catalog.getDatabaseId(),
560  foreign_table.get(),
561  user_mapping.get());
562 
563  int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
564  if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
565  ChunkMetadataVector metadata_vector =
566  metadata_scan(data_wrapper.get(), foreign_table.get());
567  if (metadata_vector.empty()) { // an empty data source
568  return {};
569  }
570  max_fragment_id = 0;
571  for (const auto& [key, _] : metadata_vector) {
572  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
573  }
574  CHECK_GE(max_fragment_id, 0);
575  }
576 
577  import_status = import_foreign_data(max_fragment_id,
578  connector_.get(),
579  catalog,
580  table_,
581  data_wrapper.get(),
582  session_info,
583  copy_params,
584  copy_from_source,
585  maximum_num_fragments_buffered);
586 
587  } // this scope ensures that fsi proxy objects are destroyed prior to checkpoint
588 
589  finalize(*session_info, import_status, table_->tableId);
590 
591  return import_status;
592 }
std::string tableName
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
#define CHECK_GE(x, y)
Definition: Logger.h:306
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
Catalog & getCatalog() const
Definition: SessionInfo.h:75
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
#define CHECK(condition)
Definition: Logger.h:291
int32_t get_proxy_foreign_table_fragment_size(const size_t maximum_num_fragments_buffered, const size_t max_import_batch_row_count, const Catalog_Namespace::SessionInfo &parent_session_info, const int32_t table_id)
import_export::ImportStatus import_foreign_data(const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
void validate_copy_params(const import_export::CopyParams &copy_params)
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneralS3 ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 604 of file ForeignDataImporter.cpp.

References threading_serial::async(), CHECK, copy_from_source_, copy_params_, foreign_storage::create_futures_for_workers(), default_import_path_, import_export::CopyParams::file_sort_order_by, import_export::CopyParams::file_sort_regex, importGeneral(), gpu_enabled::iota(), shared::is_s3_uri(), import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, import_export::ImportStatus::load_failed, import_export::CopyParams::plain_text, import_export::CopyParams::regex_path_filter, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_max_concurrent_downloads, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, import_export::Importer::set_import_status(), import_export::CopyParams::source_type, to_string(), and shared::validate_sort_options().

Referenced by import().

605  {
607 
609 #if ENABLE_IMPORT_PARQUET
611 #endif
613  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
614  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
615 #if ENABLE_IMPORT_PARQUET
616  ", 'PARQUET_FILE'"
617 #endif
618  " or 'REGEX_PARSED_FILE'");
619  }
620 
625 
626 #ifdef HAVE_AWS_S3
627 
628  auto uuid = boost::uuids::random_generator()();
629  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
630  auto import_path = std::filesystem::path(default_import_path_) / base_path;
631 
632  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
642  import_path);
643  s3_archive->init_for_read();
644 
645  const auto bucket_name = s3_archive->url_part(4);
646 
647  auto object_keys = s3_archive->get_objkeys();
648  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
649  size_t object_count = 0;
650  for (const auto& objkey : object_keys) {
651  auto& object = objects_to_process[object_count++];
652  object.object_key = objkey;
653  object.is_downloaded = false;
654  }
655 
656  // Ensure files & dirs are cleaned up, regardless of outcome
657  ScopeGuard cleanup_guard = [&] {
658  if (std::filesystem::exists(import_path)) {
659  std::filesystem::remove_all(import_path);
660  }
661  };
662 
663  ImportStatus aggregate_import_status;
664  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
665 
666  std::mutex communication_mutex;
667  bool continue_downloading = true;
668  bool download_exception_occured = false;
669 
670  std::condition_variable files_download_condition;
671 
672  auto is_downloading_finished = [&] {
673  std::unique_lock communication_lock(communication_mutex);
674  return !continue_downloading || download_exception_occured;
675  };
676 
677  std::function<void(const std::vector<size_t>&)> download_objects =
678  [&](const std::vector<size_t>& partition) {
679  for (const auto& index : partition) {
680  DownloadedObjectToProcess& object = objects_to_process[index];
681  const std::string& obj_key = object.object_key;
682  if (is_downloading_finished()) {
683  return;
684  }
685  std::exception_ptr eptr; // unused
686  std::string local_file_path;
687  std::string exception_what;
688  bool exception_occured = false;
689 
690  try {
691  local_file_path = s3_archive->land(obj_key,
692  eptr,
693  false,
694  /*allow_named_pipe_use=*/false,
695  /*track_file_path=*/false);
696  } catch (const std::exception& e) {
697  exception_what = e.what();
698  exception_occured = true;
699  }
700 
701  if (is_downloading_finished()) {
702  return;
703  }
704  if (exception_occured) {
705  {
706  std::unique_lock communication_lock(communication_mutex);
707  download_exception_occured = true;
708  }
709  files_download_condition.notify_all();
710  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
711  "': " + exception_what);
712  }
713 
714  object.download_file_path = local_file_path;
715  object.is_downloaded =
716  true; // this variable is atomic and therefore acts as a lock, it must be
717  // set last to ensure no data race
718 
719  files_download_condition.notify_all();
720  }
721  };
722 
723  std::function<void()> import_local_files = [&]() {
724  for (size_t object_index = 0; object_index < object_count;) {
725  {
726  std::unique_lock communication_lock(communication_mutex);
727  files_download_condition.wait(
728  communication_lock,
729  [&download_exception_occured, object_index, &objects_to_process]() {
730  return objects_to_process[object_index].is_downloaded ||
731  download_exception_occured;
732  });
733  if (download_exception_occured) { // do not wait for object index if a download
734  // error has occured
735  return;
736  }
737  }
738 
739  // find largest range of files to import
740  size_t end_object_index = object_count;
741  for (size_t i = object_index + 1; i < object_count; ++i) {
742  if (!objects_to_process[i].is_downloaded) {
743  end_object_index = i;
744  break;
745  }
746  }
747 
748  ImportStatus local_import_status;
749  std::string local_import_dir;
750  try {
751  CopyParams local_copy_params;
752  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
753  copy_params_, objects_to_process, object_index, end_object_index);
754  local_import_status =
755  importGeneral(session_info, local_import_dir, local_copy_params);
756  // clean up temporary files
757  std::filesystem::remove_all(local_import_dir);
758  } catch (const std::exception& except) {
759  // replace all occurences of file names with the object keys for
760  // users
761  std::string what = except.what();
762 
763  for (size_t i = object_index; i < end_object_index; ++i) {
764  auto& object = objects_to_process[i];
765  what = boost::regex_replace(what,
766  boost::regex{object.import_file_path},
767  bucket_name + "/" + object.object_key);
768  }
769  {
770  std::unique_lock communication_lock(communication_mutex);
771  continue_downloading = false;
772  }
773  // clean up temporary files
774  std::filesystem::remove_all(local_import_dir);
775  throw std::runtime_error(what);
776  }
777  aggregate_import_status += local_import_status;
779  aggregate_import_status);
780  if (aggregate_import_status.load_failed) {
781  {
782  std::unique_lock communication_lock(communication_mutex);
783  continue_downloading = false;
784  }
785  return;
786  }
787 
788  object_index =
789  end_object_index; // all objects in range [object_index,end_object_index)
790  // correctly imported at this point in excecution, move onto
791  // next range
792  }
793  };
794 
795  std::vector<size_t> partition_range(object_count);
796  std::iota(partition_range.begin(), partition_range.end(), 0);
797  auto download_futures = foreign_storage::create_futures_for_workers(
798  partition_range, num_download_threads, download_objects);
799 
800  auto import_future = std::async(std::launch::async, import_local_files);
801 
802  for (auto& future : download_futures) {
803  future.wait();
804  }
805  import_future.get(); // may throw an exception
806 
807  // get any remaining exceptions
808  for (auto& future : download_futures) {
809  future.get();
810  }
811  return aggregate_import_status;
812 
813 #else
814  throw std::runtime_error("AWS S3 support not available");
815 
816  return {};
817 #endif
818 }
std::string s3_secret_key
Definition: CopyParams.h:62
bool is_s3_uri(const std::string &file_path)
void validate_sort_options(const FilePathOptions &options)
std::string to_string(char const *&&v)
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
int32_t s3_max_concurrent_downloads
Definition: CopyParams.h:66
std::unique_lock< T > unique_lock
import_export::SourceType source_type
Definition: CopyParams.h:57
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:291
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
Definition: CopyParams.h:61
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::setDefaultImportPath ( const std::string &  base_path)
static

Definition at line 599 of file ForeignDataImporter.cpp.

References default_import_path_, and shared::kDefaultImportDirName.

Referenced by CommandLineOptions::validate().

599  {
600  auto data_dir_path = boost::filesystem::canonical(base_path);
601  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
602 }
const std::string kDefaultImportDirName

+ Here is the caller graph for this function:

Member Data Documentation

std::unique_ptr<Fragmenter_Namespace::InsertDataLoader::InsertConnector> import_export::ForeignDataImporter::connector_
protected

Definition at line 43 of file ForeignDataImporter.h.

Referenced by finalize(), ForeignDataImporter(), and importGeneral().

std::string import_export::ForeignDataImporter::copy_from_source_
private

Definition at line 66 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

CopyParams import_export::ForeignDataImporter::copy_params_
private

Definition at line 67 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

std::string import_export::ForeignDataImporter::default_import_path_
inline static private

Definition at line 69 of file ForeignDataImporter.h.

Referenced by importGeneralS3(), and setDefaultImportPath().

int32_t import_export::ForeignDataImporter::proxy_foreign_table_fragment_size_ = 0
static
const TableDescriptor* import_export::ForeignDataImporter::table_
private

Definition at line 68 of file ForeignDataImporter.h.

Referenced by finalize(), and importGeneral().


The documentation for this class was generated from the following files: