OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataWrapperFactory.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "FsiJsonUtils.h"
19 
20 #include "CsvDataWrapper.h"
21 #include "ForeignDataWrapper.h"
26 #ifdef ENABLE_IMPORT_PARQUET
27 #include "ParquetDataWrapper.h"
28 #include "ParquetImporter.h"
29 #endif
30 #include "Catalog/os/UserMapping.h"
31 #include "RegexParserDataWrapper.h"
32 #include "Shared/SysDefinitions.h"
33 #include "Shared/misc.h"
34 #include "Shared/thread_count.h"
35 
36 namespace {
// Maps copy_params.source_type to the name of the foreign data wrapper type used
// for a general import (e.g. foreign_storage::DataWrapperType::CSV).
// The Parquet branch exists only when ENABLE_IMPORT_PARQUET is defined; any
// source type without a matching branch hits UNREACHABLE().
37 std::string get_data_wrapper_type(const import_export::CopyParams& copy_params) {
38  std::string data_wrapper_type;
40  data_wrapper_type = foreign_storage::DataWrapperType::CSV;
41  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
43 #ifdef ENABLE_IMPORT_PARQUET
44  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
46 #endif
47  } else {
48  UNREACHABLE();
49  }
50  return data_wrapper_type;
51 }
52 } // namespace
53 
54 namespace foreign_storage {
/**
 * Returns true if `file_path` is an S3 URI, i.e. it begins with the "s3://" scheme.
 *
 * Only a leading "s3://" counts. A local path that merely contains that substring
 * later on (e.g. "/backups/s3://bucket/key") is not an S3 location. Such a path
 * must not be routed to the "AWS storage not supported" error.
 */
bool is_s3_uri(const std::string& file_path) {
  const std::string s3_prefix = "s3://";
  // compare() on the first prefix-length characters is a starts-with test. It is
  // safe for paths shorter than the prefix: the comparison simply reports unequal.
  return file_path.compare(0, s3_prefix.size(), s3_prefix) == 0;
}
59 
// Create proxy fsi objects for use outside FSI.
// Builds and validates, in order: a ForeignServer proxy (must be non-null), an
// optional UserMapping proxy (validated only when one is returned), and a
// ForeignTable proxy (must be non-null; its option values are validated).
// Returns all three as a tuple, transferring ownership to the caller.
60 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
61  std::unique_ptr<foreign_storage::UserMapping>,
62  std::unique_ptr<foreign_storage::ForeignTable>>
63 create_proxy_fsi_objects(const std::string& copy_from_source,
64  const import_export::CopyParams& copy_params,
65  const int db_id,
66  const TableDescriptor* table,
67  const int32_t user_id) {
69  db_id, user_id, copy_from_source, copy_params);
70
  // The server proxy is mandatory and must pass its own validation.
71  CHECK(server);
72  server->validate();
73
74  auto user_mapping =
76  db_id, user_id, copy_from_source, copy_params, server.get());
77
  // A user mapping is optional: only validate it when one was produced.
78  if (user_mapping) {
79  user_mapping->validate(server.get());
80  }
81
82  auto foreign_table =
84  db_id, table, copy_from_source, copy_params, server.get());
85
86  CHECK(foreign_table);
87  foreign_table->validateOptionValues();
88
89  return {std::move(server), std::move(user_mapping), std::move(foreign_table)};
90 }
91 
92 std::tuple<std::unique_ptr<foreign_storage::ForeignServer>,
93  std::unique_ptr<foreign_storage::UserMapping>,
94  std::unique_ptr<foreign_storage::ForeignTable>>
95 create_proxy_fsi_objects(const std::string& copy_from_source,
96  const import_export::CopyParams& copy_params,
97  const TableDescriptor* table) {
98  return create_proxy_fsi_objects(copy_from_source, copy_params, -1, table, -1);
99 }
100 
101 } // namespace foreign_storage
102 
103 namespace {
104 
105 
// Returns true if `data_wrapper_type` is one of the wrapper types accepted for a
// general import. ForeignDataWrapperFactory::createForGeneralImport() CHECKs this
// predicate. CSV is accepted; PARQUET only when ENABLE_IMPORT_PARQUET is defined.
// NOTE(review): the trailing alternative presumably admits REGEX_PARSER, which
// createForGeneralImport() also handles; confirm.
106 bool is_valid_data_wrapper(const std::string& data_wrapper_type) {
107  return
108 #ifdef ENABLE_IMPORT_PARQUET
109  data_wrapper_type == foreign_storage::DataWrapperType::PARQUET ||
110 #endif
111  data_wrapper_type == foreign_storage::DataWrapperType::CSV ||
113 }
114 
115 } // namespace
116 
117 namespace foreign_storage {
118 
// Regex-parser import options must supply a line regex; an empty
// copy_params.line_regex is rejected with std::runtime_error.
120  if (copy_params.line_regex.empty()) {
121  throw std::runtime_error{"Regex parser options must contain a line regex."};
122  }
123 }
124 
// Predicate over copy_params.source_type. It is CHECKed at the top of
// createForeignServerProxy() and createForeignTableProxy().
// NOTE(review): presumably true only for source types those factories map to a
// data wrapper (the Parquet alternative depends on ENABLE_IMPORT_PARQUET); confirm.
126  return
127 #ifdef ENABLE_IMPORT_PARQUET
129 #endif
132 }
133 
/// Renders a boolean as the upper-case option string ("TRUE" or "FALSE") written
/// into foreign table option maps.
std::string bool_to_option_value(const bool value) {
  if (value) {
    return "TRUE";
  }
  return "FALSE";
}
137 
138 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForGeneralImport(
139  const import_export::CopyParams& copy_params,
140  const int db_id,
141  const ForeignTable* foreign_table,
142  const UserMapping* user_mapping) {
143  auto data_wrapper_type = get_data_wrapper_type(copy_params);
144  CHECK(is_valid_data_wrapper(data_wrapper_type));
145 
146  if (data_wrapper_type == DataWrapperType::CSV) {
147  return std::make_unique<CsvDataWrapper>(
148  db_id, foreign_table, user_mapping, /*disable_cache=*/true);
149  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
150  return std::make_unique<RegexParserDataWrapper>(
151  db_id, foreign_table, user_mapping, true);
152  }
153 #ifdef ENABLE_IMPORT_PARQUET
154  else if (data_wrapper_type == DataWrapperType::PARQUET) {
155  return std::make_unique<ParquetDataWrapper>(
156  db_id, foreign_table, /*do_metadata_stats_validation=*/false);
157  }
158 #endif
159 
160  return {};
161 }
162 
163 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::createForImport(
164  const std::string& data_wrapper_type,
165  const int db_id,
166  const ForeignTable* foreign_table,
167  const UserMapping* user_mapping) {
168 #ifdef ENABLE_IMPORT_PARQUET
169  // only supported for parquet import path currently
170  CHECK(data_wrapper_type == DataWrapperType::PARQUET);
171  return std::make_unique<ParquetImporter>(db_id, foreign_table, user_mapping);
172 #else
173  return {};
174 #endif
175 }
176 
// Never creates a user mapping: always returns an empty (null) pointer and
// ignores all of its arguments. create_proxy_fsi_objects() treats a null
// mapping as "no mapping" and skips its validation.
177 std::unique_ptr<UserMapping>
179  const int db_id,
180  const int user_id,
181  const std::string& file_path,
182  const import_export::CopyParams& copy_params,
183  const ForeignServer* server) {
184  return {};
185 }
186 
// Builds an in-memory ForeignServer proxy for import. It has id -1, the caller's
// user_id, a data wrapper type chosen from copy_params.source_type, and the name
// "import_proxy_server". ODBC sources and S3 URIs (per is_s3_uri) are rejected
// with std::runtime_error; otherwise the STORAGE_TYPE_KEY option is set.
188  const int db_id,
189  const int user_id,
190  const std::string& file_path,
191  const import_export::CopyParams& copy_params) {
192  CHECK(is_valid_source_type(copy_params));
193
194  auto foreign_server = std::make_unique<foreign_storage::ForeignServer>();
195
  // -1 marks this as a proxy server rather than a catalog-registered one.
196  foreign_server->id = -1;
197  foreign_server->user_id = user_id;
199  foreign_server->data_wrapper_type = DataWrapperType::CSV;
200  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
201  foreign_server->data_wrapper_type = DataWrapperType::REGEX_PARSER;
202 #ifdef ENABLE_IMPORT_PARQUET
203  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
204  foreign_server->data_wrapper_type = DataWrapperType::PARQUET;
205 #endif
206  } else {
207  UNREACHABLE();
208  }
209  foreign_server->name = "import_proxy_server";
210
  // Only local file storage is supported for proxy imports.
211  if (copy_params.source_type == import_export::SourceType::kOdbc) {
212  throw std::runtime_error("ODBC storage not supported");
213  } else if (is_s3_uri(file_path)) {
214  throw std::runtime_error("AWS storage not supported");
215  } else {
216  foreign_server->options[AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY] =
218  }
219
220  return foreign_server;
221 }
222 
// set_header_option(options, has_header): writes CsvFileBufferParser::HEADER_KEY
// into `options` as "FALSE" or "TRUE" depending on `has_header`. Any
// ImportHeaderRow value not handled by a case fails CHECK(false).
// NOTE(review): confirm which ImportHeaderRow values map to "FALSE" vs "TRUE".
223 namespace {
225  const import_export::ImportHeaderRow& has_header) {
226  switch (has_header) {
228  options[CsvFileBufferParser::HEADER_KEY] = "FALSE";
229  break;
232  options[CsvFileBufferParser::HEADER_KEY] = "TRUE";
233  break;
234  default:
235  CHECK(false);
236  }
237 }
238 } // namespace
239 
// Builds an in-memory ForeignTable proxy for importing from `copy_from_source`.
// The TableDescriptor part is copied from `table`, the table is attached to
// `server` (must be non-null), and `options` is filled from `copy_params`:
// file-path filtering/sorting and thread options, regex-parser options, the
// FILE_PATH, and CSV parser options. ODBC sources and S3 URIs are rejected with
// std::runtime_error, as is geo_explode_collections in the CSV branch.
// initializeOptions() is called before the table is returned.
241  const int db_id,
242  const TableDescriptor* table,
243  const std::string& copy_from_source,
244  const import_export::CopyParams& copy_params,
245  const ForeignServer* server) {
246  CHECK(is_valid_source_type(copy_params));
247
248  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
249  auto foreign_table = std::make_unique<ForeignTable>();
250
251  *static_cast<TableDescriptor*>(foreign_table.get()) =
252  *table; // copy table related values
253
254  CHECK(server);
255  foreign_table->foreign_server = server;
256
257  // populate options for regex filtering of file-paths in supported data types
261  if (copy_params.regex_path_filter.has_value()) {
263  copy_params.regex_path_filter.value();
264  }
265  if (copy_params.file_sort_order_by.has_value()) {
267  copy_params.file_sort_order_by.value();
268  }
269  if (copy_params.file_sort_regex.has_value()) {
271  copy_params.file_sort_regex.value();
272  }
273  foreign_table->options[AbstractFileStorageDataWrapper::THREADS_KEY] =
275  }
276
  // Regex-parser options: a line regex is mandatory, a line-start regex optional.
278  CHECK(!copy_params.line_regex.empty());
279  foreign_table->options[RegexFileBufferParser::LINE_REGEX_KEY] =
280  copy_params.line_regex;
281  if (!copy_params.line_start_regex.empty()) {
282  foreign_table->options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
283  copy_params.line_start_regex;
284  }
286  set_header_option(foreign_table->options, copy_params.has_header);
287  }
288  }
289
290  // setup data source options based on various criteria
291  if (copy_params.source_type == import_export::SourceType::kOdbc) {
292  throw std::runtime_error("ODBC storage not supported");
293  } else if (is_s3_uri(copy_from_source)) {
294  throw std::runtime_error("AWS storage not supported");
295  } else {
296  foreign_table->options["FILE_PATH"] = copy_from_source;
297  }
298
299  // for CSV import
301  foreign_table->options[CsvFileBufferParser::DELIMITER_KEY] = copy_params.delimiter;
302  foreign_table->options[CsvFileBufferParser::NULLS_KEY] = copy_params.null_str;
303  set_header_option(foreign_table->options, copy_params.has_header);
304  foreign_table->options[CsvFileBufferParser::QUOTED_KEY] =
305  bool_to_option_value(copy_params.quoted);
306  foreign_table->options[CsvFileBufferParser::QUOTE_KEY] = copy_params.quote;
307  foreign_table->options[CsvFileBufferParser::ESCAPE_KEY] = copy_params.escape;
308  foreign_table->options[CsvFileBufferParser::LINE_DELIMITER_KEY] =
309  copy_params.line_delim;
310  foreign_table->options[CsvFileBufferParser::ARRAY_DELIMITER_KEY] =
311  copy_params.array_delim;
  // array_begin and array_end followed by a NUL terminator, so data() reads as a
  // two-character C string.
312  const std::array<char, 3> array_marker{
313  copy_params.array_begin, copy_params.array_end, 0};
314  foreign_table->options[CsvFileBufferParser::ARRAY_MARKER_KEY] = array_marker.data();
315  foreign_table->options[CsvFileBufferParser::LONLAT_KEY] =
316  bool_to_option_value(copy_params.lonlat);
317  foreign_table->options[CsvFileBufferParser::GEO_ASSIGN_RENDER_GROUPS_KEY] =
319  if (copy_params.geo_explode_collections) {
320  throw std::runtime_error(
321  "geo_explode_collections is not yet supported for FSI CSV import");
322  }
323  foreign_table->options[CsvFileBufferParser::GEO_EXPLODE_COLLECTIONS_KEY] =
325  foreign_table->options[CsvFileBufferParser::SOURCE_SRID_KEY] =
326  std::to_string(copy_params.source_srid);
327
328  foreign_table->options[TextFileBufferParser::BUFFER_SIZE_KEY] =
329  std::to_string(copy_params.buffer_size);
330
331  foreign_table->options[CsvFileBufferParser::TRIM_SPACES_KEY] =
332  bool_to_option_value(copy_params.trim_spaces);
333  }
334
335  foreign_table->initializeOptions();
336  return foreign_table;
337 }
338 
339 std::unique_ptr<ForeignDataWrapper> ForeignDataWrapperFactory::create(
340  const std::string& data_wrapper_type,
341  const int db_id,
342  const ForeignTable* foreign_table) {
343  std::unique_ptr<ForeignDataWrapper> data_wrapper;
344  if (data_wrapper_type == DataWrapperType::CSV) {
345  if (CsvDataWrapper::validateAndGetIsS3Select(foreign_table)) {
346  UNREACHABLE();
347  } else {
348  data_wrapper = std::make_unique<CsvDataWrapper>(db_id, foreign_table);
349  }
350 #ifdef ENABLE_IMPORT_PARQUET
351  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
352  data_wrapper = std::make_unique<ParquetDataWrapper>(db_id, foreign_table);
353 #endif
354  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
355  data_wrapper = std::make_unique<RegexParserDataWrapper>(db_id, foreign_table);
356  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
357  data_wrapper = std::make_unique<InternalCatalogDataWrapper>(db_id, foreign_table);
358  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
359  data_wrapper = std::make_unique<InternalMemoryStatsDataWrapper>(db_id, foreign_table);
360  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
361  data_wrapper =
362  std::make_unique<InternalStorageStatsDataWrapper>(db_id, foreign_table);
363  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
364  data_wrapper = std::make_unique<InternalLogsDataWrapper>(db_id, foreign_table);
365  } else {
366  throw std::runtime_error("Unsupported data wrapper");
367  }
368  return data_wrapper;
369 }
370 
// Returns a default-constructed wrapper used only to validate options for
// `data_wrapper_type`. Wrappers are created lazily on first request and cached
// in the static map validation_data_wrappers_, keyed by wrapper type. When the
// S3-select condition holds for a CSV table, the key becomes "CSV_S3_SELECT",
// for which no wrapper exists here (UNREACHABLE()). Unknown types also hit
// UNREACHABLE().
// NOTE(review): the static cache is read and written without visible
// synchronization; confirm that callers serialize access.
372  const std::string& data_wrapper_type,
373  const ForeignTable* foreign_table) {
374  bool is_s3_select_wrapper{false};
375  std::string data_wrapper_type_key{data_wrapper_type};
376  constexpr const char* S3_SELECT_WRAPPER_KEY = "CSV_S3_SELECT";
377  if (foreign_table && data_wrapper_type == DataWrapperType::CSV &&
379  is_s3_select_wrapper = true;
380  data_wrapper_type_key = S3_SELECT_WRAPPER_KEY;
381  }
382
  // Populate the cache entry on first use only.
383  if (validation_data_wrappers_.find(data_wrapper_type_key) ==
385  if (data_wrapper_type == DataWrapperType::CSV) {
386  if (is_s3_select_wrapper) {
387  UNREACHABLE();
388  } else {
389  validation_data_wrappers_[data_wrapper_type_key] =
390  std::make_unique<CsvDataWrapper>();
391  }
392 #ifdef ENABLE_IMPORT_PARQUET
393  } else if (data_wrapper_type == DataWrapperType::PARQUET) {
394  validation_data_wrappers_[data_wrapper_type_key] =
395  std::make_unique<ParquetDataWrapper>();
396 #endif
397  } else if (data_wrapper_type == DataWrapperType::REGEX_PARSER) {
398  validation_data_wrappers_[data_wrapper_type_key] =
399  std::make_unique<RegexParserDataWrapper>();
400  } else if (data_wrapper_type == DataWrapperType::INTERNAL_CATALOG) {
401  validation_data_wrappers_[data_wrapper_type_key] =
402  std::make_unique<InternalCatalogDataWrapper>();
403  } else if (data_wrapper_type == DataWrapperType::INTERNAL_MEMORY_STATS) {
404  validation_data_wrappers_[data_wrapper_type_key] =
405  std::make_unique<InternalMemoryStatsDataWrapper>();
406  } else if (data_wrapper_type == DataWrapperType::INTERNAL_STORAGE_STATS) {
407  validation_data_wrappers_[data_wrapper_type_key] =
408  std::make_unique<InternalStorageStatsDataWrapper>();
409  } else if (data_wrapper_type == DataWrapperType::INTERNAL_LOGS) {
410  validation_data_wrappers_[data_wrapper_type_key] =
411  std::make_unique<InternalLogsDataWrapper>();
412  } else {
413  UNREACHABLE();
414  }
415  }
416  CHECK(validation_data_wrappers_.find(data_wrapper_type_key) !=
418  return *validation_data_wrappers_[data_wrapper_type_key];
419 }
420 
// Throws std::runtime_error if `data_wrapper_type` is not listed in
// DataWrapperType::supported_data_wrapper_types. The message names the rejected
// type and joins the user-facing wrapper types with ", ".
// NOTE(review): the loop filters which supported types are shown, presumably
// dropping DataWrapperType::INTERNAL_DATA_WRAPPERS; confirm.
422  const std::string& data_wrapper_type) {
423  const auto& supported_wrapper_types = DataWrapperType::supported_data_wrapper_types;
424  if (std::find(supported_wrapper_types.begin(),
425  supported_wrapper_types.end(),
426  data_wrapper_type) == supported_wrapper_types.end()) {
427  std::vector<std::string_view> user_facing_wrapper_types;
428  for (const auto& type : supported_wrapper_types) {
430  user_facing_wrapper_types.emplace_back(type);
431  }
432  }
433  throw std::runtime_error{"Invalid data wrapper type \"" + data_wrapper_type +
434  "\". Data wrapper type must be one of the following: " +
435  join(user_facing_wrapper_types, ", ") + "."};
436  }
437 }
438 
// Definition of the static cache of per-type validation wrappers that is
// populated lazily by createForValidation().
439 std::map<std::string, std::unique_ptr<ForeignDataWrapper> >
441 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:195
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
static const ForeignDataWrapper & createForValidation(const std::string &data_wrapper_type, const ForeignTable *foreign_table=nullptr)
bool is_valid_data_wrapper(const std::string &data_wrapper_type)
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static constexpr char const * REGEX_PARSER
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static constexpr char const * INTERNAL_STORAGE_STATS
static constexpr std::array< std::string_view, 7 > supported_data_wrapper_types
std::string join(T const &container, std::string const &delim)
static constexpr std::array< char const *, 4 > INTERNAL_DATA_WRAPPERS
#define UNREACHABLE()
Definition: Logger.h:266
static bool validateAndGetIsS3Select(const ForeignTable *foreign_table)
void set_header_option(OptionsMap &options, const import_export::ImportHeaderRow &has_header)
static constexpr char const * INTERNAL_CATALOG
std::string to_string(char const *&&v)
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
static std::map< std::string, std::unique_ptr< ForeignDataWrapper > > validation_data_wrappers_
ImportHeaderRow has_header
Definition: CopyParams.h:46
static const std::string SOURCE_SRID_KEY
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::string bool_to_option_value(const bool value)
static void validateDataWrapperType(const std::string &data_wrapper_type)
std::string get_data_wrapper_type(const import_export::CopyParams &copy_params)
static const std::string LINE_DELIMITER_KEY
import_export::SourceType source_type
Definition: CopyParams.h:57
bool is_valid_source_type(const import_export::CopyParams &copy_params)
size_t num_import_threads(const int32_t copy_params_threads)
Definition: thread_count.h:31
bool is_s3_uri(const std::string &file_path)
static const std::string DELIMITER_KEY
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
static const std::string ARRAY_DELIMITER_KEY
std::string line_start_regex
Definition: CopyParams.h:106
static constexpr char const * INTERNAL_MEMORY_STATS
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHECK(condition)
Definition: Logger.h:222
static constexpr char const * CSV
std::map< std::string, std::string, std::less<>> OptionsMap
static const std::string GEO_ASSIGN_RENDER_GROUPS_KEY
static constexpr char const * INTERNAL_LOGS
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
static constexpr char const * PARQUET
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)