OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataWrapperFactory.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "FsiJsonUtils.h"
19 
20 #include "CsvDataWrapper.h"
21 #include "ForeignDataWrapper.h"
26 #ifdef ENABLE_IMPORT_PARQUET
27 #include "ParquetDataWrapper.h"
28 #include "ParquetImporter.h"
29 #endif
30 #include "Catalog/os/UserMapping.h"
31 #include "RegexParserDataWrapper.h"
32 #include "Shared/SysDefinitions.h"
33 #include "Shared/misc.h"
34 #include "Shared/thread_count.h"
35 
36 namespace {
// Maps copy_params.source_type to the data wrapper type name used by the
// general import path (createForGeneralImport); any unrecognized source type
// hits UNREACHABLE(). The Parquet branch only exists in builds with
// ENABLE_IMPORT_PARQUET.
// NOTE(review): this listing elides the opening `if` condition (line 39) and the
// assignments in the regex-parser and Parquet branches (lines 42, 45);
// presumably they assign DataWrapperType::REGEX_PARSER and
// DataWrapperType::PARQUET, mirroring createForeignServerProxy — confirm
// against the full source.
37 std::string get_data_wrapper_type(const import_export::CopyParams& copy_params) {
38  std::string data_wrapper_type;
40  data_wrapper_type = foreign_storage::DataWrapperType::CSV;
41  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
43 #ifdef ENABLE_IMPORT_PARQUET
44  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
46 #endif
47  } else {
48  UNREACHABLE();
49  }
50  return data_wrapper_type;
51 }
52 } // namespace
53 
54 namespace foreign_storage {
/**
 * Returns true when `file_path` is an S3 URI, i.e. it starts with the "s3://"
 * scheme prefix.
 *
 * Only a leading prefix counts. A plain substring search would also flag local
 * paths that merely contain "s3://" somewhere (e.g. "/mnt/s3://data.csv", a
 * legal POSIX path with a directory named "s3:"). Callers in this file would
 * then wrongly reject such paths as AWS storage.
 *
 * @param file_path path or URI to inspect.
 * @return true iff `file_path` begins with "s3://" (case-sensitive).
 */
bool is_s3_uri(const std::string& file_path) {
  const std::string s3_prefix = "s3://";
  // rfind(prefix, 0) only tries a match at position 0, which makes it a
  // C++17-compatible starts_with().
  return file_path.rfind(s3_prefix, 0) == 0;
}
59 
/**
 * Create proxy fsi objects for use outside FSI.
 *
 * Builds, validates and returns (server, user mapping, foreign table) for
 * `copy_from_source`:
 *  - the server proxy must be non-null (CHECK) and is validate()d;
 *  - the user mapping may be null; when present it is validated against the
 *    server;
 *  - the foreign table proxy must be non-null (CHECK) and has its option values
 *    validated via validateOptionValues().
 * Ownership of all three objects is moved into the returned tuple.
 *
 * The table proxy stores a raw pointer to the server
 * (`foreign_table->foreign_server = server` in createForeignTableProxy). The
 * caller must therefore keep the returned server alive for as long as the
 * returned table is used.
 *
 * NOTE(review): the factory-call lines (68, 75, 83) are elided from this
 * listing. Their argument lists match
 * ForeignDataWrapperFactory::createForeignServerProxy,
 * createUserMappingProxyIfApplicable and createForeignTableProxy — confirm.
 */
60 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
61  std::unique_ptr<foreign_storage::UserMapping>,
62  std::unique_ptr<foreign_storage::ForeignTable>>
63 create_proxy_fsi_objects(const std::string& copy_from_source,
64  const import_export::CopyParams& copy_params,
65  const int db_id,
66  const TableDescriptor* table,
67  const int32_t user_id) {
69  db_id, user_id, copy_from_source, copy_params);
70 
71  CHECK(server);
72  server->validate();
73 
74  auto user_mapping =
76  db_id, user_id, copy_from_source, copy_params, server.get());
77 
78  // A null mapping is tolerated; only an existing one is validated.
78  if (user_mapping) {
79  user_mapping->validate(server.get());
80  }
81 
82  auto foreign_table =
84  db_id, table, copy_from_source, copy_params, server.get());
85 
86  CHECK(foreign_table);
87  foreign_table->validateOptionValues();
88 
89  return {std::move(server), std::move(user_mapping), std::move(foreign_table)};
90 }
91 
92 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
93  std::unique_ptr<foreign_storage::UserMapping>,
94  std::unique_ptr<foreign_storage::ForeignTable>>
95 create_proxy_fsi_objects(const std::string& copy_from_source,
96  const import_export::CopyParams& copy_params,
97  const TableDescriptor* table) {
98  return create_proxy_fsi_objects(copy_from_source, copy_params, -1, table, -1);
99 }
100 
101 } // namespace foreign_storage
102 
103 namespace {
104 
// Returns whether `data_wrapper_type` is one of the wrapper types that
// createForGeneralImport can construct: CSV, and PARQUET (only in builds with
// ENABLE_IMPORT_PARQUET).
// NOTE(review): the final operand of the condition (line 111) is elided from
// this listing. Presumably it compares against
// foreign_storage::DataWrapperType::REGEX_PARSER, which createForGeneralImport
// also dispatches on — confirm.
105 bool is_valid_data_wrapper(const std::string& data_wrapper_type) {
106  return
107 #ifdef ENABLE_IMPORT_PARQUET
108  data_wrapper_type == foreign_storage::DataWrapperType::PARQUET ||
109 #endif
110  data_wrapper_type == foreign_storage::DataWrapperType::CSV ||
112 }
113 
114 } // namespace
115 
116 namespace foreign_storage {
117 
// validate_regex_parser_options(const import_export::CopyParams& copy_params)
// (signature line 118 is elided from this listing). A regex-parser import must
// supply a non-empty line regex; otherwise std::runtime_error is thrown.
119  if (copy_params.line_regex.empty()) {
120  throw std::runtime_error{"Regex parser options must contain a line regex."};
121  }
122 }
123 
// is_valid_source_type(const import_export::CopyParams& copy_params): the
// signature line (124) and most of the returned condition (127, 129-130) are
// elided from this listing. It guards createForeignServerProxy and
// createForeignTableProxy via CHECK(is_valid_source_type(copy_params)).
// NOTE(review): presumably it accepts the source types those functions dispatch
// on (regex-parsed files, delimited files, and Parquet files when built with
// ENABLE_IMPORT_PARQUET) — confirm against the full source.
125  return
126 #ifdef ENABLE_IMPORT_PARQUET
128 #endif
131 }
132 
/**
 * Renders a boolean as the upper-case option string stored in foreign table
 * option maps.
 *
 * @param value flag to render.
 * @return "TRUE" when `value` is set, "FALSE" otherwise.
 */
std::string bool_to_option_value(const bool value) {
  if (!value) {
    return "FALSE";
  }
  return "TRUE";
}
136 
137 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForGeneralImport(
138  const import_export::CopyParams& copy_params,
139  const int db_id,
140  const ForeignTable* foreign_table,
141  const UserMapping* user_mapping) {
142  auto data_wrapper_type = get_data_wrapper_type(copy_params);
143  CHECK(is_valid_data_wrapper(data_wrapper_type));
144 
145  if (data_wrapper_type == DataWrapperType::CSV) {
146  return std::make_unique<CsvDataWrapper>(
147  db_id, foreign_table, user_mapping, /*disable_cache=*/true);
148  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
149  return std::make_unique<RegexParserDataWrapper>(
150  db_id, foreign_table, user_mapping, true);
151  }
152 #ifdef ENABLE_IMPORT_PARQUET
153  else if (data_wrapper_type == DataWrapperType::PARQUET) {
154  return std::make_unique<ParquetDataWrapper>(
155  db_id, foreign_table, /*do_metadata_stats_validation=*/false);
156  }
157 #endif
158 
159  return {};
160 }
161 
162 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForImport(
163  const std::string& data_wrapper_type,
164  const int db_id,
165  const ForeignTable* foreign_table,
166  const UserMapping* user_mapping) {
167 #ifdef ENABLE_IMPORT_PARQUET
168  // only supported for parquet import path currently
169  CHECK(data_wrapper_type == DataWrapperType::PARQUET);
170  return std::make_unique<ParquetImporter>(db_id, foreign_table, user_mapping);
171 #else
172  return {};
173 #endif
174 }
175 
// ForeignDataWrapperFactory::createUserMappingProxyIfApplicable (the
// qualified-name line 177 is elided from this listing).
// Unconditionally returns an empty pointer: none of the parameters are read, so
// no user mapping proxy is ever produced here. create_proxy_fsi_objects handles
// this by validating the mapping only when it is non-null.
176 std::unique_ptr<UserMapping>
178  const int db_id,
179  const int user_id,
180  const std::string& file_path,
181  const import_export::CopyParams& copy_params,
182  const ForeignServer* server) {
183  return {};
184 }
185 
187  const int db_id,
188  const int user_id,
189  const std::string& file_path,
190  const import_export::CopyParams& copy_params) {
// ForeignDataWrapperFactory::createForeignServerProxy (signature line 186 is
// elided from this listing). Builds an in-memory ForeignServer proxy:
//  - id -1, the given user_id, and name "import_proxy_server";
//  - data_wrapper_type chosen from copy_params.source_type (REGEX_PARSER for
//    regex-parsed files, PARQUET for Parquet files in ENABLE_IMPORT_PARQUET
//    builds, CSV for the branch whose condition is elided on line 197);
//    anything else hits UNREACHABLE();
//  - throws std::runtime_error for ODBC sources and S3 URIs; otherwise sets the
//    STORAGE_TYPE_KEY option (its value is on elided line 216).
// `db_id` is not referenced on any visible line.
191  CHECK(is_valid_source_type(copy_params));
192 
193  auto foreign_server = std::make_unique<foreign_storage::ForeignServer>();
194 
195  foreign_server->id = -1;
196  foreign_server->user_id = user_id;
198  foreign_server->data_wrapper_type = DataWrapperType::CSV;
199  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
200  foreign_server->data_wrapper_type = DataWrapperType::REGEX_PARSER;
201 #ifdef ENABLE_IMPORT_PARQUET
202  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
203  foreign_server->data_wrapper_type = DataWrapperType::PARQUET;
204 #endif
205  } else {
206  UNREACHABLE();
207  }
208  foreign_server->name = "import_proxy_server";
209 
// NOTE(review): a kOdbc source type reaches the data-wrapper if-chain above
// (and the is_valid_source_type CHECK) first. Unless the elided first branch
// accepts kOdbc, the ODBC throw below looks unreachable — confirm.
210  if (copy_params.source_type == import_export::SourceType::kOdbc) {
211  throw std::runtime_error("ODBC storage not supported");
212  } else if (is_s3_uri(file_path)) {
213  throw std::runtime_error("AWS storage not supported");
214  } else {
215  foreign_server->options[AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY] =
217  }
218 
219  return foreign_server;
220 }
221 
// File-local helper set_header_option(OptionsMap& options,
// const import_export::ImportHeaderRow& has_header) (signature line 223 is
// elided from this listing). Writes CsvFileBufferParser::HEADER_KEY as "FALSE"
// or "TRUE" according to `has_header`; any enum value not covered by the case
// labels hits CHECK(false).
// NOTE(review): the case labels (lines 226, 229-230) are elided, so which enum
// values map to "FALSE" vs "TRUE" cannot be confirmed here.
222 namespace {
224  const import_export::ImportHeaderRow& has_header) {
225  switch (has_header) {
227  options[CsvFileBufferParser::HEADER_KEY] = "FALSE";
228  break;
231  options[CsvFileBufferParser::HEADER_KEY] = "TRUE";
232  break;
233  default:
234  CHECK(false);
235  }
236 }
237 } // namespace
238 
240  const int db_id,
241  const TableDescriptor* table,
242  const std::string& copy_from_source,
243  const import_export::CopyParams& copy_params,
244  const ForeignServer* server) {
// ForeignDataWrapperFactory::createForeignTableProxy (the qualified-name line 239
// is elided from this listing). Builds an in-memory ForeignTable proxy:
//  - copies every TableDescriptor field of `table` into the new ForeignTable;
//  - points it at `server`. This is a raw, non-owning pointer, so `server`
//    must outlive the returned table;
//  - fills foreign_table->options from `copy_params`: file-path filter/sort
//    options, thread count, regex-parser options, FILE_PATH, and CSV parser
//    options;
//  - throws std::runtime_error for ODBC sources, S3 URIs, and CSV imports that
//    request geo_explode_collections;
//  - finally calls initializeOptions() and returns the table.
// NOTE(review): the `if` conditions guarding several option sections
// (lines 257-259, 276, 284, 299) are elided here, so which source types reach
// each section cannot be confirmed from this listing.
245  CHECK(is_valid_source_type(copy_params));
246 
247  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);  // NOTE(review): `catalog` is not referenced on any visible line; possibly unused — confirm
248  auto foreign_table = std::make_unique<ForeignTable>();
249 
250  *static_cast<TableDescriptor*>(foreign_table.get()) =
251  *table; // copy table related values
252 
253  CHECK(server);
254  foreign_table->foreign_server = server;
255 
256  // populate options for regex filtering of file-paths in supported data types
260  if (copy_params.regex_path_filter.has_value()) {
262  copy_params.regex_path_filter.value();
263  }
264  if (copy_params.file_sort_order_by.has_value()) {
266  copy_params.file_sort_order_by.value();
267  }
268  if (copy_params.file_sort_regex.has_value()) {
270  copy_params.file_sort_regex.value();
271  }
272  foreign_table->options[AbstractFileStorageDataWrapper::THREADS_KEY] =
274  }
275 
// Regex-parser options: a line regex is mandatory, a line-start regex optional.
277  CHECK(!copy_params.line_regex.empty());
278  foreign_table->options[RegexFileBufferParser::LINE_REGEX_KEY] =
279  copy_params.line_regex;
280  if (!copy_params.line_start_regex.empty()) {
281  foreign_table->options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
282  copy_params.line_start_regex;
283  }
285  set_header_option(foreign_table->options, copy_params.has_header);
286  }
287  }
288 
289  // setup data source options based on various criteria
290  if (copy_params.source_type == import_export::SourceType::kOdbc) {
291  throw std::runtime_error("ODBC storage not supported");
292  } else if (is_s3_uri(copy_from_source)) {
293  throw std::runtime_error("AWS storage not supported");
294  } else {
295  foreign_table->options["FILE_PATH"] = copy_from_source;
296  }
297 
298  // for CSV import
300  foreign_table->options[CsvFileBufferParser::DELIMITER_KEY] = copy_params.delimiter;
301  foreign_table->options[CsvFileBufferParser::NULLS_KEY] = copy_params.null_str;
302  set_header_option(foreign_table->options, copy_params.has_header);
303  foreign_table->options[CsvFileBufferParser::QUOTED_KEY] =
304  bool_to_option_value(copy_params.quoted);
305  foreign_table->options[CsvFileBufferParser::QUOTE_KEY] = copy_params.quote;
306  foreign_table->options[CsvFileBufferParser::ESCAPE_KEY] = copy_params.escape;
307  foreign_table->options[CsvFileBufferParser::LINE_DELIMITER_KEY] =
308  copy_params.line_delim;
309  foreign_table->options[CsvFileBufferParser::ARRAY_DELIMITER_KEY] =
310  copy_params.array_delim;
// Two marker characters (begin, end) followed by a NUL, so data() is a C string.
311  const std::array<char, 3> array_marker{
312  copy_params.array_begin, copy_params.array_end, 0};
313  foreign_table->options[CsvFileBufferParser::ARRAY_MARKER_KEY] = array_marker.data();
314  foreign_table->options[CsvFileBufferParser::LONLAT_KEY] =
315  bool_to_option_value(copy_params.lonlat);
316  foreign_table->options[CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY] =
318  if (copy_params.geo_explode_collections) {
319  throw std::runtime_error(
320  "geo_explode_collections is not yet supported for FSI CSV import");
321  }
322  foreign_table->options[CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY] =
324  foreign_table->options[CsvFileBufferParser::SOURCE_SRID_KEY] =
325  std::to_string(copy_params.source_srid);
326 
327  foreign_table->options[TextFileBufferParser::BUFFER_SIZE_KEY] =
328  std::to_string(copy_params.buffer_size);
329 
330  foreign_table->options[CsvFileBufferParser::TRIM_SPACES_KEY] =
331  bool_to_option_value(copy_params.trim_spaces);
332  }
333 
334  foreign_table->initializeOptions();
335  return foreign_table;
336 }
337 
338 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::create(
339  const std::string& data_wrapper_type,
340  const int db_id,
341  const ForeignTable* foreign_table) {
342  std::unique_ptr<ForeignDataWrapper> data_wrapper;
343  if (data_wrapper_type == DataWrapperType::CSV) {
344  if (CsvDataWrapper::validateAndGetIsS3Select(foreign_table)) {
345  UNREACHABLE();
346  } else {
347  data_wrapper = std::make_unique<CsvDataWrapper>(db_id, foreign_table);
348  }
349 #ifdef ENABLE_IMPORT_PARQUET
350  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
351  data_wrapper = std::make_unique<ParquetDataWrapper>(db_id, foreign_table);
352 #endif
353  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
354  data_wrapper = std::make_unique<RegexParserDataWrapper>(db_id, foreign_table);
355  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
356  data_wrapper = std::make_unique<InternalCatalogDataWrapper>(db_id, foreign_table);
357  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
358  data_wrapper = std::make_unique<InternalMemoryStatsDataWrapper>(db_id, foreign_table);
359  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
360  data_wrapper =
361  std::make_unique<InternalStorageStatsDataWrapper>(db_id, foreign_table);
362  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
363  data_wrapper = std::make_unique<InternalLogsDataWrapper>(db_id, foreign_table);
364  } else {
365  throw std::runtime_error("Unsupported data wrapper");
366  }
367  return data_wrapper;
368 }
369 
371  const std::string& data_wrapper_type,
372  const ForeignTable* foreign_table) {
// ForeignDataWrapperFactory::createForValidation (signature line 370 is elided
// from this listing). Returns a reference to a default-constructed data wrapper
// used for validation. One instance per key is created on first use and cached
// in the static map validation_data_wrappers_.
// CSV tables that qualify as S3 Select get the separate "CSV_S3_SELECT" key; the
// qualifying condition is on elided line 377, presumably
// CsvDataWrapper::validateAndGetIsS3Select(foreign_table). Building that
// wrapper hits UNREACHABLE() here. Unknown wrapper types also hit
// UNREACHABLE().
// NOTE(review): no locking is visible around the static cache; confirm that
// callers never reach the first-use insertion concurrently.
373  bool is_s3_select_wrapper{false};
374  std::string data_wrapper_type_key{data_wrapper_type};
375  constexpr const char* S3_SELECT_WRAPPER_KEY = "CSV_S3_SELECT";
376  if (foreign_table && data_wrapper_type == DataWrapperType::CSV &&
378  is_s3_select_wrapper = true;
379  data_wrapper_type_key = S3_SELECT_WRAPPER_KEY;
380  }
381 
// Lazily populate the cache entry for this key (comparison RHS on elided line 383).
382  if (validation_data_wrappers_.find(data_wrapper_type_key) ==
384  if (data_wrapper_type == DataWrapperType::CSV) {
385  if (is_s3_select_wrapper) {
386  UNREACHABLE();
387  } else {
388  validation_data_wrappers_[data_wrapper_type_key] =
389  std::make_unique<CsvDataWrapper>();
390  }
391 #ifdef ENABLE_IMPORT_PARQUET
392  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
393  validation_data_wrappers_[data_wrapper_type_key] =
394  std::make_unique<ParquetDataWrapper>();
395 #endif
396  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
397  validation_data_wrappers_[data_wrapper_type_key] =
398  std::make_unique<RegexParserDataWrapper>();
399  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
400  validation_data_wrappers_[data_wrapper_type_key] =
401  std::make_unique<InternalCatalogDataWrapper>();
402  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
403  validation_data_wrappers_[data_wrapper_type_key] =
404  std::make_unique<InternalMemoryStatsDataWrapper>();
405  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
406  validation_data_wrappers_[data_wrapper_type_key] =
407  std::make_unique<InternalStorageStatsDataWrapper>();
408  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
409  validation_data_wrappers_[data_wrapper_type_key] =
410  std::make_unique<InternalLogsDataWrapper>();
411  } else {
412  UNREACHABLE();
413  }
414  }
415  CHECK(validation_data_wrappers_.find(data_wrapper_type_key) !=
417  return *validation_data_wrappers_[data_wrapper_type_key];
418 }
419 
421  const std::string& data_wrapper_type) {
// ForeignDataWrapperFactory::validateDataWrapperType (signature line 420 is
// elided from this listing). Returns normally only if `data_wrapper_type` is
// listed in DataWrapperType::supported_data_wrapper_types. Otherwise it throws
// std::runtime_error whose message lists the user-facing wrapper types, joined
// with ", ".
// NOTE(review): the filter on elided line 428 presumably hides internal
// wrappers (DataWrapperType::INTERNAL_DATA_WRAPPERS, via shared::contains) from
// that list — confirm.
422  const auto& supported_wrapper_types = DataWrapperType::supported_data_wrapper_types;
423  if (std::find(supported_wrapper_types.begin(),
424  supported_wrapper_types.end(),
425  data_wrapper_type) == supported_wrapper_types.end()) {
426  std::vector<std::string_view> user_facing_wrapper_types;
427  for (const auto& type : supported_wrapper_types) {
429  user_facing_wrapper_types.emplace_back(type);
430  }
431  }
432  throw std::runtime_error{"Invalid data wrapper type \"" + data_wrapper_type +
433  "\". Data wrapper type must be one of the following: " +
434  join(user_facing_wrapper_types, ", ") + "."};
435  }
436 }
437 
// Out-of-class definition of the static cache that createForValidation() fills
// lazily, keyed by wrapper type name (or "CSV_S3_SELECT"). The declarator line
// (439, ForeignDataWrapperFactory::validation_data_wrappers_) is elided from
// this listing.
438 std::map<std::string, std::unique_ptr<ForeignDataWrapper>>
440 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:195
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const ForeignDataWrapper & createForValidation(const std::string &data_wrapper_type, const ForeignTable *foreign_table=nullptr)
bool is_valid_data_wrapper(const std::string &data_wrapper_type)
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static constexpr char const * REGEX_PARSER
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static constexpr char const * INTERNAL_STORAGE_STATS
static constexpr std::array< std::string_view, 7 > supported_data_wrapper_types
std::string join(T const &container, std::string const &delim)
static constexpr std::array< char const *, 4 > INTERNAL_DATA_WRAPPERS
#define UNREACHABLE()
Definition: Logger.h:333
static bool validateAndGetIsS3Select(const ForeignTable *foreign_table)
void set_header_option(OptionsMap &options, const import_export::ImportHeaderRow &has_header)
static constexpr char const * INTERNAL_CATALOG
std::string to_string(char const *&&v)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::map< std::string, std::unique_ptr< ForeignDataWrapper > > validation_data_wrappers_
ImportHeaderRow has_header
Definition: CopyParams.h:46
static const std::string SOURCE_SRID_KEY
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::string bool_to_option_value(const bool value)
static void validateDataWrapperType(const std::string &data_wrapper_type)
std::string get_data_wrapper_type(const import_export::CopyParams &copy_params)
static const std::string LINE_DELIMITER_KEY
import_export::SourceType source_type
Definition: CopyParams.h:57
bool is_valid_source_type(const import_export::CopyParams &copy_params)
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
bool is_s3_uri(const std::string &file_path)
static const std::string DELIMITER_KEY
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
static const std::string ARRAY_DELIMITER_KEY
std::string line_start_regex
Definition: CopyParams.h:106
static constexpr char const * INTERNAL_MEMORY_STATS
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:289
static constexpr char const * CSV
std::map< std::string, std::string, std::less<>> OptionsMap
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
static constexpr char const * INTERNAL_LOGS
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
static constexpr char const * PARQUET
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)