OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::ForeignDataImporter Class Reference

#include <ForeignDataImporter.h>

+ Inheritance diagram for import_export::ForeignDataImporter:
+ Collaboration diagram for import_export::ForeignDataImporter:

Public Member Functions

 ForeignDataImporter (const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Protected Attributes

std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::DistributedConnector > connector_
 

Private Member Functions

void finalize (const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
 

Private Attributes

std::string file_path_
 
CopyParams copy_params_
 
const TableDescriptor * table_
 

Detailed Description

Definition at line 26 of file ForeignDataImporter.h.

Constructor & Destructor Documentation

import_export::ForeignDataImporter::ForeignDataImporter ( const std::string &  file_path,
const CopyParams &  copy_params,
const TableDescriptor *  table 
)

Definition at line 27 of file ForeignDataImporter.cpp.

References connector_.

30  : file_path_(file_path), copy_params_(copy_params), table_(table) {
31  connector_ = std::make_unique<Parser::LocalConnector>();
32 }
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::DistributedConnector > connector_

Member Function Documentation

void import_export::ForeignDataImporter::finalize ( const Catalog_Namespace::SessionInfo &  parent_session_info,
ImportStatus &  import_status,
const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &  string_dictionaries 
)
private

Definition at line 34 of file ForeignDataImporter.cpp.

References connector_, DEBUG_TIMER, Data_Namespace::DISK_LEVEL, logger::ERROR, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, LOG, TableDescriptor::persistenceLevel, table_, and TableDescriptor::tableId.

Referenced by import().

38  {
39  if (table_->persistenceLevel ==
40  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
41  // tables
42  if (!import_status.load_failed) {
43  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
44  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
45  if (!string_dictionary->checkpoint()) {
46  LOG(ERROR) << "Checkpointing Dictionary for Column "
47  << column_desciptor->columnName << " failed.";
48  import_status.load_failed = true;
49  import_status.load_msg = "Dictionary checkpoint failed";
50  break;
51  }
52  }
53  }
54  }
55  if (import_status.load_failed) {
56  connector_->rollback(parent_session_info, table_->tableId);
57  } else {
58  connector_->checkpoint(parent_session_info, table_->tableId);
59  }
60 }
#define LOG(tag)
Definition: Logger.h:203
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::DistributedConnector > connector_
Data_Namespace::MemoryLevel persistenceLevel
#define DEBUG_TIMER(name)
Definition: Logger.h:352

+ Here is the caller graph for this function:

ImportStatus import_export::ForeignDataImporter::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 62 of file ForeignDataImporter.cpp.

References CHECK, connector_, copy_params_, foreign_storage::ForeignDataWrapperFactory::createForeignServerProxy(), foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(), foreign_storage::ForeignDataWrapperFactory::createForImport(), foreign_storage::ForeignDataWrapperFactory::createUserMappingProxyIfApplicable(), file_path_, import_export::CopyParams::file_type, finalize(), Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::insertData(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, foreign_storage::DataWrapperType::PARQUET, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, table_, and UNREACHABLE.

63  {
64  auto& catalog = session_info->getCatalog();
65 
66 #ifdef ENABLE_IMPORT_PARQUET
67  CHECK(copy_params_.file_type == import_export::FileType::PARQUET);
68 #else
69  UNREACHABLE() << "Unexpected method call for non-Parquet import";
70 #endif
71 
72  auto& current_user = session_info->get_currentUser();
74  catalog.getDatabaseId(), current_user.userId, file_path_, copy_params_);
75 
76  auto user_mapping =
78  catalog.getDatabaseId(),
79  current_user.userId,
80  file_path_,
82  server.get());
83 
84  auto foreign_table =
86  catalog.getDatabaseId(), table_, file_path_, copy_params_, server.get());
87 
88  foreign_table->validateOptionValues();
89 
92  catalog.getDatabaseId(),
93  foreign_table.get(),
94  user_mapping.get());
95 
96  if (auto parquet_import =
97  dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
99  ImportStatus import_status; // manually update
100  while (true) {
101  auto batch_result = parquet_import->getNextImportBatch();
102  auto batch = batch_result->getInsertData();
103  if (!batch) {
104  break;
105  }
106  insert_data_loader.insertData(*session_info, *batch);
107 
108  auto batch_import_status = batch_result->getImportStatus();
109  import_status.rows_completed += batch_import_status.rows_completed;
110  import_status.rows_rejected += batch_import_status.rows_rejected;
111  if (import_status.rows_rejected > copy_params_.max_reject) {
112  import_status.load_failed = true;
113  import_status.load_msg =
114  "Load was cancelled due to max reject rows being reached";
115  break;
116  }
117  }
118 
119  if (import_status.load_failed) {
120  foreign_table.reset(); // this is to avoid calling the TableDescriptor dtor after
121  // the rollback in the checkpoint below
122  }
123 
124  finalize(*session_info, import_status, parquet_import->getStringDictionaries());
125 
126  return import_status;
127  }
128 
129  UNREACHABLE();
130  return {};
131 }
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
#define UNREACHABLE()
Definition: Logger.h:253
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::DistributedConnector > connector_
Catalog & getCatalog() const
Definition: SessionInfo.h:67
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:209
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:73
static constexpr char const * PARQUET
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)

+ Here is the call graph for this function:

Member Data Documentation

std::unique_ptr<Fragmenter_Namespace::InsertDataLoader::DistributedConnector> import_export::ForeignDataImporter::connector_
protected

Definition at line 39 of file ForeignDataImporter.h.

Referenced by finalize(), ForeignDataImporter(), and import().

CopyParams import_export::ForeignDataImporter::copy_params_
private

Definition at line 48 of file ForeignDataImporter.h.

Referenced by import().

std::string import_export::ForeignDataImporter::file_path_
private

Definition at line 47 of file ForeignDataImporter.h.

Referenced by import().

const TableDescriptor* import_export::ForeignDataImporter::table_
private

Definition at line 49 of file ForeignDataImporter.h.

Referenced by finalize(), and import().


The documentation for this class was generated from the following files: