OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataWrapperFactory.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "FsiJsonUtils.h"
19 
20 #include "CsvDataWrapper.h"
21 #include "ForeignDataWrapper.h"
23 #ifdef ENABLE_IMPORT_PARQUET
24 #include "ParquetDataWrapper.h"
25 #include "ParquetImporter.h"
26 #endif
27 #include "RegexParserDataWrapper.h"
28 #include "Catalog/os/UserMapping.h"
29 
30 namespace {
31 
/**
 * Reports whether a path designates an S3 object, i.e. begins with the "s3://" scheme.
 *
 * The scheme has to be at the very start of the path. A path that merely contains
 * "s3://" further in (for example inside a local directory or file name) is not
 * treated as an S3 URI.
 *
 * @param file_path path or URI to inspect
 * @return true if file_path starts with "s3://", false otherwise (including empty input)
 */
bool is_s3_uri(const std::string& file_path) {
  const std::string s3_prefix = "s3://";
  // compare() clamps the compared range to the string length, so inputs shorter than
  // the prefix are handled without any extra length check.
  return file_path.compare(0, s3_prefix.size(), s3_prefix) == 0;
}
36 } // namespace
37 
38 namespace foreign_storage {
39 
40 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForImport(
41  const std::string& data_wrapper_type,
42  const int db_id,
43  const ForeignTable* foreign_table,
44  const UserMapping* user_mapping) {
45 #ifdef ENABLE_IMPORT_PARQUET
46  // only supported for parquet import path currently
47  CHECK(data_wrapper_type == DataWrapperType::PARQUET);
48  return std::make_unique<ParquetImporter>(db_id, foreign_table, user_mapping);
49 #else
50  return {};
51 #endif
52 }
53 
54 std::unique_ptr<UserMapping>
56  const int db_id,
57  const int user_id,
58  const std::string& file_path,
59  const import_export::CopyParams& copy_params,
60  const ForeignServer* server) {
61  return {};
62 }
63 
// Builds a stand-in ForeignServer object (named "import_proxy_server") for the import
// path and returns it to the caller.
//
// NOTE(review): listing line 64, which carries the return type and function name, is
// missing from this extract. The member index at the end of the page lists a matching
// createForeignServerProxy(db_id, user_id, file_path, copy_params) returning
// std::unique_ptr<ForeignServer> -- confirm against the real source file.
//
// Parameters:
//   db_id       - not referenced in the visible body
//   user_id     - stored as the proxy server's user_id
//   file_path   - only inspected to detect S3 URIs
//   copy_params - file_type is checked to be PARQUET when ENABLE_IMPORT_PARQUET is set
// Returns the newly created server. Throws std::runtime_error for S3 paths.
65  const int db_id,
66  const int user_id,
67  const std::string& file_path,
68  const import_export::CopyParams& copy_params) {
69 // only supported for parquet import path currently
70 #ifdef ENABLE_IMPORT_PARQUET
71  CHECK(copy_params.file_type == import_export::FileType::PARQUET);
72 #else
73  UNREACHABLE() << "Unexpected method call for non-Parquet import";
74 #endif
75 
76  auto foreign_server = std::make_unique<foreign_storage::ForeignServer>();
77 
// The proxy gets id -1 and is always typed as a Parquet server.
78  foreign_server->id = -1;
79  foreign_server->user_id = user_id;
80  foreign_server->data_wrapper_type = DataWrapperType::PARQUET;
81  foreign_server->name = "import_proxy_server";
82 
// S3 locations are rejected; any other path gets a storage type option set.
83  bool is_aws_s3_storage_type = is_s3_uri(file_path);
84  if (is_aws_s3_storage_type) {
85  throw std::runtime_error("AWS storage not supported");
86  } else {
// NOTE(review): listing line 88, the value assigned to the STORAGE_TYPE_KEY option, is
// missing from this extract; the statement below is incomplete as shown.
87  foreign_server->options[AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY] =
89  }
90 
91  return foreign_server;
92 }
93 
95  const int db_id,
96  const TableDescriptor* table,
97  const std::string& file_path,
98  const import_export::CopyParams& copy_params,
99  const ForeignServer* server) {
100 // only supported for parquet import path currently
101 #ifdef ENABLE_IMPORT_PARQUET
102  CHECK(copy_params.file_type == import_export::FileType::PARQUET);
103 #else
104  UNREACHABLE() << "Unexpected method call for non-Parquet import";
105 #endif
106 
107  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
108  auto foreign_table = std::make_unique<ForeignTable>();
109 
110  *static_cast<TableDescriptor*>(foreign_table.get()) =
111  *table; // copy table related values
112 
113  CHECK(server);
114  foreign_table->foreign_server = server;
115 
116  bool is_aws_s3_storage_type = is_s3_uri(file_path);
117  if (is_aws_s3_storage_type) {
118  throw std::runtime_error("AWS storage not supported");
119  } else {
120  foreign_table->options["FILE_PATH"] = file_path;
121  }
122 
123  foreign_table->initializeOptions();
124  return foreign_table;
125 }
126 
127 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::create(
128  const std::string& data_wrapper_type,
129  const int db_id,
130  const ForeignTable* foreign_table) {
131  std::unique_ptr<ForeignDataWrapper> data_wrapper;
132  if (data_wrapper_type == DataWrapperType::CSV) {
133  if (CsvDataWrapper::validateAndGetIsS3Select(foreign_table)) {
134  UNREACHABLE();
135  } else {
136  data_wrapper = std::make_unique<CsvDataWrapper>(db_id, foreign_table);
137  }
138 #ifdef ENABLE_IMPORT_PARQUET
139  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
140  data_wrapper = std::make_unique<ParquetDataWrapper>(db_id, foreign_table);
141 #endif
142  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
143  data_wrapper = std::make_unique<RegexParserDataWrapper>(db_id, foreign_table);
144  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
145  data_wrapper = std::make_unique<InternalCatalogDataWrapper>(db_id, foreign_table);
146  } else {
147  throw std::runtime_error("Unsupported data wrapper");
148  }
149  return data_wrapper;
150 }
151 
// Returns a reference to a wrapper instance stored in validation_data_wrappers_ under
// a per-type key, creating the instance with the wrapper's default constructor inside
// the lookup branch below.
//
// NOTE(review): listing line 152, which carries the return type and function name, is
// missing from this extract. The member index at the end of the page lists a matching
// createForValidation(data_wrapper_type, foreign_table = nullptr) returning
// const ForeignDataWrapper& -- confirm against the real source file.
153  const std::string& data_wrapper_type,
154  const ForeignTable* foreign_table) {
155  bool is_s3_select_wrapper{false};
// The map key defaults to the wrapper type name itself.
156  std::string data_wrapper_type_key{data_wrapper_type};
157  constexpr const char* S3_SELECT_WRAPPER_KEY = "CSV_S3_SELECT";
158  if (foreign_table && data_wrapper_type == DataWrapperType::CSV &&
// NOTE(review): listing line 159, the remainder of this condition, is missing from this
// extract. The two statements below are its body: they flag the S3 select case and
// switch the map key to "CSV_S3_SELECT".
160  is_s3_select_wrapper = true;
161  data_wrapper_type_key = S3_SELECT_WRAPPER_KEY;
162  }
163 
164  if (validation_data_wrappers_.find(data_wrapper_type_key) ==
// NOTE(review): listing line 165, the right-hand side of this comparison, is missing;
// presumably validation_data_wrappers_.end(), i.e. "create only when absent" -- confirm.
166  if (data_wrapper_type == DataWrapperType::CSV) {
167  if (is_s3_select_wrapper) {
// The S3 select variant of the CSV wrapper is marked unreachable here.
168  UNREACHABLE();
169  } else {
170  validation_data_wrappers_[data_wrapper_type_key] =
171  std::make_unique<CsvDataWrapper>();
172  }
173 #ifdef ENABLE_IMPORT_PARQUET
174  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
175  validation_data_wrappers_[data_wrapper_type_key] =
176  std::make_unique<ParquetDataWrapper>();
177 #endif
178  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
179  validation_data_wrappers_[data_wrapper_type_key] =
180  std::make_unique<RegexParserDataWrapper>();
181  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
182  validation_data_wrappers_[data_wrapper_type_key] =
183  std::make_unique<InternalCatalogDataWrapper>();
184  } else {
// Any other wrapper type name is marked unreachable rather than reported via exception.
185  UNREACHABLE();
186  }
187  }
// NOTE(review): listing line 189, the end of this CHECK expression, is missing;
// presumably it compares against validation_data_wrappers_.end() -- confirm.
188  CHECK(validation_data_wrappers_.find(data_wrapper_type_key) !=
190  return *validation_data_wrappers_[data_wrapper_type_key];
191 }
192 
// Checks a wrapper type name against DataWrapperType::supported_data_wrapper_types and
// throws std::runtime_error with a message listing wrapper type names when the given
// name is not among them. Returns normally otherwise.
//
// NOTE(review): listing line 193, which carries the return type and function name, is
// missing from this extract. The member index at the end of the page lists a matching
// static void validateDataWrapperType(const std::string& data_wrapper_type) -- confirm
// against the real source file.
194  const std::string& data_wrapper_type) {
195  const auto& supported_wrapper_types = DataWrapperType::supported_data_wrapper_types;
196  if (std::find(supported_wrapper_types.begin(),
197  supported_wrapper_types.end(),
198  data_wrapper_type) == supported_wrapper_types.end()) {
// Collect the type names that are put into the error message.
199  std::vector<std::string_view> user_facing_wrapper_types;
200  for (const auto& type : supported_wrapper_types) {
// NOTE(review): listing line 201 is missing from this extract. The extra closing brace
// after emplace_back implies it opened a block, possibly a condition selecting which
// types are shown to the user -- confirm against the real source file.
202  user_facing_wrapper_types.emplace_back(type);
203  }
204  }
205  throw std::runtime_error{"Invalid data wrapper type \"" + data_wrapper_type +
206  "\". Data wrapper type must be one of the following: " +
207  join(user_facing_wrapper_types, ", ") + "."};
208  }
209 }
210 
211 std::map<std::string, std::unique_ptr<ForeignDataWrapper>>
213 } // namespace foreign_storage
static const ForeignDataWrapper & createForValidation(const std::string &data_wrapper_type, const ForeignTable *foreign_table=nullptr)
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static constexpr char const * REGEX_PARSER
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
static constexpr std::array< std::string_view, 4 > supported_data_wrapper_types
std::string join(T const &container, std::string const &delim)
#define UNREACHABLE()
Definition: Logger.h:253
static bool validateAndGetIsS3Select(const ForeignTable *foreign_table)
static constexpr char const * INTERNAL_CATALOG
static std::map< std::string, std::unique_ptr< ForeignDataWrapper > > validation_data_wrappers_
static SysCatalog & instance()
Definition: SysCatalog.h:325
static void validateDataWrapperType(const std::string &data_wrapper_type)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:209
static constexpr char const * CSV
static constexpr char const * PARQUET
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)