OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignDataImporter.h"
18 
19 #include <boost/uuid/uuid_generators.hpp>
20 #include <boost/uuid/uuid_io.hpp>
21 #include <filesystem>
22 
23 #include "Archive/S3Archive.h"
28 #include "Importer.h"
29 #include "Parser/ParserNode.h"
31 #include "Shared/file_path_util.h"
32 #include "Shared/measure.h"
33 #include "Shared/misc.h"
34 #include "Shared/scope.h"
35 #include "UserMapping.h"
36 
38 #ifdef ENABLE_IMPORT_PARQUET
39 extern bool g_enable_legacy_parquet_import;
40 #endif
41 extern bool g_enable_fsi_regex_import;
42 
43 namespace {
45  foreign_storage::ForeignTable* foreign_table) {
46  ChunkMetadataVector metadata_vector;
47  try {
48  data_wrapper->populateChunkMetadata(
49  metadata_vector); // explicitly invoke a metadata scan on data wrapper
51  metadata_scan_exception) {
52  // if a metadata scan exception is thrown, check to see if we can adjust
53  // the fragment size and retry
54 
55  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
56  if (min_feasible_fragment_size < 0) {
57  throw; // no valid fragment size returned by exception
58  }
59  foreign_table->maxFragRows = min_feasible_fragment_size;
60  data_wrapper->populateChunkMetadata(
61  metadata_vector); // attempt another metadata scan, note, we assume that the
62  // metadata scan can be reentered safely after throwing the
63  // exception
64  }
65  return metadata_vector;
66 }
67 
68 std::string get_import_id(const import_export::CopyParams& copy_params,
69  const std::string& copy_from_source) {
72 #ifdef ENABLE_IMPORT_PARQUET
74 #endif
75  ) {
76  return boost::filesystem::path(copy_from_source).filename().string();
77  }
78 
79  return copy_from_source;
80 }
81 
85  }
86 }
87 
89  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
92  std::unique_ptr<foreign_storage::ForeignStorageBuffer> delete_buffer;
93 };
94 
95 std::unique_ptr<FragmentBuffers> create_fragment_buffers(
96  const int32_t fragment_id,
98  const TableDescriptor* table) {
99  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);
100 
101  std::set<ChunkKey> fragment_keys;
102  for (const auto col_desc : columns) {
103  ChunkKey key{
104  catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
105  if (col_desc->columnType.is_varlen_indeed()) {
106  auto data_key = key;
107  data_key.push_back(1);
108  fragment_keys.insert(data_key);
109  auto index_key = key;
110  index_key.push_back(2);
111  fragment_keys.insert(index_key);
112  } else {
113  fragment_keys.insert(key);
114  }
115  }
116 
117  // create buffers
118  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
119  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
120  for (const auto& key : fragment_keys) {
121  frag_buffers->fragment_buffers_owner[key] =
122  std::make_unique<foreign_storage::ForeignStorageBuffer>();
123  frag_buffers->fragment_buffers[key] =
124  shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
125  }
126 
127  return frag_buffers;
128 }
129 
133  const TableDescriptor* table,
134  const Catalog_Namespace::SessionInfo* session_info,
135  const import_export::CopyParams& copy_params,
136  const std::string& copy_from_source,
137  import_export::ImportStatus& import_status,
138  std::mutex& communication_mutex,
139  bool& continue_loading,
140  bool& load_failed,
141  bool& data_wrapper_error_occured,
142  std::condition_variable& buffers_to_load_condition,
143  std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
144  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
145  while (true) {
146  {
147  std::unique_lock communication_lock(communication_mutex);
148  buffers_to_load_condition.wait(communication_lock, [&]() {
149  return !buffers_to_load.empty() || !continue_loading ||
150  data_wrapper_error_occured;
151  });
152  if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
153  return;
154  }
155  }
156 
157  CHECK(!buffers_to_load.empty());
158 
159  try {
160  std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
161  {
162  std::unique_lock communication_lock(communication_mutex);
163  grouped_fragment_buffers.reset(buffers_to_load.front().release());
164  buffers_to_load.pop_front();
165  buffers_to_load_condition.notify_all();
166  }
167  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
168  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
169 
170  // get chunks for import
172  table->tableId, catalog.getDatabaseId(), {}, {}};
173 
174  // create chunks from buffers
175  for (const auto& [key, buffer] : fragment_buffers) {
176  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
177  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
178  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
179 
180  if (col_desc->columnType.is_varlen_indeed()) {
181  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
182  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
183  auto index_key = key;
184  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
185  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
186  col_desc,
187  buffer,
188  shared::get_from_map(fragment_buffers, index_key),
189  false);
190  }
191  } else { // regular non-varlen case with no index buffer
192  insert_chunks.chunks[col_id] =
193  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
194  }
195  }
196 
197  // mark which row indices are valid for import
198  auto row_count = fragment_buffers.begin()
199  ->second->getEncoder()
200  ->getNumElems(); // assume all chunks have same number of
201  // rows, this is validated at a lower level
202  insert_chunks.valid_row_indices.reserve(row_count);
203  for (size_t irow = 0; irow < row_count; ++irow) {
204  if (delete_buffer->size() > 0) {
205  CHECK_LE(irow, delete_buffer->size());
206  if (delete_buffer->getMemoryPtr()[irow]) {
207  continue;
208  }
209  }
210  insert_chunks.valid_row_indices.emplace_back(irow);
211  }
212 
213  // import chunks
214  insert_data_loader.insertChunks(*session_info, insert_chunks);
215 
216  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
217  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
218  import_status.rows_completed += insert_chunks.valid_row_indices.size();
219  if (import_status.rows_rejected > copy_params.max_reject) {
220  import_status.load_failed = true;
221  import_status.load_msg =
222  "Load was cancelled due to max reject rows being reached";
224  get_import_id(copy_params, copy_from_source), import_status);
225  std::unique_lock communication_lock(communication_mutex);
226  load_failed = true;
227  buffers_to_load_condition.notify_all();
228  return;
229  }
231  get_import_id(copy_params, copy_from_source), import_status);
232  } catch (...) {
233  {
234  std::unique_lock communication_lock(communication_mutex);
235  load_failed = true;
236  buffers_to_load_condition.notify_all();
237  }
238  throw;
239  }
240  }
241 }
242 
244  const int32_t max_fragment_id,
247  const TableDescriptor* table,
249  const Catalog_Namespace::SessionInfo* session_info,
250  const import_export::CopyParams& copy_params,
251  const std::string& copy_from_source,
252  const size_t maximum_num_fragments_buffered) {
254  // if render group assignment is enabled, tell the wrapper to create any
255  // RenderGroupAnalyzers it may need for any poly columns in the table, if
256  // that wrapper type supports it
257  data_wrapper->createRenderGroupAnalyzers();
258  }
259 
260  import_export::ImportStatus import_status;
261 
262  std::mutex communication_mutex;
263  bool continue_loading =
264  true; // when false, signals that the last buffer to load has been added, and
265  // loading should cease after loading remaining buffers
266  bool load_failed =
267  false; // signals loading has failed and buffer population should cease
268  bool data_wrapper_error_occured = false; // signals an error occured during buffer
269  // population and loading should cease
270  std::condition_variable buffers_to_load_condition;
271  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;
272 
273  // launch separate thread to load processed fragment buffers
274  auto load_future = std::async(std::launch::async,
276  connector,
277  std::ref(catalog),
278  table,
279  session_info,
280  std::cref(copy_params),
281  std::cref(copy_from_source),
282  std::ref(import_status),
283  std::ref(communication_mutex),
284  std::ref(continue_loading),
285  std::ref(load_failed),
286  std::ref(data_wrapper_error_occured),
287  std::ref(buffers_to_load_condition),
288  std::ref(buffers_to_load));
289 
290  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
291  {
292  std::unique_lock communication_lock(communication_mutex);
293  buffers_to_load_condition.wait(communication_lock, [&]() {
294  return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
295  });
296  if (load_failed) {
297  break;
298  }
299  }
300  auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
301  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
302  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
303 
304  // get the buffers, accounting for the possibility of the requested fragment id being
305  // out of bounds
306  try {
307  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
309  break;
310  } catch (...) {
311  std::unique_lock communication_lock(communication_mutex);
312  data_wrapper_error_occured = true;
313  buffers_to_load_condition.notify_all();
314  throw;
315  }
316 
317  std::unique_lock communication_lock(communication_mutex);
318  buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
319  buffers_to_load_condition.notify_all();
320  }
321 
322  { // data wrapper processing has finished, notify loading thread
323  std::unique_lock communication_lock(communication_mutex);
324  continue_loading = false;
325  buffers_to_load_condition.notify_all();
326  }
327 
328  // any exceptions in separate loading thread will occur here
329  load_future.get();
330 
331  return import_status;
332 }
333 
334 #ifdef HAVE_AWS_S3
/**
 * Bookkeeping for one S3 object taking part in a concurrent download + import.
 *
 * A download worker fills in `download_file_path` and then sets `is_downloaded`
 * (last, so the atomic store publishes the path). The import thread waits for
 * `is_downloaded` before reading `download_file_path`, and records where the file
 * was moved for import in `import_file_path`.
 */
struct DownloadedObjectToProcess {
  std::string object_key;  // key of the object inside the S3 bucket
  // Explicitly initialized so a freshly constructed entry never reads as
  // "downloaded", regardless of how the struct is constructed (a C++17
  // default-initialized std::atomic<bool> holds an indeterminate value).
  std::atomic<bool> is_downloaded{false};
  std::string download_file_path;  // local path the object was downloaded to
  std::string import_file_path;    // path after being moved into the import dir
};
341 
// Returns the number of decimal digits needed to print `number`; zero counts as
// one digit. Used to zero-pad generated file names so they sort lexicographically.
size_t get_number_of_digits(const size_t number) {
  size_t digit_count = 1;
  for (size_t remaining = number; remaining >= 10; remaining /= 10) {
    ++digit_count;
  }
  return digit_count;
}
345 
346 std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
347  const import_export::CopyParams& s3_copy_params,
348  std::vector<DownloadedObjectToProcess>& objects_to_process,
349  const size_t begin_object_index,
350  const size_t end_object_index) {
351  import_export::CopyParams local_copy_params = s3_copy_params;
352  // remove any members from `local_copy_params` that are only intended to be used at a
353  // higher level
354  local_copy_params.s3_access_key.clear();
355  local_copy_params.s3_secret_key.clear();
356  local_copy_params.s3_session_token.clear();
357  local_copy_params.s3_region.clear();
358  local_copy_params.s3_endpoint.clear();
359 
360  local_copy_params.regex_path_filter = std::nullopt;
361  local_copy_params.file_sort_order_by = "PATHNAME"; // see comment below
362  local_copy_params.file_sort_regex = std::nullopt;
363 
364  CHECK_GT(end_object_index, begin_object_index);
365  CHECK_LT(begin_object_index, objects_to_process.size());
366 
367  size_t num_objects = end_object_index - begin_object_index;
368  auto& first_object = objects_to_process[begin_object_index];
369  std::string first_path = first_object.download_file_path;
370  std::string temp_dir = first_path + "_import";
371 
372  if (!std::filesystem::create_directory(temp_dir)) {
373  throw std::runtime_error("failed to create temporary directory for import: " +
374  temp_dir);
375  }
376 
377  // construct a directory with files to import
378  //
379  // NOTE:
380  // * files are moved into `temp_dir` in the exact order that they appear in
381  // `objects_to_process`
382  //
383  // * the `PATHNAME` option is set for `file_sort_order_by` in order to
384  // guarantee that import occurs in the order specified by user, provided the
385  // data wrapper correctly supports the `PATHNAME` option
386  //
387  // * filenames are chosen such that they appear in lexicographical order by
388  // pathname, thus require padding by appropriate number of zeros
389  //
390  // * filenames must maintain their suffixes for some features of data
391  // wrappers to work correctly, especially in the case of archived data (such
392  // as `.zip` or `.gz` or any variant.)
393  std::filesystem::path temp_dir_path{temp_dir};
394  size_t counter = 0;
395  size_t num_zero = get_number_of_digits(num_objects);
396  for (size_t i = begin_object_index; i < end_object_index; ++i) {
397  auto& object = objects_to_process[i];
398  std::filesystem::path old_path = object.download_file_path;
399  auto counter_str = std::to_string(counter++);
400  auto zero_padded_counter_str =
401  std::string(num_zero - counter_str.length(), '0') + counter_str;
402  auto new_path = (temp_dir_path / zero_padded_counter_str).string() +
403  std::filesystem::path{object.object_key}.extension().string();
404  std::filesystem::rename(old_path, new_path);
405  object.import_file_path = new_path;
406  }
407  return {temp_dir, local_copy_params};
408 }
409 #endif
410 
411 } // namespace
412 
413 namespace import_export {
414 
415 ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
416  const CopyParams& copy_params,
417  const TableDescriptor* table)
418  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
419  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
420 }
421 
423  const Catalog_Namespace::SessionInfo& parent_session_info,
424  ImportStatus& import_status,
425  const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
426  string_dictionaries) {
427  if (table_->persistenceLevel ==
428  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
429  // tables
430  if (!import_status.load_failed) {
431  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
432  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
433  if (!string_dictionary->checkpoint()) {
434  LOG(ERROR) << "Checkpointing Dictionary for Column "
435  << column_desciptor->columnName << " failed.";
436  import_status.load_failed = true;
437  import_status.load_msg = "Dictionary checkpoint failed";
438  break;
439  }
440  }
441  }
442  }
443  if (import_status.load_failed) {
444  connector_->rollback(parent_session_info, table_->tableId);
445  } else {
446  connector_->checkpoint(parent_session_info, table_->tableId);
447  }
448 }
449 
451  const Catalog_Namespace::SessionInfo& parent_session_info,
452  ImportStatus& import_status,
453  const int32_t table_id) {
454  auto& catalog = parent_session_info.getCatalog();
455 
456  auto logical_columns =
457  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
458 
459  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
460  for (const auto& column_descriptor : logical_columns) {
461  if (!column_descriptor->columnType.is_dict_encoded_string()) {
462  continue;
463  }
464  auto dict_descriptor =
465  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
466  string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
467  }
468 
469  finalize(parent_session_info, import_status, string_dictionaries);
470 }
471 
472 // This value is used only if it is non-zero; it is for testing purposes only
474 
475 namespace {
477  const size_t maximum_num_fragments_buffered,
478  const size_t max_import_batch_row_count,
479  const Catalog_Namespace::SessionInfo& parent_session_info,
480  const int32_t table_id) {
483  }
484 
485  if (max_import_batch_row_count != 0) {
486  return max_import_batch_row_count;
487  }
488 
489  // This number is chosen as a reasonable default value to reserve for
490  // intermediate buffering during import, it is about 2GB of memory. Note,
491  // depending on the acutal size of var len values, this heuristic target may
492  // be off. NOTE: `maximum_num_fragments_buffered` scales the allowed buffer
493  // size with the assumption that in the worst case all buffers may be
494  // buffered at once.
495  const size_t max_buffer_byte_size =
496  2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;
497 
498  auto& catalog = parent_session_info.getCatalog();
499 
500  auto logical_columns =
501  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
502 
503  size_t row_byte_size = 0;
504  for (const auto& column_descriptor : logical_columns) {
505  auto type = column_descriptor->columnType;
506  size_t field_byte_length = 0;
507  if (type.is_varlen_indeed()) {
508  // use a heuristic where varlen types are assumed to be 256 bytes in length
509  field_byte_length = 256;
510  } else {
511  field_byte_length = type.get_size();
512  }
513  row_byte_size += field_byte_length;
514  }
515 
516  return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
518 }
519 } // namespace
520 
522  const Catalog_Namespace::SessionInfo* session_info,
523  const std::string& copy_from_source,
524  const CopyParams& copy_params) {
525  auto& catalog = session_info->getCatalog();
526 
528 
529  // validate copy params before import in order to print user friendly messages
530  validate_copy_params(copy_params);
531 
532  ImportStatus import_status;
533  {
534  auto& current_user = session_info->get_currentUser();
535  auto [server, user_mapping, foreign_table] =
537  copy_params,
538  catalog.getDatabaseId(),
539  table_,
540  current_user.userId);
541 
542  // maximum number of fragments buffered in memory at any one time, affects
543  // `maxFragRows` heuristic below
544  const size_t maximum_num_fragments_buffered = 3;
545  // set fragment size for proxy foreign table during import
546  foreign_table->maxFragRows =
547  get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
548  copy_params.max_import_batch_row_count,
549  *session_info,
550  table_->tableId);
551 
552  // log for debugging purposes
553  LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
554  << " for table " << table_->tableName;
555 
556  auto data_wrapper =
558  copy_params,
559  catalog.getDatabaseId(),
560  foreign_table.get(),
561  user_mapping.get());
562 
563  int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
564  if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
565  ChunkMetadataVector metadata_vector =
566  metadata_scan(data_wrapper.get(), foreign_table.get());
567  if (metadata_vector.empty()) { // an empty data source
568  return {};
569  }
570  max_fragment_id = 0;
571  for (const auto& [key, _] : metadata_vector) {
572  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
573  }
574  CHECK_GE(max_fragment_id, 0);
575  }
576 
577  import_status = import_foreign_data(max_fragment_id,
578  connector_.get(),
579  catalog,
580  table_,
581  data_wrapper.get(),
582  session_info,
583  copy_params,
584  copy_from_source,
585  maximum_num_fragments_buffered);
586 
587  } // this scope ensures that fsi proxy objects are destroyed prior to checkpoint
588 
589  finalize(*session_info, import_status, table_->tableId);
590 
591  return import_status;
592 }
593 
595  const Catalog_Namespace::SessionInfo* session_info) {
596  return importGeneral(session_info, copy_from_source_, copy_params_);
597 }
598 
599 void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
600  auto data_dir_path = boost::filesystem::canonical(base_path);
601  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
602 }
603 
605  const Catalog_Namespace::SessionInfo* session_info) {
607 
609 #if ENABLE_IMPORT_PARQUET
611 #endif
613  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
614  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
615 #if ENABLE_IMPORT_PARQUET
616  ", 'PARQUET_FILE'"
617 #endif
618  " or 'REGEX_PARSED_FILE'");
619  }
620 
625 
626 #ifdef HAVE_AWS_S3
627 
628  auto uuid = boost::uuids::random_generator()();
629  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
630  auto import_path = std::filesystem::path(default_import_path_) / base_path;
631 
632  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
642  import_path);
643  s3_archive->init_for_read();
644 
645  const auto bucket_name = s3_archive->url_part(4);
646 
647  auto object_keys = s3_archive->get_objkeys();
648  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
649  size_t object_count = 0;
650  for (const auto& objkey : object_keys) {
651  auto& object = objects_to_process[object_count++];
652  object.object_key = objkey;
653  object.is_downloaded = false;
654  }
655 
656  // Ensure files & dirs are cleaned up, regardless of outcome
657  ScopeGuard cleanup_guard = [&] {
658  if (std::filesystem::exists(import_path)) {
659  std::filesystem::remove_all(import_path);
660  }
661  };
662 
663  ImportStatus aggregate_import_status;
664  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
665 
666  std::mutex communication_mutex;
667  bool continue_downloading = true;
668  bool download_exception_occured = false;
669 
670  std::condition_variable files_download_condition;
671 
672  auto is_downloading_finished = [&] {
673  std::unique_lock communication_lock(communication_mutex);
674  return !continue_downloading || download_exception_occured;
675  };
676 
677  std::function<void(const std::vector<size_t>&)> download_objects =
678  [&](const std::vector<size_t>& partition) {
679  for (const auto& index : partition) {
680  DownloadedObjectToProcess& object = objects_to_process[index];
681  const std::string& obj_key = object.object_key;
682  if (is_downloading_finished()) {
683  return;
684  }
685  std::exception_ptr eptr; // unused
686  std::string local_file_path;
687  std::string exception_what;
688  bool exception_occured = false;
689 
690  try {
691  local_file_path = s3_archive->land(obj_key,
692  eptr,
693  false,
694  /*allow_named_pipe_use=*/false,
695  /*track_file_path=*/false);
696  } catch (const std::exception& e) {
697  exception_what = e.what();
698  exception_occured = true;
699  }
700 
701  if (is_downloading_finished()) {
702  return;
703  }
704  if (exception_occured) {
705  {
706  std::unique_lock communication_lock(communication_mutex);
707  download_exception_occured = true;
708  }
709  files_download_condition.notify_all();
710  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
711  "': " + exception_what);
712  }
713 
714  object.download_file_path = local_file_path;
715  object.is_downloaded =
716  true; // this variable is atomic and therefore acts as a lock, it must be
717  // set last to ensure no data race
718 
719  files_download_condition.notify_all();
720  }
721  };
722 
723  std::function<void()> import_local_files = [&]() {
724  for (size_t object_index = 0; object_index < object_count;) {
725  {
726  std::unique_lock communication_lock(communication_mutex);
727  files_download_condition.wait(
728  communication_lock,
729  [&download_exception_occured, object_index, &objects_to_process]() {
730  return objects_to_process[object_index].is_downloaded ||
731  download_exception_occured;
732  });
733  if (download_exception_occured) { // do not wait for object index if a download
734  // error has occured
735  return;
736  }
737  }
738 
739  // find largest range of files to import
740  size_t end_object_index = object_count;
741  for (size_t i = object_index + 1; i < object_count; ++i) {
742  if (!objects_to_process[i].is_downloaded) {
743  end_object_index = i;
744  break;
745  }
746  }
747 
748  ImportStatus local_import_status;
749  std::string local_import_dir;
750  try {
751  CopyParams local_copy_params;
752  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
753  copy_params_, objects_to_process, object_index, end_object_index);
754  local_import_status =
755  importGeneral(session_info, local_import_dir, local_copy_params);
756  // clean up temporary files
757  std::filesystem::remove_all(local_import_dir);
758  } catch (const std::exception& except) {
759  // replace all occurences of file names with the object keys for
760  // users
761  std::string what = except.what();
762 
763  for (size_t i = object_index; i < end_object_index; ++i) {
764  auto& object = objects_to_process[i];
765  what = boost::regex_replace(what,
766  boost::regex{object.import_file_path},
767  bucket_name + "/" + object.object_key);
768  }
769  {
770  std::unique_lock communication_lock(communication_mutex);
771  continue_downloading = false;
772  }
773  // clean up temporary files
774  std::filesystem::remove_all(local_import_dir);
775  throw std::runtime_error(what);
776  }
777  aggregate_import_status += local_import_status;
779  aggregate_import_status);
780  if (aggregate_import_status.load_failed) {
781  {
782  std::unique_lock communication_lock(communication_mutex);
783  continue_downloading = false;
784  }
785  return;
786  }
787 
788  object_index =
789  end_object_index; // all objects in range [object_index,end_object_index)
790  // correctly imported at this point in excecution, move onto
791  // next range
792  }
793  };
794 
795  std::vector<size_t> partition_range(object_count);
796  std::iota(partition_range.begin(), partition_range.end(), 0);
797  auto download_futures = foreign_storage::create_futures_for_workers(
798  partition_range, num_download_threads, download_objects);
799 
800  auto import_future = std::async(std::launch::async, import_local_files);
801 
802  for (auto& future : download_futures) {
803  future.wait();
804  }
805  import_future.get(); // may throw an exception
806 
807  // get any remaining exceptions
808  for (auto& future : download_futures) {
809  future.get();
810  }
811  return aggregate_import_status;
812 
813 #else
814  throw std::runtime_error("AWS S3 support not available");
815 
816  return {};
817 #endif
818 }
819 
820 #ifdef ENABLE_IMPORT_PARQUET
821 ImportStatus ForeignDataImporter::importParquet(
822  const Catalog_Namespace::SessionInfo* session_info) {
823  auto& catalog = session_info->getCatalog();
824 
826 
827  auto& current_user = session_info->get_currentUser();
829  catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);
830 
831  auto user_mapping =
833  catalog.getDatabaseId(),
834  current_user.userId,
836  copy_params_,
837  server.get());
838 
839  auto foreign_table =
841  catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());
842 
843  foreign_table->validateOptionValues();
844 
847  catalog.getDatabaseId(),
848  foreign_table.get(),
849  user_mapping.get());
850 
851  if (auto parquet_import =
852  dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
854 
855  // determine the number of threads to use at each level
856 
857  int max_threads = 0;
858  if (copy_params_.threads == 0) {
859  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
861  } else {
862  max_threads = static_cast<size_t>(copy_params_.threads);
863  }
864  CHECK_GT(max_threads, 0);
865 
866  int num_importer_threads =
867  std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
868  parquet_import->setNumThreads(num_importer_threads);
869  int num_outer_thread = 1;
870  for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
871  if (thread_count * num_importer_threads <= max_threads) {
872  num_outer_thread = thread_count;
873  }
874  }
875 
876  std::shared_mutex import_status_mutex;
877  ImportStatus import_status; // manually update
878 
879  auto import_failed = [&import_status_mutex, &import_status] {
880  std::shared_lock import_status_lock(import_status_mutex);
881  return import_status.load_failed;
882  };
883 
884  std::vector<std::future<void>> futures;
885 
886  for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
887  futures.emplace_back(std::async(std::launch::async, [&] {
888  while (true) {
889  auto batch_result = parquet_import->getNextImportBatch();
890  if (import_failed()) {
891  break;
892  }
893  auto batch = batch_result->getInsertData();
894  if (!batch || import_failed()) {
895  break;
896  }
897  insert_data_loader.insertData(*session_info, *batch);
898 
899  auto batch_import_status = batch_result->getImportStatus();
900  {
901  std::unique_lock import_status_lock(import_status_mutex);
902  import_status.rows_completed += batch_import_status.rows_completed;
903  import_status.rows_rejected += batch_import_status.rows_rejected;
904  if (import_status.rows_rejected > copy_params_.max_reject) {
905  import_status.load_failed = true;
906  import_status.load_msg =
907  "Load was cancelled due to max reject rows being reached";
908  break;
909  }
910  }
911  }
912  }));
913  }
914 
915  for (auto& future : futures) {
916  future.wait();
917  }
918 
919  for (auto& future : futures) {
920  future.get();
921  }
922 
923  if (import_status.load_failed) {
924  foreign_table.reset(); // this is to avoid calling the TableDescriptor dtor after
925  // the rollback in the checkpoint below
926  }
927 
928  finalize(*session_info, import_status, parquet_import->getStringDictionaries());
929 
930  return import_status;
931  }
932 
933  UNREACHABLE();
934  return {};
935 }
936 #endif
937 
939  const Catalog_Namespace::SessionInfo* session_info) {
940  if (shared::is_s3_uri(copy_from_source_)) {
941  return importGeneralS3(session_info);
942  }
943  return importGeneral(session_info);
944 }
945 
946 } // namespace import_export
std::unique_ptr< foreign_storage::ForeignStorageBuffer > delete_buffer
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::string s3_secret_key
Definition: CopyParams.h:62
std::vector< int > ChunkKey
Definition: types.h:36
void insertChunks(const Catalog_Namespace::SessionInfo &session_info, const InsertChunks &insert_chunks)
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::string tableName
std::string get_import_id(const import_export::CopyParams &copy_params, const std::string &copy_from_source)
bool is_s3_uri(const std::string &file_path)
bool g_enable_legacy_delimited_import
Definition: ParserNode.cpp:82
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void validate_sort_options(const FilePathOptions &options)
#define UNREACHABLE()
Definition: Logger.h:337
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::unique_ptr< FragmentBuffers > create_fragment_buffers(const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
#define CHECK_GE(x, y)
Definition: Logger.h:306
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
ImportStatus import(const Catalog_Namespace::SessionInfo *session_info) override
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
std::shared_lock< T > shared_lock
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
const std::string kDefaultImportDirName
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
future< Result > async(Fn &&fn, Args &&...args)
static void setDefaultImportPath(const std::string &base_path)
bool g_enable_assign_render_groups
Classes representing a parse tree.
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
std::map< ChunkKey, std::unique_ptr< foreign_storage::ForeignStorageBuffer > > fragment_buffers_owner
int32_t s3_max_concurrent_downloads
Definition: CopyParams.h:66
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
int getDatabaseId() const
Definition: Catalog.h:304
ForeignDataImporter(const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
import_export::SourceType source_type
Definition: CopyParams.h:57
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
void load_foreign_data_buffers(Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
Global bool for controlling render group assignment, remove along with legacy poly rendering...
virtual void createRenderGroupAnalyzers()
Create RenderGroupAnalyzers for poly columns.
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
#define CHECK_LT(x, y)
Definition: Logger.h:303
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK_LE(x, y)
Definition: Logger.h:304
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
bool g_enable_fsi_regex_import
Definition: ParserNode.cpp:86
Catalog & getCatalog() const
Definition: SessionInfo.h:75
#define DEFAULT_FRAGMENT_ROWS
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2267
Data_Namespace::MemoryLevel persistenceLevel
std::string s3_session_token
Definition: CopyParams.h:63
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
int32_t get_proxy_foreign_table_fragment_size(const size_t maximum_num_fragments_buffered, const size_t max_import_batch_row_count, const Catalog_Namespace::SessionInfo &parent_session_info, const int32_t table_id)
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0
import_export::ImportStatus import_foreign_data(const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
void validate_copy_params(const import_export::CopyParams &copy_params)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
std::shared_timed_mutex shared_mutex
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
Definition: CopyParams.h:61
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
static constexpr char const * PARQUET
size_t g_max_import_threads
Definition: Importer.cpp:106
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)