OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignDataImporter.h"
20 #include "Importer.h"
21 #include "Parser/ParserNode.h"
22 #include "Shared/measure.h"
23 #include "UserMapping.h"
24 
25 namespace import_export {
26 
27 ForeignDataImporter::ForeignDataImporter(const std::string& file_path,
28  const CopyParams& copy_params,
29  const TableDescriptor* table)
30  : file_path_(file_path), copy_params_(copy_params), table_(table) {
31  connector_ = std::make_unique<Parser::LocalConnector>();
32 }
33 
35  const Catalog_Namespace::SessionInfo& parent_session_info,
36  ImportStatus& import_status,
37  const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*> >&
38  string_dictionaries) {
// Completes an import into table_.
// - Dictionary step: for disk-resident tables whose load has not already
//   failed, every string dictionary in `string_dictionaries` is checkpointed.
//   The first dictionary that fails to checkpoint marks the load as failed.
// - Table step: the table is rolled back through connector_ if the load
//   failed for any reason. Otherwise the table is checkpointed.
//
// parent_session_info: session forwarded to connector_->rollback() /
//   connector_->checkpoint().
// import_status: in/out. load_failed and load_msg are set here when a
//   dictionary checkpoint fails. load_failed may already be true on entry.
// string_dictionaries: (column, dictionary) pairs. The column descriptor is
//   only used to name the column in the error log.
//   NOTE(review): each dictionary pointer is dereferenced without a null
//   check, so callers presumably never pass null entries — confirm.
39  if (table_->persistenceLevel ==
40  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
41  // tables
// Non-disk-resident tables skip dictionary checkpointing entirely, but they
// still go through the rollback/checkpoint decision below.
42  if (!import_status.load_failed) {
43  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
44  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
45  if (!string_dictionary->checkpoint()) {
46  LOG(ERROR) << "Checkpointing Dictionary for Column "
47  << column_desciptor->columnName << " failed.";
48  import_status.load_failed = true;
49  import_status.load_msg = "Dictionary checkpoint failed";
// Stop at the first failure. The remaining dictionaries are not
// checkpointed because the table will be rolled back below.
50  break;
51  }
52  }
53  }
54  }
// load_failed is true here in two cases:
// - it was already set on entry (import() sets it when max_reject is exceeded);
// - it was set by the dictionary loop above.
55  if (import_status.load_failed) {
56  connector_->rollback(parent_session_info, table_->tableId);
57  } else {
58  connector_->checkpoint(parent_session_info, table_->tableId);
59  }
60 }
61 
63  const Catalog_Namespace::SessionInfo* session_info) {
// Imports file_path_ into table_ (Parquet only).
// Steps:
// 1. Wrap the target table in foreign-storage proxy objects: a foreign
//    server, a user mapping and a foreign table.
// 2. Let a ParquetImporter data wrapper read the file in batches.
// 3. Insert each batch and accumulate the per-batch row counts.
// 4. Call finalize(), which checkpoints or rolls back the table.
//
// Returns the accumulated ImportStatus. load_failed/load_msg are set in two
// cases:
// - the total number of rejected rows exceeds copy_params_.max_reject;
// - finalize() reports a failed string-dictionary checkpoint.
64  auto& catalog = session_info->getCatalog();
65 
// Only Parquet is supported. Builds without ENABLE_IMPORT_PARQUET are never
// expected to call this method.
66 #ifdef ENABLE_IMPORT_PARQUET
67  CHECK(copy_params_.file_type == import_export::FileType::PARQUET);
68 #else
69  UNREACHABLE() << "Unexpected method call for non-Parquet import";
70 #endif
71 
72  auto& current_user = session_info->get_currentUser();
74  catalog.getDatabaseId(), current_user.userId, file_path_, copy_params_);
75 
76  auto user_mapping =
78  catalog.getDatabaseId(),
79  current_user.userId,
80  file_path_,
82  server.get());
83 
84  auto foreign_table =
86  catalog.getDatabaseId(), table_, file_path_, copy_params_, server.get());
87 
// Reject invalid copy options before any data is read.
88  foreign_table->validateOptionValues();
89 
92  catalog.getDatabaseId(),
93  foreign_table.get(),
94  user_mapping.get());
95 
96  if (auto parquet_import =
97  dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
99  ImportStatus import_status; // manually update
// Pull batches until the wrapper returns a batch without insert data.
100  while (true) {
101  auto batch_result = parquet_import->getNextImportBatch();
102  auto batch = batch_result->getInsertData();
103  if (!batch) {
104  break;
105  }
106  insert_data_loader.insertData(*session_info, *batch);
107 
108  auto batch_import_status = batch_result->getImportStatus();
109  import_status.rows_completed += batch_import_status.rows_completed;
110  import_status.rows_rejected += batch_import_status.rows_rejected;
// The comparison is strict: the load fails only once the rejected rows
// exceed max_reject. Batches inserted earlier are not undone here.
// finalize() rolls the table back because load_failed is set.
111  if (import_status.rows_rejected > copy_params_.max_reject) {
112  import_status.load_failed = true;
113  import_status.load_msg =
114  "Load was cancelled due to max reject rows being reached";
115  break;
116  }
117  }
118 
// On failure, destroy the foreign-table proxy before finalize() performs the
// rollback.
119  if (import_status.load_failed) {
120  foreign_table.reset(); // this is to avoid calling the TableDescriptor dtor after
121  // the rollback in the checkpoint below
122  }
123 
124  finalize(*session_info, import_status, parquet_import->getStringDictionaries());
125 
126  return import_status;
127  }
128 
// The data wrapper built above is expected to always be a ParquetImporter.
129  UNREACHABLE();
130  return {};
131 }
132 
133 } // namespace import_export
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
#define LOG(tag)
Definition: Logger.h:203
#define UNREACHABLE()
Definition: Logger.h:253
ImportStatus import(const Catalog_Namespace::SessionInfo *session_info) override
Classes representing a parse tree.
ForeignDataImporter(const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::DistributedConnector > connector_
Catalog & getCatalog() const
Definition: SessionInfo.h:67
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
Data_Namespace::MemoryLevel persistenceLevel
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:73
static constexpr char const * PARQUET
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)