OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataWrapperFactory.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "FsiJsonUtils.h"
19 
20 #include "CsvDataWrapper.h"
21 #include "ForeignDataWrapper.h"
25 #ifdef ENABLE_IMPORT_PARQUET
26 #include "ParquetDataWrapper.h"
27 #include "ParquetImporter.h"
28 #endif
29 #include "Catalog/os/UserMapping.h"
30 #include "RegexParserDataWrapper.h"
31 #include "Shared/SysDefinitions.h"
32 #include "Shared/misc.h"
33 
34 namespace foreign_storage {
/**
 * Reports whether a path is an S3 URI, i.e. whether it begins with the "s3://" scheme.
 *
 * The scheme is matched only at the very start of the string (case-sensitively), so a
 * local path that merely contains "s3://" somewhere in the middle is not classified as
 * S3.
 *
 * @param file_path path or URI to classify
 * @return true if file_path starts with "s3://", false otherwise
 */
bool is_s3_uri(const std::string& file_path) {
  const std::string s3_prefix = "s3://";
  // Compare only the leading characters; a shorter file_path simply compares unequal.
  return file_path.compare(0, s3_prefix.size(), s3_prefix) == 0;
}
39 
40 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
41  std::unique_ptr<foreign_storage::UserMapping>,
42  std::unique_ptr<foreign_storage::ForeignTable>>
43 create_proxy_fsi_objects(const std::string& copy_from_source,
44  const import_export::CopyParams& copy_params,
45  const int db_id,
46  const TableDescriptor* table,
47  const int32_t user_id) {
49  db_id, user_id, copy_from_source, copy_params);
50 
51  CHECK(server);
52  server->validate();
53 
54  auto user_mapping =
56  db_id, user_id, copy_from_source, copy_params, server.get());
57 
58  if (user_mapping) {
59  user_mapping->validate(server.get());
60  }
61 
62  auto foreign_table =
64  db_id, table, copy_from_source, copy_params, server.get());
65 
66  CHECK(foreign_table);
67  foreign_table->validateOptionValues();
68 
69  return {std::move(server), std::move(user_mapping), std::move(foreign_table)};
70 }
71 
72 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
73  std::unique_ptr<foreign_storage::UserMapping>,
74  std::unique_ptr<foreign_storage::ForeignTable>>
75 create_proxy_fsi_objects(const std::string& copy_from_source,
76  const import_export::CopyParams& copy_params,
77  const TableDescriptor* table) {
78  return create_proxy_fsi_objects(copy_from_source, copy_params, -1, table, -1);
79 }
80 
81 } // namespace foreign_storage
82 
83 namespace {
84 
85 
// Returns whether `data_wrapper_type` equals one of a fixed set of DataWrapperType
// names: PARQUET (only when ENABLE_IMPORT_PARQUET is defined), CSV, and one further
// operand. createForGeneralImport CHECKs this predicate before creating a wrapper.
// NOTE(review): the final operand of the `||` chain (listing line 92) is missing from
// this extract, so the expression below is incomplete. It is presumably the
// REGEX_PARSER comparison, since createForGeneralImport also handles that type -
// confirm against the upstream file.
86 bool is_valid_data_wrapper(const std::string& data_wrapper_type) {
87  return
88 #ifdef ENABLE_IMPORT_PARQUET
89  data_wrapper_type == foreign_storage::DataWrapperType::PARQUET ||
90 #endif
91  data_wrapper_type == foreign_storage::DataWrapperType::CSV ||
93 }
94 
95 } // namespace
96 
97 namespace foreign_storage {
98 
100  if (copy_params.line_regex.empty()) {
101  throw std::runtime_error{"Regex parser options must contain a line regex."};
102  }
103 }
104 
// Remains of the body of is_valid_source_type(const import_export::CopyParams&); the
// signature is taken from the cross-reference index at the end of this page.
// createForeignServerProxy and createForeignTableProxy CHECK this predicate on entry.
// NOTE(review): the declaration line (listing line 105) and every operand of the
// return expression (listing lines 108, 110, 111) are missing from this extract. Only
// `return`, the ENABLE_IMPORT_PARQUET guard and the closing brace survive; restore the
// body from the upstream file.
106  return
107 #ifdef ENABLE_IMPORT_PARQUET
109 #endif
112 }
113 
/**
 * Converts a boolean into its option-value text.
 *
 * @param value flag to convert
 * @return "TRUE" when value is true, "FALSE" otherwise
 */
std::string bool_to_option_value(const bool value) {
  if (value) {
    return "TRUE";
  }
  return "FALSE";
}
117 
118 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForGeneralImport(
119  const std::string& data_wrapper_type,
120  const int db_id,
121  const ForeignTable* foreign_table,
122  const UserMapping* user_mapping) {
123  CHECK(is_valid_data_wrapper(data_wrapper_type));
124 
125  if (data_wrapper_type == DataWrapperType::CSV) {
126  return std::make_unique<CsvDataWrapper>(
127  db_id, foreign_table, user_mapping, /*disable_cache=*/true);
128  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
129  return std::make_unique<RegexParserDataWrapper>(
130  db_id, foreign_table, user_mapping, true);
131  }
132 #ifdef ENABLE_IMPORT_PARQUET
133  else if (data_wrapper_type == DataWrapperType::PARQUET) {
134  return std::make_unique<ParquetDataWrapper>(
135  db_id, foreign_table, /*do_metadata_stats_validation=*/false);
136  }
137 #endif
138 
139  return {};
140 }
141 
142 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForImport(
143  const std::string& data_wrapper_type,
144  const int db_id,
145  const ForeignTable* foreign_table,
146  const UserMapping* user_mapping) {
147 #ifdef ENABLE_IMPORT_PARQUET
148  // only supported for parquet import path currently
149  CHECK(data_wrapper_type == DataWrapperType::PARQUET);
150  return std::make_unique<ParquetImporter>(db_id, foreign_table, user_mapping);
151 #else
152  return {};
153 #endif
154 }
155 
156 std::unique_ptr<UserMapping>
158  const int db_id,
159  const int user_id,
160  const std::string& file_path,
161  const import_export::CopyParams& copy_params,
162  const ForeignServer* server) {
163  return {};
164 }
165 
// Parameter list and body of ForeignDataWrapperFactory::createForeignServerProxy. Per
// the cross-reference index at the end of this page it is a static member returning
// std::unique_ptr<ForeignServer>.
// Builds a ForeignServer with id -1, the caller's user_id and the name
// "import_proxy_server", picks its data_wrapper_type from copy_params.source_type and
// sets its STORAGE_TYPE_KEY option.
// Throws std::runtime_error when the source type is kOdbc or when is_s3_uri(file_path)
// is true. db_id is not referenced in the lines visible here.
// NOTE(review): this extract is missing the return-type/name line (listing line 166),
// the condition that opens the if-chain (177) and the value assigned to the storage
// type option (196); restore them from the upstream file before compiling.
167  const int db_id,
168  const int user_id,
169  const std::string& file_path,
170  const import_export::CopyParams& copy_params) {
171  CHECK(is_valid_source_type(copy_params));
172 
173  auto foreign_server = std::make_unique<foreign_storage::ForeignServer>();
174 
175  foreign_server->id = -1;
176  foreign_server->user_id = user_id;
// NOTE(review): the opening `if (...) {` for the CSV branch is missing here.
178  foreign_server->data_wrapper_type = DataWrapperType::CSV;
179  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
180  foreign_server->data_wrapper_type = DataWrapperType::REGEX_PARSER;
181 #ifdef ENABLE_IMPORT_PARQUET
182  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
183  foreign_server->data_wrapper_type = DataWrapperType::PARQUET;
184 #endif
185  } else {
186  UNREACHABLE();
187  }
188  foreign_server->name = "import_proxy_server";
189 
// ODBC sources and S3 URIs are rejected; every other source gets a storage type option.
190  if (copy_params.source_type == import_export::SourceType::kOdbc) {
191  throw std::runtime_error("ODBC storage not supported");
192  } else if (is_s3_uri(file_path)) {
193  throw std::runtime_error("AWS storage not supported");
194  } else {
// NOTE(review): the right-hand side of this assignment is missing from the extract.
195  foreign_server->options[AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY] =
197  }
198 
199  return foreign_server;
200 }
201 
// Parameter list and body of ForeignDataWrapperFactory::createForeignTableProxy. Per
// the cross-reference index at the end of this page it is a static member returning
// std::unique_ptr<ForeignTable>.
// Creates a ForeignTable, copies the TableDescriptor part from `table`, attaches
// `server`, translates copy_params fields into foreign_table->options entries, calls
// initializeOptions() and returns the table.
// Throws std::runtime_error for a kOdbc source type, for an S3 URI source, and when
// copy_params.geo_explode_collections is set.
// NOTE(review): many lines are missing from this extract (listing lines 202, 220-222,
// 224, 228, 232, 237, 259, 263, 266-267, 287, 293). These include the signature head,
// the conditions guarding each option group, several option keys and the switch case
// labels. Restore them from the upstream file before compiling.
203  const int db_id,
204  const TableDescriptor* table,
205  const std::string& copy_from_source,
206  const import_export::CopyParams& copy_params,
207  const ForeignServer* server) {
208  CHECK(is_valid_source_type(copy_params));
209 
// `catalog` is not referenced again in the lines visible here.
210  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
211  auto foreign_table = std::make_unique<ForeignTable>();
212 
// Assign through the TableDescriptor view of the new ForeignTable so that the
// descriptor fields are taken from `table`.
213  *static_cast<TableDescriptor*>(foreign_table.get()) =
214  *table; // copy table related values
215 
216  CHECK(server);
217  foreign_table->foreign_server = server;
218 
219  // populate options for regex filtering of file-paths in supported data types
// NOTE(review): the enclosing condition and the left-hand sides (option keys) of the
// three assignments below are missing. Each optional is only read when it has a value.
223  if (copy_params.regex_path_filter.has_value()) {
225  copy_params.regex_path_filter.value();
226  }
227  if (copy_params.file_sort_order_by.has_value()) {
229  copy_params.file_sort_order_by.value();
230  }
231  if (copy_params.file_sort_regex.has_value()) {
233  copy_params.file_sort_regex.value();
234  }
235  }
236 
// NOTE(review): the condition opening this regex-parser option group is missing.
// The line regex is mandatory (CHECKed); the line start regex is set only when given.
238  CHECK(!copy_params.line_regex.empty());
239  foreign_table->options[RegexFileBufferParser::LINE_REGEX_KEY] =
240  copy_params.line_regex;
241  if (!copy_params.line_start_regex.empty()) {
242  foreign_table->options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
243  copy_params.line_start_regex;
244  }
245  foreign_table->options[TextFileBufferParser::THREADS_KEY] =
246  std::to_string(copy_params.threads);
247  }
248 
249  // setup data source options based on various criteria
250  if (copy_params.source_type == import_export::SourceType::kOdbc) {
251  throw std::runtime_error("ODBC storage not supported");
252  } else if (is_s3_uri(copy_from_source)) {
253  throw std::runtime_error("AWS storage not supported");
254  } else {
255  foreign_table->options["FILE_PATH"] = copy_from_source;
256  }
257 
258  // for CSV import
// NOTE(review): the condition opening this CSV option group is missing.
260  foreign_table->options[CsvFileBufferParser::DELIMITER_KEY] = copy_params.delimiter;
261  foreign_table->options[CsvFileBufferParser::NULLS_KEY] = copy_params.null_str;
// NOTE(review): the case labels of this switch are missing. The visible branches set
// HEADER_KEY to "FALSE" or "TRUE", and any other has_header value fails the CHECK.
262  switch (copy_params.has_header) {
264  foreign_table->options[CsvFileBufferParser::HEADER_KEY] = "FALSE";
265  break;
268  foreign_table->options[CsvFileBufferParser::HEADER_KEY] = "TRUE";
269  break;
270  default:
271  CHECK(false);
272  }
273  foreign_table->options[CsvFileBufferParser::QUOTED_KEY] =
274  bool_to_option_value(copy_params.quoted);
275  foreign_table->options[CsvFileBufferParser::QUOTE_KEY] = copy_params.quote;
276  foreign_table->options[CsvFileBufferParser::ESCAPE_KEY] = copy_params.escape;
277  foreign_table->options[CsvFileBufferParser::LINE_DELIMITER_KEY] =
278  copy_params.line_delim;
279  foreign_table->options[CsvFileBufferParser::ARRAY_DELIMITER_KEY] =
280  copy_params.array_delim;
// array_begin and array_end are packed into a zero-terminated 3-char buffer so that
// .data() can be assigned as a two-character C string.
281  const std::array<char, 3> array_marker{
282  copy_params.array_begin, copy_params.array_end, 0};
283  foreign_table->options[CsvFileBufferParser::ARRAY_MARKER_KEY] = array_marker.data();
284  foreign_table->options[CsvFileBufferParser::LONLAT_KEY] =
285  bool_to_option_value(copy_params.lonlat);
// NOTE(review): the values assigned to GEO_ASSIGN_RENDER_GROUPS_KEY and
// GEO_EXPLODE_COLLECTIONS_KEY are missing from the extract.
286  foreign_table->options[CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY] =
288  if (copy_params.geo_explode_collections) {
289  throw std::runtime_error(
290  "geo_explode_collections is not yet supported for FSI CSV import");
291  }
292  foreign_table->options[CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY] =
294  foreign_table->options[CsvFileBufferParser::SOURCE_SRID_KEY] =
295  std::to_string(copy_params.source_srid);
296 
297  foreign_table->options[TextFileBufferParser::BUFFER_SIZE_KEY] =
298  std::to_string(copy_params.buffer_size);
299  foreign_table->options[TextFileBufferParser::THREADS_KEY] =
300  std::to_string(copy_params.threads);
301  }
302 
303  foreign_table->initializeOptions();
304  return foreign_table;
305 }
306 
307 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::create(
308  const std::string& data_wrapper_type,
309  const int db_id,
310  const ForeignTable* foreign_table) {
311  std::unique_ptr<ForeignDataWrapper> data_wrapper;
312  if (data_wrapper_type == DataWrapperType::CSV) {
313  if (CsvDataWrapper::validateAndGetIsS3Select(foreign_table)) {
314  UNREACHABLE();
315  } else {
316  data_wrapper = std::make_unique<CsvDataWrapper>(db_id, foreign_table);
317  }
318 #ifdef ENABLE_IMPORT_PARQUET
319  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
320  data_wrapper = std::make_unique<ParquetDataWrapper>(db_id, foreign_table);
321 #endif
322  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
323  data_wrapper = std::make_unique<RegexParserDataWrapper>(db_id, foreign_table);
324  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
325  data_wrapper = std::make_unique<InternalCatalogDataWrapper>(db_id, foreign_table);
326  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
327  data_wrapper = std::make_unique<InternalMemoryStatsDataWrapper>(db_id, foreign_table);
328  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
329  data_wrapper =
330  std::make_unique<InternalStorageStatsDataWrapper>(db_id, foreign_table);
331  } else {
332  throw std::runtime_error("Unsupported data wrapper");
333  }
334  return data_wrapper;
335 }
336 
// Parameter list and body of ForeignDataWrapperFactory::createForValidation. Per the
// cross-reference index at the end of this page it is a static member returning
// const ForeignDataWrapper&, with foreign_table defaulting to nullptr.
// Looks up a default-constructed wrapper in validation_data_wrappers_ by key, creating
// and storing it the first time that key is requested, and returns a reference to the
// stored wrapper. The key is data_wrapper_type, or "CSV_S3_SELECT" when the S3-select
// condition below holds.
// NOTE(review): this extract is missing the return-type/name line (listing line 337),
// the last operand of the S3-select condition (344) and the right-hand operands of the
// two find() comparisons (350, 380); restore them from the upstream file.
338  const std::string& data_wrapper_type,
339  const ForeignTable* foreign_table) {
340  bool is_s3_select_wrapper{false};
341  std::string data_wrapper_type_key{data_wrapper_type};
342  constexpr const char* S3_SELECT_WRAPPER_KEY = "CSV_S3_SELECT";
// NOTE(review): the final operand of this condition is missing. Presumably it is the
// CsvDataWrapper::validateAndGetIsS3Select(foreign_table) call that create() uses -
// confirm against the upstream file.
343  if (foreign_table && data_wrapper_type == DataWrapperType::CSV &&
345  is_s3_select_wrapper = true;
346  data_wrapper_type_key = S3_SELECT_WRAPPER_KEY;
347  }
348 
// Create the wrapper only when the key is not stored yet (comparison operand missing).
349  if (validation_data_wrappers_.find(data_wrapper_type_key) ==
351  if (data_wrapper_type == DataWrapperType::CSV) {
352  if (is_s3_select_wrapper) {
353  UNREACHABLE();
354  } else {
355  validation_data_wrappers_[data_wrapper_type_key] =
356  std::make_unique<CsvDataWrapper>();
357  }
358 #ifdef ENABLE_IMPORT_PARQUET
359  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
360  validation_data_wrappers_[data_wrapper_type_key] =
361  std::make_unique<ParquetDataWrapper>();
362 #endif
363  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
364  validation_data_wrappers_[data_wrapper_type_key] =
365  std::make_unique<RegexParserDataWrapper>();
366  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
367  validation_data_wrappers_[data_wrapper_type_key] =
368  std::make_unique<InternalCatalogDataWrapper>();
369  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
370  validation_data_wrappers_[data_wrapper_type_key] =
371  std::make_unique<InternalMemoryStatsDataWrapper>();
372  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
373  validation_data_wrappers_[data_wrapper_type_key] =
374  std::make_unique<InternalStorageStatsDataWrapper>();
375  } else {
// Any other wrapper type is treated as a programming error.
376  UNREACHABLE();
377  }
378  }
// The entry must exist by this point; the returned reference points into the map.
379  CHECK(validation_data_wrappers_.find(data_wrapper_type_key) !=
381  return *validation_data_wrappers_[data_wrapper_type_key];
382 }
383 
// Parameter list and body of ForeignDataWrapperFactory::validateDataWrapperType. Per
// the cross-reference index at the end of this page it is a static member returning
// void.
// Does nothing when data_wrapper_type is found in
// DataWrapperType::supported_data_wrapper_types. Otherwise throws std::runtime_error
// with a message that names the rejected type and lists the collected
// user_facing_wrapper_types joined with ", ".
// NOTE(review): this extract is missing the return-type/name line (listing line 384)
// and the condition that filters the loop (392). Judging by the variable name, the
// filter presumably skips wrapper types that are not user facing - confirm against the
// upstream file.
385  const std::string& data_wrapper_type) {
386  const auto& supported_wrapper_types = DataWrapperType::supported_data_wrapper_types;
387  if (std::find(supported_wrapper_types.begin(),
388  supported_wrapper_types.end(),
389  data_wrapper_type) == supported_wrapper_types.end()) {
// Collect the type names to show in the error message.
390  std::vector<std::string_view> user_facing_wrapper_types;
391  for (const auto& type : supported_wrapper_types) {
393  user_facing_wrapper_types.emplace_back(type);
394  }
395  }
396  throw std::runtime_error{"Invalid data wrapper type \"" + data_wrapper_type +
397  "\". Data wrapper type must be one of the following: " +
398  join(user_facing_wrapper_types, ", ") + "."};
399  }
400 }
401 
402 std::map<std::string, std::unique_ptr<ForeignDataWrapper> >
404 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:196
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const ForeignDataWrapper & createForValidation(const std::string &data_wrapper_type, const ForeignTable *foreign_table=nullptr)
bool is_valid_data_wrapper(const std::string &data_wrapper_type)
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static constexpr std::array< char const *, 3 > INTERNAL_DATA_WRAPPERS
static constexpr char const * REGEX_PARSER
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
static const std::string ARRAY_MARKER_KEY
static constexpr char const * INTERNAL_STORAGE_STATS
std::string join(T const &container, std::string const &delim)
#define UNREACHABLE()
Definition: Logger.h:267
static bool validateAndGetIsS3Select(const ForeignTable *foreign_table)
static constexpr char const * INTERNAL_CATALOG
std::string to_string(char const *&&v)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::map< std::string, std::unique_ptr< ForeignDataWrapper > > validation_data_wrappers_
ImportHeaderRow has_header
Definition: CopyParams.h:46
static const std::string SOURCE_SRID_KEY
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:83
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::string bool_to_option_value(const bool value)
static void validateDataWrapperType(const std::string &data_wrapper_type)
static const std::string LINE_DELIMITER_KEY
import_export::SourceType source_type
Definition: CopyParams.h:57
bool is_valid_source_type(const import_export::CopyParams &copy_params)
bool is_s3_uri(const std::string &file_path)
static const std::string DELIMITER_KEY
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static constexpr std::array< std::string_view, 6 > supported_data_wrapper_types
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
static const std::string ARRAY_DELIMITER_KEY
std::string line_start_regex
Definition: CopyParams.h:104
static constexpr char const * INTERNAL_MEMORY_STATS
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:223
static constexpr char const * CSV
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:84
static constexpr char const * PARQUET
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:85
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)