OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::ForeignDataImporter Class Reference

#include <ForeignDataImporter.h>

+ Inheritance diagram for import_export::ForeignDataImporter:
+ Collaboration diagram for import_export::ForeignDataImporter:

Public Member Functions

 ForeignDataImporter (const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static void setDefaultImportPath (const std::string &base_path)
 

Static Public Attributes

static int32_t proxy_foreign_table_fragment_size_ = 0
 

Protected Attributes

std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
 

Private Member Functions

void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
 
void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const int32_t table_id)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGeneral (const Catalog_Namespace::SessionInfo *session_info, const std::string &copy_from_source, const CopyParams &copy_params)
 
ImportStatus importGeneralS3 (const Catalog_Namespace::SessionInfo *session_info)
 

Private Attributes

std::string copy_from_source_
 
CopyParams copy_params_
 
const TableDescriptor * table_
 

Static Private Attributes

static std::string default_import_path_
 

Detailed Description

Definition at line 26 of file ForeignDataImporter.h.

Constructor & Destructor Documentation

import_export::ForeignDataImporter::ForeignDataImporter ( const std::string &  file_path,
const CopyParams &  copy_params,
const TableDescriptor *  table 
)

Definition at line 407 of file ForeignDataImporter.cpp. (The definition names the first parameter copy_from_source rather than file_path.)

References connector_.

410  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
411  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
412 }
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_

Member Function Documentation

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &  string_dictionaries 
)
private

Definition at line 414 of file ForeignDataImporter.cpp.

References connector_, DEBUG_TIMER, Data_Namespace::DISK_LEVEL, logger::ERROR, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, LOG, TableDescriptor::persistenceLevel, table_, and TableDescriptor::tableId.

Referenced by finalize(), and importGeneral().

418  {
419  if (table_->persistenceLevel ==
420  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
421  // tables
422  if (!import_status.load_failed) {
423  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
424  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
425  if (!string_dictionary->checkpoint()) {
426  LOG(ERROR) << "Checkpointing Dictionary for Column "
427  << column_desciptor->columnName << " failed.";
428  import_status.load_failed = true;
429  import_status.load_msg = "Dictionary checkpoint failed";
430  break;
431  }
432  }
433  }
434  }
435  if (import_status.load_failed) {
436  connector_->rollback(parent_session_info, table_->tableId);
437  } else {
438  connector_->checkpoint(parent_session_info, table_->tableId);
439  }
440 }
#define LOG(tag)
Definition: Logger.h:285
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
Data_Namespace::MemoryLevel persistenceLevel
#define DEBUG_TIMER(name)
Definition: Logger.h:412

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const int32_t  table_id 
)
private

Definition at line 442 of file ForeignDataImporter.cpp.

References finalize(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), and Catalog_Namespace::SessionInfo::getCatalog().

445  {
446  auto& catalog = parent_session_info.getCatalog();
447 
448  auto logical_columns =
449  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
450 
451  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
452  for (const auto& column_descriptor : logical_columns) {
453  if (!column_descriptor->columnType.is_dict_encoded_string()) {
454  continue;
455  }
456  auto dict_descriptor =
457  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
458  string_dictionaries.emplace_back(column_descriptor,
459  dict_descriptor->stringDict.get());
460  }
461 
462  finalize(parent_session_info, import_status, string_dictionaries);
463 }
Catalog & getCatalog() const
Definition: SessionInfo.h:75
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2172

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 931 of file ForeignDataImporter.cpp.

References importGeneral(), importGeneralS3(), and shared::is_s3_uri().

932  {
934  return importGeneralS3(session_info);
935  }
936  return importGeneral(session_info);
937 }
bool is_s3_uri(const std::string &file_path)
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 587 of file ForeignDataImporter.cpp.

References copy_from_source_, and copy_params_.

Referenced by import(), and importGeneralS3().

588  {
589  return importGeneral(session_info, copy_from_source_, copy_params_);
590 }
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)

+ Here is the caller graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneral ( const Catalog_Namespace::SessionInfo *  session_info,
const std::string &  copy_from_source,
const CopyParams &  copy_params 
)
private

Definition at line 514 of file ForeignDataImporter.cpp.

References CHECK, CHECK_GE, CHUNK_KEY_FRAGMENT_IDX, connector_, foreign_storage::create_proxy_fsi_objects(), foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(), finalize(), Catalog_Namespace::SessionInfo::get_currentUser(), import_export::anonymous_namespace{ForeignDataImporter.cpp}::get_proxy_foreign_table_fragment_size(), Catalog_Namespace::SessionInfo::getCatalog(), anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data(), logger::INFO, foreign_storage::is_valid_source_type(), LOG, import_export::CopyParams::max_import_batch_row_count, anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan(), table_, TableDescriptor::tableId, TableDescriptor::tableName, TableDescriptor::userId, and anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params().

517  {
518  auto& catalog = session_info->getCatalog();
519 
521 
522  // validate copy params before import in order to print user friendly messages
523  validate_copy_params(copy_params);
524 
525  ImportStatus import_status;
526  {
527  auto& current_user = session_info->get_currentUser();
528  auto [server, user_mapping, foreign_table] =
530  copy_params,
531  catalog.getDatabaseId(),
532  table_,
533  current_user.userId);
534 
535  // maximum number of fragments buffered in memory at any one time, affects
536  // `maxFragRows` heuristic below
537  const size_t maximum_num_fragments_buffered = 3;
538  // set fragment size for proxy foreign table during import
539  foreign_table->maxFragRows =
540  get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
541  copy_params.max_import_batch_row_count,
542  *session_info,
543  table_->tableId);
544 
545  // log for debugging purposes
546  LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
547  << " for table " << table_->tableName;
548 
549  auto data_wrapper =
551  copy_params,
552  catalog.getDatabaseId(),
553  foreign_table.get(),
554  user_mapping.get());
555 
556  int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
557  if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
558  ChunkMetadataVector metadata_vector =
559  metadata_scan(data_wrapper.get(), foreign_table.get());
560  if (metadata_vector.empty()) { // an empty data source
561  return {};
562  }
563  max_fragment_id = 0;
564  for (const auto& [key, _] : metadata_vector) {
565  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
566  }
567  CHECK_GE(max_fragment_id, 0);
568  }
569 
570  import_status = import_foreign_data(max_fragment_id,
571  connector_.get(),
572  catalog,
573  table_,
574  data_wrapper.get(),
575  session_info,
576  copy_params,
577  copy_from_source,
578  maximum_num_fragments_buffered);
579 
580  } // this scope ensures that fsi proxy objects are destroyed prior to checkpoint
581 
582  finalize(*session_info, import_status, table_->tableId);
583 
584  return import_status;
585 }
std::string tableName
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
#define CHECK_GE(x, y)
Definition: Logger.h:306
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
Catalog & getCatalog() const
Definition: SessionInfo.h:75
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
#define CHECK(condition)
Definition: Logger.h:291
int32_t get_proxy_foreign_table_fragment_size(const size_t maximum_num_fragments_buffered, const size_t max_import_batch_row_count, const Catalog_Namespace::SessionInfo &parent_session_info, const int32_t table_id)
import_export::ImportStatus import_foreign_data(const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
void validate_copy_params(const import_export::CopyParams &copy_params)
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88

+ Here is the call graph for this function:

ImportStatus import_export::ForeignDataImporter::importGeneralS3 ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 597 of file ForeignDataImporter.cpp.

References threading_serial::async(), CHECK, copy_from_source_, copy_params_, foreign_storage::create_futures_for_workers(), default_import_path_, import_export::CopyParams::file_sort_order_by, import_export::CopyParams::file_sort_regex, importGeneral(), gpu_enabled::iota(), shared::is_s3_uri(), import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, import_export::ImportStatus::load_failed, import_export::CopyParams::plain_text, import_export::CopyParams::regex_path_filter, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_max_concurrent_downloads, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, import_export::Importer::set_import_status(), import_export::CopyParams::source_type, to_string(), and shared::validate_sort_options().

Referenced by import().

598  {
600 
602 #if ENABLE_IMPORT_PARQUET
604 #endif
606  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
607  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
608 #if ENABLE_IMPORT_PARQUET
609  ", 'PARQUET_FILE'"
610 #endif
611  " or 'REGEX_PARSED_FILE'");
612  }
613 
618 
619 #ifdef HAVE_AWS_S3
620 
621  auto uuid = boost::uuids::random_generator()();
622  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
623  auto import_path = std::filesystem::path(default_import_path_) / base_path;
624 
625  // Ensure files & dirs are cleaned up, regardless of outcome
626  ScopeGuard cleanup_guard = [&] {
627  if (std::filesystem::exists(import_path)) {
628  std::filesystem::remove_all(import_path);
629  }
630  };
631 
632  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
642  import_path);
643  s3_archive->init_for_read();
644 
645  const auto bucket_name = s3_archive->url_part(4);
646 
647  auto object_keys = s3_archive->get_objkeys();
648  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
649  size_t object_count = 0;
650  for (const auto& objkey : object_keys) {
651  auto& object = objects_to_process[object_count++];
652  object.object_key = objkey;
653  object.is_downloaded = false;
654  }
655 
656  ImportStatus aggregate_import_status;
657  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
658 
659  std::mutex communication_mutex;
660  bool continue_downloading = true;
661  bool download_exception_occured = false;
662 
663  std::condition_variable files_download_condition;
664 
665  auto is_downloading_finished = [&] {
666  std::unique_lock communication_lock(communication_mutex);
667  return !continue_downloading || download_exception_occured;
668  };
669 
670  std::function<void(const std::vector<size_t>&)> download_objects =
671  [&](const std::vector<size_t>& partition) {
672  for (const auto& index : partition) {
673  DownloadedObjectToProcess& object = objects_to_process[index];
674  const std::string& obj_key = object.object_key;
675  if (is_downloading_finished()) {
676  return;
677  }
678  std::exception_ptr eptr; // unused
679  std::string local_file_path;
680  std::string exception_what;
681  bool exception_occured = false;
682 
683  try {
684  local_file_path = s3_archive->land(obj_key,
685  eptr,
686  false,
687  /*allow_named_pipe_use=*/false,
688  /*track_file_path=*/false);
689  } catch (const std::exception& e) {
690  exception_what = e.what();
691  exception_occured = true;
692  }
693 
694  if (is_downloading_finished()) {
695  return;
696  }
697  if (exception_occured) {
698  {
699  std::unique_lock communication_lock(communication_mutex);
700  download_exception_occured = true;
701  }
702  files_download_condition.notify_all();
703  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
704  "': " + exception_what);
705  }
706 
707  object.download_file_path = local_file_path;
708  object.is_downloaded =
709  true; // this variable is atomic and therefore acts as a lock, it must be
710  // set last to ensure no data race
711 
712  files_download_condition.notify_all();
713  }
714  };
715 
716  std::function<void()> import_local_files = [&]() {
717  for (size_t object_index = 0; object_index < object_count;) {
718  {
719  std::unique_lock communication_lock(communication_mutex);
720  files_download_condition.wait(
721  communication_lock,
722  [&download_exception_occured, object_index, &objects_to_process]() {
723  return objects_to_process[object_index].is_downloaded ||
724  download_exception_occured;
725  });
726  if (download_exception_occured) { // do not wait for object index if a download
727  // error has occured
728  return;
729  }
730  }
731 
732  // find largest range of files to import
733  size_t end_object_index = object_count;
734  for (size_t i = object_index + 1; i < object_count; ++i) {
735  if (!objects_to_process[i].is_downloaded) {
736  end_object_index = i;
737  break;
738  }
739  }
740 
741  ImportStatus local_import_status;
742  std::string local_import_dir;
743  try {
744  CopyParams local_copy_params;
745  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
746  copy_params_, objects_to_process, object_index, end_object_index);
747  local_import_status =
748  importGeneral(session_info, local_import_dir, local_copy_params);
749  // clean up temporary files
750  std::filesystem::remove_all(local_import_dir);
751  } catch (const std::exception& except) {
752  // replace all occurences of file names with the object keys for
753  // users
754  std::string what = except.what();
755 
756  for (size_t i = object_index; i < end_object_index; ++i) {
757  auto& object = objects_to_process[i];
758  what = boost::regex_replace(what,
759  boost::regex{object.import_file_path},
760  bucket_name + "/" + object.object_key);
761  }
762  {
763  std::unique_lock communication_lock(communication_mutex);
764  continue_downloading = false;
765  }
766  // clean up temporary files
767  std::filesystem::remove_all(local_import_dir);
768  throw std::runtime_error(what);
769  }
770  aggregate_import_status += local_import_status;
772  aggregate_import_status);
773  if (aggregate_import_status.load_failed) {
774  {
775  std::unique_lock communication_lock(communication_mutex);
776  continue_downloading = false;
777  }
778  return;
779  }
780 
781  object_index =
782  end_object_index; // all objects in range [object_index,end_object_index)
783  // correctly imported at this point in excecution, move onto
784  // next range
785  }
786  };
787 
788  std::vector<size_t> partition_range(object_count);
789  std::iota(partition_range.begin(), partition_range.end(), 0);
790  auto download_futures = foreign_storage::create_futures_for_workers(
791  partition_range, num_download_threads, download_objects);
792 
793  auto import_future = std::async(std::launch::async, import_local_files);
794 
795  for (auto& future : download_futures) {
796  future.wait();
797  }
798  import_future.get(); // may throw an exception
799 
800  // get any remaining exceptions
801  for (auto& future : download_futures) {
802  future.get();
803  }
804  return aggregate_import_status;
805 
806 #else
807  throw std::runtime_error("AWS S3 support not available");
808 
809  return {};
810 #endif
811 }
std::string s3_secret_key
Definition: CopyParams.h:62
bool is_s3_uri(const std::string &file_path)
void validate_sort_options(const FilePathOptions &options)
std::string to_string(char const *&&v)
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
int32_t s3_max_concurrent_downloads
Definition: CopyParams.h:66
std::unique_lock< T > unique_lock
import_export::SourceType source_type
Definition: CopyParams.h:57
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:240
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::string s3_session_token
Definition: CopyParams.h:63
#define CHECK(condition)
Definition: Logger.h:291
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
Definition: CopyParams.h:61
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::ForeignDataImporter::setDefaultImportPath ( const std::string &  base_path)
static

Definition at line 592 of file ForeignDataImporter.cpp.

References default_import_path_, and shared::kDefaultImportDirName.

Referenced by CommandLineOptions::validate().

592  {
593  auto data_dir_path = boost::filesystem::canonical(base_path);
594  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
595 }
const std::string kDefaultImportDirName

+ Here is the caller graph for this function:

Member Data Documentation

std::unique_ptr<Fragmenter_Namespace::InsertDataLoader::InsertConnector> import_export::ForeignDataImporter::connector_
protected

Definition at line 43 of file ForeignDataImporter.h.

Referenced by finalize(), ForeignDataImporter(), and importGeneral().

std::string import_export::ForeignDataImporter::copy_from_source_
private

Definition at line 66 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

CopyParams import_export::ForeignDataImporter::copy_params_
private

Definition at line 67 of file ForeignDataImporter.h.

Referenced by importGeneral(), and importGeneralS3().

std::string import_export::ForeignDataImporter::default_import_path_
inline static private

Definition at line 69 of file ForeignDataImporter.h.

Referenced by importGeneralS3(), and setDefaultImportPath().

int32_t import_export::ForeignDataImporter::proxy_foreign_table_fragment_size_ = 0
static
const TableDescriptor* import_export::ForeignDataImporter::table_
private

Definition at line 68 of file ForeignDataImporter.h.

Referenced by finalize(), and importGeneral().


The documentation for this class was generated from the following files: