OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataWrapperFactory.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "CsvDataWrapper.h"
20 #include "ForeignDataWrapper.h"
27 #ifdef ENABLE_IMPORT_PARQUET
28 #include "ParquetDataWrapper.h"
29 #include "ParquetImporter.h"
30 #endif
31 #include "Catalog/os/UserMapping.h"
32 #include "RegexParserDataWrapper.h"
33 #include "Shared/JsonUtils.h"
34 #include "Shared/SysDefinitions.h"
35 #include "Shared/file_path_util.h"
36 #include "Shared/misc.h"
37 #include "Shared/thread_count.h"
38 
39 namespace {
40 std::string get_data_wrapper_type(const import_export::CopyParams& copy_params) {
41  std::string data_wrapper_type;
43  data_wrapper_type = foreign_storage::DataWrapperType::CSV;
44  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
46 #ifdef ENABLE_IMPORT_PARQUET
47  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
49 #endif
50  } else {
51  UNREACHABLE();
52  }
53  return data_wrapper_type;
54 }
55 } // namespace
56 
57 namespace foreign_storage {
58 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
59  std::unique_ptr<foreign_storage::UserMapping>,
60  std::unique_ptr<foreign_storage::ForeignTable>>
61 create_proxy_fsi_objects(const std::string& copy_from_source,
62  const import_export::CopyParams& copy_params,
63  const int db_id,
64  const TableDescriptor* table,
65  const int32_t user_id) {
67  db_id, user_id, copy_from_source, copy_params);
68 
69  CHECK(server);
70  server->validate();
71 
72  auto user_mapping =
74  db_id, user_id, copy_from_source, copy_params, server.get());
75 
76  if (user_mapping) {
77  user_mapping->validate(server.get());
78  }
79 
80  auto foreign_table =
82  db_id, table, copy_from_source, copy_params, server.get());
83 
84  CHECK(foreign_table);
85  foreign_table->validateOptionValues();
86 
87  return {std::move(server), std::move(user_mapping), std::move(foreign_table)};
88 }
89 
90 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
91  std::unique_ptr<foreign_storage::UserMapping>,
92  std::unique_ptr<foreign_storage::ForeignTable>>
93 create_proxy_fsi_objects(const std::string& copy_from_source,
94  const import_export::CopyParams& copy_params,
95  const TableDescriptor* table) {
96  return create_proxy_fsi_objects(copy_from_source, copy_params, -1, table, -1);
97 }
98 
99 } // namespace foreign_storage
100 
101 namespace {
102 
103 bool is_valid_data_wrapper(const std::string& data_wrapper_type) {
104  return
105 #ifdef ENABLE_IMPORT_PARQUET
106  data_wrapper_type == foreign_storage::DataWrapperType::PARQUET ||
107 #endif
108  data_wrapper_type == foreign_storage::DataWrapperType::CSV ||
110 }
111 
112 } // namespace
113 
114 namespace foreign_storage {
115 
117  if (copy_params.line_regex.empty()) {
118  throw std::runtime_error{"Regex parser options must contain a line regex."};
119  }
120 }
121 
123  return
124 #ifdef ENABLE_IMPORT_PARQUET
126 #endif
129 }
130 
/**
 * Renders a boolean as the upper-case option string stored in FSI option maps.
 *
 * @param value flag to render
 * @return "TRUE" when value is true, otherwise "FALSE"
 */
std::string bool_to_option_value(const bool value) {
  if (value) {
    return "TRUE";
  }
  return "FALSE";
}
134 
135 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForGeneralImport(
136  const import_export::CopyParams& copy_params,
137  const int db_id,
138  const ForeignTable* foreign_table,
139  const UserMapping* user_mapping) {
140  auto data_wrapper_type = get_data_wrapper_type(copy_params);
141  CHECK(is_valid_data_wrapper(data_wrapper_type));
142 
143  if (data_wrapper_type == DataWrapperType::CSV) {
144  return std::make_unique<CsvDataWrapper>(
145  db_id, foreign_table, user_mapping, /*disable_cache=*/true);
146  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
147  return std::make_unique<RegexParserDataWrapper>(
148  db_id, foreign_table, user_mapping, true);
149  }
150 #ifdef ENABLE_IMPORT_PARQUET
151  else if (data_wrapper_type == DataWrapperType::PARQUET) {
152  return std::make_unique<ParquetDataWrapper>(
153  db_id, foreign_table, /*do_metadata_stats_validation=*/false);
154  }
155 #endif
156 
157  return {};
158 }
159 
160 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForImport(
161  const std::string& data_wrapper_type,
162  const int db_id,
163  const ForeignTable* foreign_table,
164  const UserMapping* user_mapping) {
165 #ifdef ENABLE_IMPORT_PARQUET
166  // only supported for parquet import path currently
167  CHECK(data_wrapper_type == DataWrapperType::PARQUET);
168  return std::make_unique<ParquetImporter>(db_id, foreign_table, user_mapping);
169 #else
170  return {};
171 #endif
172 }
173 
174 std::unique_ptr<UserMapping>
176  const int db_id,
177  const int user_id,
178  const std::string& file_path,
179  const import_export::CopyParams& copy_params,
180  const ForeignServer* server) {
181  return {};
182 }
183 
185  const int db_id,
186  const int user_id,
187  const std::string& file_path,
188  const import_export::CopyParams& copy_params) {
189  CHECK(is_valid_source_type(copy_params));
190 
191  auto foreign_server = std::make_unique<foreign_storage::ForeignServer>();
192 
193  foreign_server->id = -1;
194  foreign_server->user_id = user_id;
196  foreign_server->data_wrapper_type = DataWrapperType::CSV;
197  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
198  foreign_server->data_wrapper_type = DataWrapperType::REGEX_PARSER;
199 #ifdef ENABLE_IMPORT_PARQUET
200  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
201  foreign_server->data_wrapper_type = DataWrapperType::PARQUET;
202 #endif
203  } else {
204  UNREACHABLE();
205  }
206  foreign_server->name = "import_proxy_server";
207 
208  if (copy_params.source_type == import_export::SourceType::kOdbc) {
209  throw std::runtime_error("ODBC storage not supported");
210  } else if (shared::is_s3_uri(file_path)) {
211  throw std::runtime_error("AWS storage not supported");
212  } else {
213  foreign_server->options[AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY] =
215  }
216 
217  return foreign_server;
218 }
219 
220 namespace {
222  const import_export::ImportHeaderRow& has_header) {
223  switch (has_header) {
225  options[CsvFileBufferParser::HEADER_KEY] = "FALSE";
226  break;
229  options[CsvFileBufferParser::HEADER_KEY] = "TRUE";
230  break;
231  default:
232  CHECK(false);
233  }
234 }
235 } // namespace
236 
238  const int db_id,
239  const TableDescriptor* table,
240  const std::string& copy_from_source,
241  const import_export::CopyParams& copy_params,
242  const ForeignServer* server) {
243  CHECK(is_valid_source_type(copy_params));
244 
245  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
246  auto foreign_table = std::make_unique<ForeignTable>();
247 
248  *static_cast<TableDescriptor*>(foreign_table.get()) =
249  *table; // copy table related values
250 
251  CHECK(server);
252  foreign_table->foreign_server = server;
253 
254  // enable geo validation in most/all source types
261  }
262 
263  // populate options for regex filtering of file-paths in supported data types
267  if (copy_params.regex_path_filter.has_value()) {
269  copy_params.regex_path_filter.value();
270  }
271  if (copy_params.file_sort_order_by.has_value()) {
273  copy_params.file_sort_order_by.value();
274  }
275  if (copy_params.file_sort_regex.has_value()) {
277  copy_params.file_sort_regex.value();
278  }
279  foreign_table->options[AbstractFileStorageDataWrapper::THREADS_KEY] =
281  }
282 
284  CHECK(!copy_params.line_regex.empty());
285  foreign_table->options[RegexFileBufferParser::LINE_REGEX_KEY] =
286  copy_params.line_regex;
287  if (!copy_params.line_start_regex.empty()) {
288  foreign_table->options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
289  copy_params.line_start_regex;
290  }
292  set_header_option(foreign_table->options, copy_params.has_header);
293  }
294  }
295 
296  // setup data source options based on various criteria
297  if (copy_params.source_type == import_export::SourceType::kOdbc) {
298  throw std::runtime_error("ODBC storage not supported");
299  } else if (shared::is_s3_uri(copy_from_source)) {
300  throw std::runtime_error("AWS storage not supported");
301  } else {
302  foreign_table->options["FILE_PATH"] = copy_from_source;
303  }
304 
305  // for CSV import
307  foreign_table->options[CsvFileBufferParser::DELIMITER_KEY] = copy_params.delimiter;
308  foreign_table->options[CsvFileBufferParser::NULLS_KEY] = copy_params.null_str;
309  set_header_option(foreign_table->options, copy_params.has_header);
310  foreign_table->options[CsvFileBufferParser::QUOTED_KEY] =
311  bool_to_option_value(copy_params.quoted);
312  foreign_table->options[CsvFileBufferParser::QUOTE_KEY] = copy_params.quote;
313  foreign_table->options[CsvFileBufferParser::ESCAPE_KEY] = copy_params.escape;
314  foreign_table->options[CsvFileBufferParser::LINE_DELIMITER_KEY] =
315  copy_params.line_delim;
316  foreign_table->options[CsvFileBufferParser::ARRAY_DELIMITER_KEY] =
317  copy_params.array_delim;
318  const std::array<char, 3> array_marker{
319  copy_params.array_begin, copy_params.array_end, 0};
320  foreign_table->options[CsvFileBufferParser::ARRAY_MARKER_KEY] = array_marker.data();
321  foreign_table->options[CsvFileBufferParser::LONLAT_KEY] =
322  bool_to_option_value(copy_params.lonlat);
323  if (copy_params.geo_explode_collections) {
324  throw std::runtime_error(
325  "geo_explode_collections is not yet supported for FSI CSV import");
326  }
327  foreign_table->options[CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY] =
329  foreign_table->options[CsvFileBufferParser::SOURCE_SRID_KEY] =
330  std::to_string(copy_params.source_srid);
331 
332  foreign_table->options[TextFileBufferParser::BUFFER_SIZE_KEY] =
333  std::to_string(copy_params.buffer_size);
334 
335  foreign_table->options[CsvFileBufferParser::TRIM_SPACES_KEY] =
336  bool_to_option_value(copy_params.trim_spaces);
337  }
338 
339  foreign_table->initializeOptions();
340  return foreign_table;
341 }
342 
343 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::create(
344  const std::string& data_wrapper_type,
345  const int db_id,
346  const ForeignTable* foreign_table) {
347  std::unique_ptr<ForeignDataWrapper> data_wrapper;
348  if (data_wrapper_type == DataWrapperType::CSV) {
349  if (CsvDataWrapper::validateAndGetIsS3Select(foreign_table)) {
350  UNREACHABLE();
351  } else {
352  data_wrapper = std::make_unique<CsvDataWrapper>(db_id, foreign_table);
353  }
354 #ifdef ENABLE_IMPORT_PARQUET
355  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
356  data_wrapper = std::make_unique<ParquetDataWrapper>(db_id, foreign_table);
357 #endif
358  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
359  data_wrapper = std::make_unique<RegexParserDataWrapper>(db_id, foreign_table);
360  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
361  data_wrapper = std::make_unique<InternalCatalogDataWrapper>(db_id, foreign_table);
362  } else if (data_wrapper_type == DataWrapperType::INTERNAL_EXECUTOR_STATS) {
363  data_wrapper =
364  std::make_unique<InternalExecutorStatsDataWrapper>(db_id, foreign_table);
365  } else if (data_wrapper_type == DataWrapperType::INTERNAL_ML_MODEL_METADATA) {
366  data_wrapper =
367  std::make_unique<InternalMLModelMetadataDataWrapper>(db_id, foreign_table);
368  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
369  data_wrapper = std::make_unique<InternalMemoryStatsDataWrapper>(db_id, foreign_table);
370  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
371  data_wrapper =
372  std::make_unique<InternalStorageStatsDataWrapper>(db_id, foreign_table);
373  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
374  data_wrapper = std::make_unique<InternalLogsDataWrapper>(db_id, foreign_table);
375  } else {
376  throw std::runtime_error("Unsupported data wrapper");
377  }
378  return data_wrapper;
379 }
380 
382  const std::string& data_wrapper_type,
383  const ForeignTable* foreign_table) {
384  bool is_s3_select_wrapper{false};
385  std::string data_wrapper_type_key{data_wrapper_type};
386  constexpr const char* S3_SELECT_WRAPPER_KEY = "CSV_S3_SELECT";
387  if (foreign_table && data_wrapper_type == DataWrapperType::CSV &&
389  is_s3_select_wrapper = true;
390  data_wrapper_type_key = S3_SELECT_WRAPPER_KEY;
391  }
392 
393  auto [itr, is_new] = validation_data_wrappers_.emplace(data_wrapper_type_key, nullptr);
394  if (is_new) {
395  if (data_wrapper_type == DataWrapperType::CSV) {
396  if (is_s3_select_wrapper) {
397  UNREACHABLE();
398  } else {
399  itr->second = std::make_unique<CsvDataWrapper>();
400  }
401 #ifdef ENABLE_IMPORT_PARQUET
402  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
403  itr->second = std::make_unique<ParquetDataWrapper>();
404 #endif
405  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
406  itr->second = std::make_unique<RegexParserDataWrapper>();
407  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
408  itr->second = std::make_unique<InternalCatalogDataWrapper>();
409  } else if (data_wrapper_type == DataWrapperType::INTERNAL_EXECUTOR_STATS) {
410  itr->second = std::make_unique<InternalExecutorStatsDataWrapper>();
411  } else if (data_wrapper_type == DataWrapperType::INTERNAL_ML_MODEL_METADATA) {
412  itr->second = std::make_unique<InternalMLModelMetadataDataWrapper>();
413  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
414  itr->second = std::make_unique<InternalMemoryStatsDataWrapper>();
415  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
416  itr->second = std::make_unique<InternalStorageStatsDataWrapper>();
417  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
418  itr->second = std::make_unique<InternalLogsDataWrapper>();
419  } else {
420  UNREACHABLE();
421  }
422  }
423  return itr->second.get();
424 }
425 
427  const std::string& data_wrapper_type) {
428  const auto& supported_wrapper_types = DataWrapperType::supported_data_wrapper_types;
429  if (std::find(supported_wrapper_types.begin(),
430  supported_wrapper_types.end(),
431  data_wrapper_type) == supported_wrapper_types.end()) {
432  std::vector<std::string_view> user_facing_wrapper_types;
433  for (const auto& type : supported_wrapper_types) {
435  user_facing_wrapper_types.emplace_back(type);
436  }
437  }
438  throw std::runtime_error{"Invalid data wrapper type \"" + data_wrapper_type +
439  "\". Data wrapper type must be one of the following: " +
440  join(user_facing_wrapper_types, ", ") + "."};
441  }
442 }
443 
444 std::map<std::string, std::unique_ptr<ForeignDataWrapper>>
446 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:195
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
bool is_valid_data_wrapper(const std::string &data_wrapper_type)
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static constexpr char const * REGEX_PARSER
static const ForeignDataWrapper * createForValidation(const std::string &data_wrapper_type, const ForeignTable *foreign_table=nullptr)
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
bool is_s3_uri(const std::string &file_path)
static constexpr char const * INTERNAL_STORAGE_STATS
std::string join(T const &container, std::string const &delim)
#define UNREACHABLE()
Definition: Logger.h:338
static bool validateAndGetIsS3Select(const ForeignTable *foreign_table)
static constexpr std::array< char const *, 6 > INTERNAL_DATA_WRAPPERS
void set_header_option(OptionsMap &options, const import_export::ImportHeaderRow &has_header)
static constexpr char const * INTERNAL_CATALOG
std::string to_string(char const *&&v)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::map< std::string, std::unique_ptr< ForeignDataWrapper > > validation_data_wrappers_
ImportHeaderRow has_header
Definition: CopyParams.h:46
static const std::string SOURCE_SRID_KEY
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static SysCatalog & instance()
Definition: SysCatalog.h:343
static constexpr char const * INTERNAL_ML_MODEL_METADATA
std::string bool_to_option_value(const bool value)
static void validateDataWrapperType(const std::string &data_wrapper_type)
std::string get_data_wrapper_type(const import_export::CopyParams &copy_params)
static const std::string LINE_DELIMITER_KEY
import_export::SourceType source_type
Definition: CopyParams.h:57
bool is_valid_source_type(const import_export::CopyParams &copy_params)
static constexpr std::array< std::string_view, 9 > supported_data_wrapper_types
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
static constexpr char const * INTERNAL_EXECUTOR_STATS
static const std::string DELIMITER_KEY
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
Definition: ForeignTable.h:49
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
static const std::string ARRAY_DELIMITER_KEY
std::string line_start_regex
Definition: CopyParams.h:106
static constexpr char const * INTERNAL_MEMORY_STATS
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:291
static constexpr char const * CSV
std::map< std::string, std::string, std::less<>> OptionsMap
static constexpr char const * INTERNAL_LOGS
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
static constexpr char const * PARQUET
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)