OmniSciDB  085a039ca4
ForeignDataImporter.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignDataImporter.h"
18 
19 #include <boost/uuid/uuid_generators.hpp>
20 #include <boost/uuid/uuid_io.hpp>
21 #include <filesystem>
22 
23 #include "Archive/S3Archive.h"
28 #include "Importer.h"
29 #include "Parser/ParserNode.h"
31 #include "Shared/measure.h"
32 #include "Shared/misc.h"
33 #include "Shared/scope.h"
34 #include "UserMapping.h"
35 
36 extern bool g_enable_legacy_delimited_import;
37 #ifdef ENABLE_IMPORT_PARQUET
38 extern bool g_enable_legacy_parquet_import;
39 #endif
40 extern bool g_enable_fsi_regex_import;
41 
42 namespace {
43 
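// maps the COPY source type to the corresponding foreign storage data wrapper type name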
44 std::string get_data_wrapper_type(const import_export::CopyParams& copy_params) {
45  std::string data_wrapper_type;
46  if (copy_params.source_type == import_export::SourceType::kDelimitedFile) {
47  data_wrapper_type = foreign_storage::DataWrapperType::CSV;
48  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
49  data_wrapper_type = foreign_storage::DataWrapperType::REGEX_PARSER;
50 #ifdef ENABLE_IMPORT_PARQUET
51  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
52  data_wrapper_type = foreign_storage::DataWrapperType::PARQUET;
53 #endif
54  } else {
55  UNREACHABLE();
56  }
57  return data_wrapper_type;
58 }
59 
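// runs the data wrapper's metadata scan against the proxy foreign table; if the wrapper
// reports a minimum feasible fragment size, maxFragRows is adjusted and the scan retried once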
60 ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper* data_wrapper,
61  foreign_storage::ForeignTable* foreign_table) {
62  ChunkMetadataVector metadata_vector;
63  try {
64  data_wrapper->populateChunkMetadata(
65  metadata_vector); // explicitly invoke a metadata scan on data wrapper
66  } catch (const foreign_storage::MetadataScanInfeasibleFragmentSizeException&
67  metadata_scan_exception) {
68  // if a metadata scan exception is thrown, check to see if we can adjust
69  // the fragment size and retry
70 
71  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
72  if (min_feasible_fragment_size < 0) {
73  throw; // no valid fragment size returned by exception
74  }
75  foreign_table->maxFragRows = min_feasible_fragment_size;
76  data_wrapper->populateChunkMetadata(
77  metadata_vector); // attempt another metadata scan, note, we assume that the
78  // metadata scan can be reentered safely after throwing the
79  // exception
80  }
81  return metadata_vector;
82 }
83 
84 void validate_copy_params(const import_export::CopyParams& copy_params) {
85  if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
86  foreign_storage::validate_regex_parser_options(copy_params);
87  }
88 }
89 
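// drives the fragment-by-fragment import: for each fragment id the required chunk buffers
// are populated by the data wrapper, wrapped into chunks, filtered against the delete
// buffer, and handed to the InsertDataLoader; import status is updated after every
// fragment and the loop stops early once max_reject is exceeded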
90 import_export::ImportStatus import_foreign_data(
91  const ChunkMetadataVector& metadata_vector,
92  Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
93  Catalog_Namespace::Catalog& catalog,
94  const TableDescriptor* table,
95  foreign_storage::ForeignDataWrapper* data_wrapper,
96  const Catalog_Namespace::SessionInfo* session_info,
97  const import_export::CopyParams& copy_params,
98  const std::string& copy_from_source) {
99  int32_t max_fragment_id = -1;
100  for (const auto& [key, _] : metadata_vector) {
101  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
102  }
103  CHECK_GE(max_fragment_id, 0);
104 
105  if (g_enable_assign_render_groups) {
106  // if render group assignment is enabled, tell the wrapper to create any
107  // RenderGroupAnalyzers it may need for any poly columns in the table, if
108  // that wrapper type supports it
109  data_wrapper->createRenderGroupAnalyzers();
110  }
111 
112  import_export::ImportStatus import_status;
113  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
114 
115  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
116  // gather applicable keys to load for fragment
117  std::set<ChunkKey> fragment_keys;
118  for (const auto& [key, _] : metadata_vector) {
119  if (key[CHUNK_KEY_FRAGMENT_IDX] == fragment_id) {
120  fragment_keys.insert(key);
121 
122  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
123  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
124  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
125  if (col_desc->columnType.is_varlen_indeed()) {
126  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX);
127  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data chunk
128  auto index_key = key;
129  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
130  fragment_keys.insert(index_key);
131  }
132  }
133  }
134  }
135 
136  // create buffers
137  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
138  fragment_buffers_owner;
139  foreign_storage::ChunkToBufferMap fragment_buffers;
140  auto delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
141  for (const auto& key : fragment_keys) {
142  fragment_buffers_owner[key] =
143  std::make_unique<foreign_storage::ForeignStorageBuffer>();
144  fragment_buffers_owner[key]->resetToEmpty();
145  fragment_buffers[key] = shared::get_from_map(fragment_buffers_owner, key).get();
146  }
147 
148  // get chunks for import
149  Fragmenter_Namespace::InsertChunks insert_chunks{
150  table->tableId, catalog.getDatabaseId(), {}, {}};
151 
152  // get the buffers
153  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
154 
155  // create chunks from buffers
156  for (const auto& [key, buffer] : fragment_buffers) {
157  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
158  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
159  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
160 
161  if (col_desc->columnType.is_varlen_indeed()) {
162  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
163  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
164  auto index_key = key;
165  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
166  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
167  col_desc, buffer, shared::get_from_map(fragment_buffers, index_key), false);
168  }
169  } else { // regular non-varlen case with no index buffer
170  insert_chunks.chunks[col_id] =
171  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
172  }
173  }
174 
175  // mark which row indices are valid for import
176  auto row_count = fragment_buffers.begin()
177  ->second->getEncoder()
178  ->getNumElems(); // assume all chunks have the same number of rows,
179  // this is validated at a lower level
180  insert_chunks.valid_row_indices.reserve(row_count);
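  // a non-zero entry in the delete buffer marks the corresponding row as deleted by the
  // data wrapper; such rows are skipped here and later counted as rejected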
181  for (size_t irow = 0; irow < row_count; ++irow) {
182  if (delete_buffer->size() > 0) {
183  CHECK_LE(irow, delete_buffer->size());
184  if (delete_buffer->getMemoryPtr()[irow]) {
185  continue;
186  }
187  }
188  insert_chunks.valid_row_indices.emplace_back(irow);
189  }
190 
191  // import chunks
192  insert_data_loader.insertChunks(*session_info, insert_chunks);
193 
194  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
195  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
196  import_status.rows_completed += insert_chunks.valid_row_indices.size();
197  if (import_status.rows_rejected > copy_params.max_reject) {
198  import_status.load_failed = true;
199  import_status.load_msg = "Load was cancelled due to max reject rows being reached";
200  import_export::Importer::set_import_status(copy_from_source, import_status);
201  break;
202  }
203  import_export::Importer::set_import_status(copy_from_source, import_status);
204  }
205  return import_status;
206 }
207 
208 #ifdef HAVE_AWS_S3
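// bookkeeping for one S3 object: its key, an atomic flag set once the download completes,
// and the local paths used before and after staging for import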
209 struct DownloadedObjectToProcess {
210  std::string object_key;
211  std::atomic<bool> is_downloaded;
212  std::string download_file_path;
213  std::string import_file_path;
214 };
215 
216 size_t get_number_of_digits(const size_t number) {
217  return std::to_string(number).length();
218 }
219 
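// stages a contiguous, already-downloaded range of S3 objects into a fresh local directory
// (zero-padded file names preserve the original object order) and returns that directory
// together with CopyParams stripped of S3-only settings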
220 std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
221  const import_export::CopyParams& s3_copy_params,
222  std::vector<DownloadedObjectToProcess>& objects_to_process,
223  const size_t begin_object_index,
224  const size_t end_object_index) {
225  import_export::CopyParams local_copy_params = s3_copy_params;
226  // remove any members from `local_copy_params` that are only intended to be used at a
227  // higher level
228  local_copy_params.s3_access_key.clear();
229  local_copy_params.s3_secret_key.clear();
230  local_copy_params.s3_session_token.clear();
231  local_copy_params.s3_region.clear();
232  local_copy_params.s3_endpoint.clear();
233 
234  local_copy_params.regex_path_filter = std::nullopt;
235  local_copy_params.file_sort_order_by = "PATHNAME"; // see comment below
236  local_copy_params.file_sort_regex = std::nullopt;
237 
238  CHECK_GT(end_object_index, begin_object_index);
239  CHECK_LT(begin_object_index, objects_to_process.size());
240 
241  size_t num_objects = end_object_index - begin_object_index;
242  auto& first_object = objects_to_process[begin_object_index];
243  std::string first_path = first_object.download_file_path;
244  std::string temp_dir = first_path + "_import";
245 
246  if (!std::filesystem::create_directory(temp_dir)) {
247  throw std::runtime_error("failed to create temporary directory for import: " +
248  temp_dir);
249  }
250 
251  // construct a directory with files to import
252  //
253  // NOTE:
254  // * files are moved into `temp_dir` in the exact order that they appear in
255  // `objects_to_process`
256  //
257  // * the `PATHNAME` option is set for `file_sort_order_by` in order to
258  // guarantee that import occurs in the order specified by user, provided the
259  // data wrapper correctly supports the `PATHNAME` option
260  //
261  // * filenames are chosen such that they appear in lexicographical order by
262  // pathname, thus requiring padding with an appropriate number of zeros
263  std::filesystem::path temp_dir_path{temp_dir};
264  size_t counter = 0;
265  size_t num_zero = get_number_of_digits(num_objects);
266  for (size_t i = begin_object_index; i < end_object_index; ++i) {
267  auto& object = objects_to_process[i];
268  std::filesystem::path old_path = object.download_file_path;
269  auto counter_str = std::to_string(counter++);
270  auto zero_padded_counter_str =
271  std::string(num_zero - counter_str.length(), '0') + counter_str;
272  auto new_path = (temp_dir_path / zero_padded_counter_str).string();
273  std::filesystem::rename(old_path, new_path);
274  object.import_file_path = new_path;
275  }
276  return {temp_dir, local_copy_params};
277 }
278 #endif
279 
280 } // namespace
281 
282 namespace import_export {
283 
284 ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
285  const CopyParams& copy_params,
286  const TableDescriptor* table)
287  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
288  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
289 }
290 
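// checkpoints the given string dictionaries (disk-resident tables only) and then either
// checkpoints or rolls back the insert connector, depending on whether the import failed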
291 void ForeignDataImporter::finalize(
292  const Catalog_Namespace::SessionInfo& parent_session_info,
293  ImportStatus& import_status,
294  const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
295  string_dictionaries) {
296  if (table_->persistenceLevel ==
297  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
298  // tables
299  if (!import_status.load_failed) {
300  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
301  for (const auto& [column_descriptor, string_dictionary] : string_dictionaries) {
302  if (!string_dictionary->checkpoint()) {
303  LOG(ERROR) << "Checkpointing Dictionary for Column "
304  << column_descriptor->columnName << " failed.";
305  import_status.load_failed = true;
306  import_status.load_msg = "Dictionary checkpoint failed";
307  break;
308  }
309  }
310  }
311  }
312  if (import_status.load_failed) {
313  connector_->rollback(parent_session_info, table_->tableId);
314  } else {
315  connector_->checkpoint(parent_session_info, table_->tableId);
316  }
317 }
318 
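// convenience overload: collects the dictionary-encoded string columns of the table and
// forwards to the finalize() overload above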
319 void ForeignDataImporter::finalize(
320  const Catalog_Namespace::SessionInfo& parent_session_info,
321  ImportStatus& import_status,
322  const int32_t table_id) {
323  auto& catalog = parent_session_info.getCatalog();
324 
325  auto logical_columns =
326  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
327 
328  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
329  for (const auto& column_descriptor : logical_columns) {
330  if (!column_descriptor->columnType.is_dict_encoded_string()) {
331  continue;
332  }
333  auto dict_descriptor =
334  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
335  string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
336  }
337 
338  finalize(parent_session_info, import_status, string_dictionaries);
339 }
340 
341 // TODO: the `proxy_foreign_table_fragment_size_` parameter controls the amount
342 // of data buffered in memory while importing using the `ForeignDataImporter`;
343 // it may need to be tuned or exposed as a configurable parameter
345 
346 ImportStatus ForeignDataImporter::importGeneral(
347  const Catalog_Namespace::SessionInfo* session_info,
348  const std::string& copy_from_source,
349  const CopyParams& copy_params) {
350  auto& catalog = session_info->getCatalog();
351 
353 
354  // validate copy params before import in order to print user friendly messages
355  validate_copy_params(copy_params);
356 
357  ImportStatus import_status;
358  {
359  auto& current_user = session_info->get_currentUser();
360  auto [server, user_mapping, foreign_table] =
361  foreign_storage::create_proxy_fsi_objects(copy_from_source,
362  copy_params,
363  catalog.getDatabaseId(),
364  table_,
365  current_user.userId);
366 
367  // set fragment size for proxy foreign table during import
368  foreign_table->maxFragRows = proxy_foreign_table_fragment_size_;
369 
370  auto data_wrapper =
371  foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(
372  get_data_wrapper_type(copy_params),
373  catalog.getDatabaseId(),
374  foreign_table.get(),
375  user_mapping.get());
376 
377  ChunkMetadataVector metadata_vector =
378  metadata_scan(data_wrapper.get(), foreign_table.get());
379  if (metadata_vector.empty()) { // an empty data source
380  return {};
381  }
382 
383  import_status = import_foreign_data(metadata_vector,
384  connector_.get(),
385  catalog,
386  table_,
387  data_wrapper.get(),
388  session_info,
389  copy_params,
390  copy_from_source);
391 
392  } // this scope ensures that fsi proxy objects are destroyed prior to checkpointing
393 
394  finalize(*session_info, import_status, table_->tableId);
395 
396  return import_status;
397 }
398 
399 ImportStatus ForeignDataImporter::importGeneral(
400  const Catalog_Namespace::SessionInfo* session_info) {
401  return importGeneral(session_info, copy_from_source_, copy_params_);
402 }
403 
404 void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
405  auto data_dir_path = boost::filesystem::canonical(base_path);
406  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
407 }
408 
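// S3 import path: objects are downloaded concurrently into a scratch directory while a
// single consumer imports contiguous ranges of finished downloads through importGeneral();
// the scratch directory is removed on exit via the ScopeGuard below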
409 ImportStatus ForeignDataImporter::importGeneralS3(
410  const Catalog_Namespace::SessionInfo* session_info) {
412 
413  if (!(copy_params_.source_type == import_export::SourceType::kDelimitedFile ||
414 #if ENABLE_IMPORT_PARQUET
415  copy_params_.source_type == import_export::SourceType::kParquetFile ||
416 #endif
417  copy_params_.source_type == import_export::SourceType::kRegexParsedFile)) {
418  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
419  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
420 #if ENABLE_IMPORT_PARQUET
421  ", 'PARQUET_FILE'"
422 #endif
423  " or 'REGEX_PARSED_FILE'");
424  }
425 
428 
429 #ifdef HAVE_AWS_S3
430 
431  auto uuid = boost::uuids::random_generator()();
432  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
433  auto import_path = std::filesystem::path(default_import_path_) / base_path;
434 
435  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
445  import_path);
446  s3_archive->init_for_read();
447 
448  const auto bucket_name = s3_archive->url_part(4);
449 
450  auto object_keys = s3_archive->get_objkeys();
451  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
452  size_t object_count = 0;
453  for (const auto& objkey : object_keys) {
454  auto& object = objects_to_process[object_count++];
455  object.object_key = objkey;
456  object.is_downloaded = false;
457  }
458 
459  // Ensure files & dirs are cleaned up, regardless of outcome
460  ScopeGuard cleanup_guard = [&] {
461  if (std::filesystem::exists(import_path)) {
462  std::filesystem::remove_all(import_path);
463  }
464  };
465 
466  ImportStatus aggregate_import_status;
467  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
468 
469  std::mutex communication_mutex;
470  bool continue_downloading = true;
471  bool download_exception_occured = false;
472 
473  std::condition_variable files_download_condition;
474 
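  // checked by download workers: stop early if the importer has requested a halt or
  // another download has already failed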
475  auto is_downloading_finished = [&] {
476  std::unique_lock communication_lock(communication_mutex);
477  return !continue_downloading || download_exception_occured;
478  };
479 
480  std::function<void(const std::vector<size_t>&)> download_objects =
481  [&](const std::vector<size_t>& partition) {
482  for (const auto& index : partition) {
483  DownloadedObjectToProcess& object = objects_to_process[index];
484  const std::string& obj_key = object.object_key;
485  if (is_downloading_finished()) {
486  return;
487  }
488  std::exception_ptr eptr; // unused
489  std::string local_file_path;
490  std::string exception_what;
491  bool exception_occured = false;
492 
493  try {
494  local_file_path = s3_archive->land(obj_key,
495  eptr,
496  false,
497  /*allow_named_pipe_use=*/false,
498  /*track_file_path=*/false);
499  } catch (const std::exception& e) {
500  exception_what = e.what();
501  exception_occured = true;
502  }
503 
504  if (is_downloading_finished()) {
505  return;
506  }
507  if (exception_occured) {
508  {
509  std::unique_lock communication_lock(communication_mutex);
510  download_exception_occured = true;
511  }
512  files_download_condition.notify_all();
513  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
514  "': " + exception_what);
515  }
516 
517  object.download_file_path = local_file_path;
518  object.is_downloaded =
519  true; // this variable is atomic and therefore acts as a synchronization flag;
520  // it must be set last to ensure there is no data race
521 
522  files_download_condition.notify_all();
523  }
524  };
525 
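  // single consumer: waits until the object at object_index has finished downloading
  // (or a download failed), imports the largest contiguous downloaded range as one local
  // COPY, and rewrites any error message in terms of the original S3 object keys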
526  std::function<void()> import_local_files = [&]() {
527  for (size_t object_index = 0; object_index < object_count;) {
528  {
529  std::unique_lock communication_lock(communication_mutex);
530  files_download_condition.wait(
531  communication_lock,
532  [&download_exception_occured, object_index, &objects_to_process]() {
533  return objects_to_process[object_index].is_downloaded ||
534  download_exception_occured;
535  });
536  if (download_exception_occured) { // do not wait for object index if a download
537  // error has occurred
538  return;
539  }
540  }
541 
542  // find largest range of files to import
543  size_t end_object_index = object_count;
544  for (size_t i = object_index + 1; i < object_count; ++i) {
545  if (!objects_to_process[i].is_downloaded) {
546  end_object_index = i;
547  break;
548  }
549  }
550 
551  ImportStatus local_import_status;
552  std::string local_import_dir;
553  try {
554  CopyParams local_copy_params;
555  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
556  copy_params_, objects_to_process, object_index, end_object_index);
557  local_import_status =
558  importGeneral(session_info, local_import_dir, local_copy_params);
559  // clean up temporary files
560  std::filesystem::remove_all(local_import_dir);
561  } catch (const std::exception& except) {
562  // replace all occurrences of file names with the object keys for
563  // users
564  std::string what = except.what();
565 
566  for (size_t i = object_index; i < end_object_index; ++i) {
567  auto& object = objects_to_process[i];
568  what = boost::regex_replace(what,
569  boost::regex{object.import_file_path},
570  bucket_name + "/" + object.object_key);
571  }
572  {
573  std::unique_lock communication_lock(communication_mutex);
574  continue_downloading = false;
575  }
576  // clean up temporary files
577  std::filesystem::remove_all(local_import_dir);
578  throw std::runtime_error(what);
579  }
580  aggregate_import_status += local_import_status;
581  import_export::Importer::set_import_status(copy_from_source_,
582  aggregate_import_status);
583  if (aggregate_import_status.load_failed) {
584  {
585  std::unique_lock communication_lock(communication_mutex);
586  continue_downloading = false;
587  }
588  return;
589  }
590 
591  object_index =
592  end_object_index; // all objects in range [object_index,end_object_index)
593  // are correctly imported at this point in execution; move on to
594  // the next range
595  }
596  };
597 
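  // one async task per download partition plus a single importer task; waiting on the
  // futures afterwards propagates any exceptions from the workers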
598  std::vector<size_t> partition_range(object_count);
599  std::iota(partition_range.begin(), partition_range.end(), 0);
600  auto download_futures = foreign_storage::create_futures_for_workers(
601  partition_range, num_download_threads, download_objects);
602 
603  auto import_future = std::async(std::launch::async, import_local_files);
604 
605  for (auto& future : download_futures) {
606  future.wait();
607  }
608  import_future.get(); // may throw an exception
609 
610  // get any remaining exceptions
611  for (auto& future : download_futures) {
612  future.get();
613  }
614  return aggregate_import_status;
615 
616 #else
617  throw std::runtime_error("AWS S3 support not available");
618 
619  return {};
620 #endif
621 }
622 
623 #ifdef ENABLE_IMPORT_PARQUET
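// Parquet import path: batches produced by the ParquetImporter data wrapper are inserted
// concurrently by num_outer_thread consumer threads, while each batch itself is parsed
// with num_importer_threads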
624 ImportStatus ForeignDataImporter::importParquet(
625  const Catalog_Namespace::SessionInfo* session_info) {
626  auto& catalog = session_info->getCatalog();
627 
629 
630  auto& current_user = session_info->get_currentUser();
631  auto server = foreign_storage::ForeignDataWrapperFactory::createForeignServerProxy(
632  catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);
633 
634  auto user_mapping =
635  foreign_storage::ForeignDataWrapperFactory::createUserMappingProxyIfApplicable(
636  catalog.getDatabaseId(),
637  current_user.userId,
638  copy_from_source_,
639  copy_params_,
640  server.get());
641 
642  auto foreign_table =
643  foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(
644  catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());
645 
646  foreign_table->validateOptionValues();
647 
648  auto data_wrapper = foreign_storage::ForeignDataWrapperFactory::createForImport(
649  foreign_storage::DataWrapperType::PARQUET,
650  catalog.getDatabaseId(),
651  foreign_table.get(),
652  user_mapping.get());
653 
654  if (auto parquet_import =
655  dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
656  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector_);
657 
658  // determine the number of threads to use at each level
659 
660  int max_threads = 0;
661  if (copy_params_.threads == 0) {
662  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
663  g_max_import_threads);
664  } else {
665  max_threads = static_cast<size_t>(copy_params_.threads);
666  }
667  CHECK_GT(max_threads, 0);
668 
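  // split the thread budget between batch-level parallelism (num_outer_thread) and
  // intra-batch parsing threads (num_importer_threads) so their product stays within
  // max_threads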
669  int num_importer_threads =
670  std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
671  parquet_import->setNumThreads(num_importer_threads);
672  int num_outer_thread = 1;
673  for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
674  if (thread_count * num_importer_threads <= max_threads) {
675  num_outer_thread = thread_count;
676  }
677  }
678 
679  std::shared_mutex import_status_mutex;
680  ImportStatus import_status; // manually update
681 
682  auto import_failed = [&import_status_mutex, &import_status] {
683  std::shared_lock import_status_lock(import_status_mutex);
684  return import_status.load_failed;
685  };
686 
687  std::vector<std::future<void>> futures;
688 
689  for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
690  futures.emplace_back(std::async(std::launch::async, [&] {
691  while (true) {
692  auto batch_result = parquet_import->getNextImportBatch();
693  if (import_failed()) {
694  break;
695  }
696  auto batch = batch_result->getInsertData();
697  if (!batch || import_failed()) {
698  break;
699  }
700  insert_data_loader.insertData(*session_info, *batch);
701 
702  auto batch_import_status = batch_result->getImportStatus();
703  {
704  std::unique_lock import_status_lock(import_status_mutex);
705  import_status.rows_completed += batch_import_status.rows_completed;
706  import_status.rows_rejected += batch_import_status.rows_rejected;
707  if (import_status.rows_rejected > copy_params_.max_reject) {
708  import_status.load_failed = true;
709  import_status.load_msg =
710  "Load was cancelled due to max reject rows being reached";
711  break;
712  }
713  }
714  }
715  }));
716  }
717 
718  for (auto& future : futures) {
719  future.wait();
720  }
721 
722  for (auto& future : futures) {
723  future.get();
724  }
725 
726  if (import_status.load_failed) {
727  foreign_table.reset(); // this is to avoid calling the TableDescriptor dtor after
728  // the rollback in the checkpoint below
729  }
730 
731  finalize(*session_info, import_status, parquet_import->getStringDictionaries());
732 
733  return import_status;
734  }
735 
736  UNREACHABLE();
737  return {};
738 }
739 #endif
740 
741 ImportStatus ForeignDataImporter::import(
742  const Catalog_Namespace::SessionInfo* session_info) {
743  if (foreign_storage::is_s3_uri(copy_from_source_)) {
744  return importGeneralS3(session_info);
745  }
746  return importGeneral(session_info);
747 }
748 
749 } // namespace import_export