OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignDataImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignDataImporter.h"
18 
19 #include <boost/uuid/uuid_generators.hpp>
20 #include <boost/uuid/uuid_io.hpp>
21 #include <filesystem>
22 
23 #include "Archive/S3Archive.h"
28 #include "Importer.h"
29 #include "Parser/ParserNode.h"
31 #include "Shared/measure.h"
32 #include "Shared/misc.h"
33 #include "Shared/scope.h"
34 #include "UserMapping.h"
35 
37 #ifdef ENABLE_IMPORT_PARQUET
38 extern bool g_enable_legacy_parquet_import;
39 #endif
40 extern bool g_enable_fsi_regex_import;
41 
42 namespace {
44  foreign_storage::ForeignTable* foreign_table) {
45  ChunkMetadataVector metadata_vector;
46  try {
47  data_wrapper->populateChunkMetadata(
48  metadata_vector); // explicitly invoke a metadata scan on data wrapper
50  metadata_scan_exception) {
51  // if a metadata scan exception is thrown, check to see if we can adjust
52  // the fragment size and retry
53 
54  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
55  if (min_feasible_fragment_size < 0) {
56  throw; // no valid fragment size returned by exception
57  }
58  foreign_table->maxFragRows = min_feasible_fragment_size;
59  data_wrapper->populateChunkMetadata(
60  metadata_vector); // attempt another metadata scan, note, we assume that the
61  // metadata scan can be reentered safely after throwing the
62  // exception
63  }
64  return metadata_vector;
65 }
66 
67 std::string get_import_id(const import_export::CopyParams& copy_params,
68  const std::string& copy_from_source) {
71 #ifdef ENABLE_IMPORT_PARQUET
73 #endif
74  ) {
75  return boost::filesystem::path(copy_from_source).filename().string();
76  }
77 
78  return copy_from_source;
79 }
80 
84  }
85 }
86 
88  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
91  std::unique_ptr<foreign_storage::ForeignStorageBuffer> delete_buffer;
92 };
93 
94 std::unique_ptr<FragmentBuffers> create_fragment_buffers(
95  const int32_t fragment_id,
97  const TableDescriptor* table) {
98  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);
99 
100  std::set<ChunkKey> fragment_keys;
101  for (const auto col_desc : columns) {
102  ChunkKey key{
103  catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
104  if (col_desc->columnType.is_varlen_indeed()) {
105  auto data_key = key;
106  data_key.push_back(1);
107  fragment_keys.insert(data_key);
108  auto index_key = key;
109  index_key.push_back(2);
110  fragment_keys.insert(index_key);
111  } else {
112  fragment_keys.insert(key);
113  }
114  }
115 
116  // create buffers
117  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
118  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
119  for (const auto& key : fragment_keys) {
120  frag_buffers->fragment_buffers_owner[key] =
121  std::make_unique<foreign_storage::ForeignStorageBuffer>();
122  frag_buffers->fragment_buffers[key] =
123  shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
124  }
125 
126  return frag_buffers;
127 }
128 
132  const TableDescriptor* table,
133  const Catalog_Namespace::SessionInfo* session_info,
134  const import_export::CopyParams& copy_params,
135  const std::string& copy_from_source,
136  import_export::ImportStatus& import_status,
137  std::mutex& communication_mutex,
138  bool& continue_loading,
139  bool& load_failed,
140  bool& data_wrapper_error_occured,
141  std::condition_variable& buffers_to_load_condition,
142  std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
143  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
144  while (true) {
145  {
146  std::unique_lock communication_lock(communication_mutex);
147  buffers_to_load_condition.wait(communication_lock, [&]() {
148  return !buffers_to_load.empty() || !continue_loading ||
149  data_wrapper_error_occured;
150  });
151  if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
152  return;
153  }
154  }
155 
156  CHECK(!buffers_to_load.empty());
157 
158  try {
159  std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
160  {
161  std::unique_lock communication_lock(communication_mutex);
162  grouped_fragment_buffers.reset(buffers_to_load.front().release());
163  buffers_to_load.pop_front();
164  buffers_to_load_condition.notify_all();
165  }
166  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
167  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
168 
169  // get chunks for import
171  table->tableId, catalog.getDatabaseId(), {}, {}};
172 
173  // create chunks from buffers
174  for (const auto& [key, buffer] : fragment_buffers) {
175  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
176  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
177  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
178 
179  if (col_desc->columnType.is_varlen_indeed()) {
180  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
181  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
182  auto index_key = key;
183  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
184  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
185  col_desc,
186  buffer,
187  shared::get_from_map(fragment_buffers, index_key),
188  false);
189  }
190  } else { // regular non-varlen case with no index buffer
191  insert_chunks.chunks[col_id] =
192  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
193  }
194  }
195 
196  // mark which row indices are valid for import
197  auto row_count = fragment_buffers.begin()
198  ->second->getEncoder()
199  ->getNumElems(); // assume all chunks have same number of
200  // rows, this is validated at a lower level
201  insert_chunks.valid_row_indices.reserve(row_count);
202  for (size_t irow = 0; irow < row_count; ++irow) {
203  if (delete_buffer->size() > 0) {
204  CHECK_LE(irow, delete_buffer->size());
205  if (delete_buffer->getMemoryPtr()[irow]) {
206  continue;
207  }
208  }
209  insert_chunks.valid_row_indices.emplace_back(irow);
210  }
211 
212  // import chunks
213  insert_data_loader.insertChunks(*session_info, insert_chunks);
214 
215  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
216  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
217  import_status.rows_completed += insert_chunks.valid_row_indices.size();
218  if (import_status.rows_rejected > copy_params.max_reject) {
219  import_status.load_failed = true;
220  import_status.load_msg =
221  "Load was cancelled due to max reject rows being reached";
223  get_import_id(copy_params, copy_from_source), import_status);
224  std::unique_lock communication_lock(communication_mutex);
225  load_failed = true;
226  buffers_to_load_condition.notify_all();
227  return;
228  }
230  get_import_id(copy_params, copy_from_source), import_status);
231  } catch (...) {
232  {
233  std::unique_lock communication_lock(communication_mutex);
234  load_failed = true;
235  buffers_to_load_condition.notify_all();
236  }
237  throw;
238  }
239  }
240 }
241 
243  const int32_t max_fragment_id,
246  const TableDescriptor* table,
248  const Catalog_Namespace::SessionInfo* session_info,
249  const import_export::CopyParams& copy_params,
250  const std::string& copy_from_source,
251  const size_t maximum_num_fragments_buffered) {
253  // if render group assignment is enabled, tell the wrapper to create any
254  // RenderGroupAnalyzers it may need for any poly columns in the table, if
255  // that wrapper type supports it
256  data_wrapper->createRenderGroupAnalyzers();
257  }
258 
259  import_export::ImportStatus import_status;
260 
261  std::mutex communication_mutex;
262  bool continue_loading =
263  true; // when false, signals that the last buffer to load has been added, and
264  // loading should cease after loading remaining buffers
265  bool load_failed =
266  false; // signals loading has failed and buffer population should cease
267  bool data_wrapper_error_occured = false; // signals an error occured during buffer
268  // population and loading should cease
269  std::condition_variable buffers_to_load_condition;
270  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;
271 
272  // launch separate thread to load processed fragment buffers
273  auto load_future = std::async(std::launch::async,
275  connector,
276  std::ref(catalog),
277  table,
278  session_info,
279  std::cref(copy_params),
280  std::cref(copy_from_source),
281  std::ref(import_status),
282  std::ref(communication_mutex),
283  std::ref(continue_loading),
284  std::ref(load_failed),
285  std::ref(data_wrapper_error_occured),
286  std::ref(buffers_to_load_condition),
287  std::ref(buffers_to_load));
288 
289  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
290  {
291  std::unique_lock communication_lock(communication_mutex);
292  buffers_to_load_condition.wait(communication_lock, [&]() {
293  return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
294  });
295  if (load_failed) {
296  break;
297  }
298  }
299  auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
300  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
301  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
302 
303  // get the buffers, accounting for the possibility of the requested fragment id being
304  // out of bounds
305  try {
306  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
308  break;
309  } catch (...) {
310  std::unique_lock communication_lock(communication_mutex);
311  data_wrapper_error_occured = true;
312  buffers_to_load_condition.notify_all();
313  throw;
314  }
315 
316  std::unique_lock communication_lock(communication_mutex);
317  buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
318  buffers_to_load_condition.notify_all();
319  }
320 
321  { // data wrapper processing has finished, notify loading thread
322  std::unique_lock communication_lock(communication_mutex);
323  continue_loading = false;
324  buffers_to_load_condition.notify_all();
325  }
326 
327  // any exceptions in separate loading thread will occur here
328  load_future.get();
329 
330  return import_status;
331 }
332 
333 #ifdef HAVE_AWS_S3
// Tracks the lifecycle of a single S3 object as it is downloaded to local
// disk and subsequently staged for import.
struct DownloadedObjectToProcess {
  // S3 key identifying the object within its bucket.
  std::string object_key;
  // Set to true (last, after the paths below) once the download completes;
  // atomic so the downloader and importer threads can hand off without a lock.
  std::atomic<bool> is_downloaded;
  // Local path the object was downloaded to.
  std::string download_file_path;
  // Local path after renaming into the import staging directory.
  std::string import_file_path;
};
340 
// Returns how many decimal digits are needed to print `number` (0 needs 1).
size_t get_number_of_digits(const size_t number) {
  size_t digit_count = 1;
  for (auto remaining = number; remaining >= 10; remaining /= 10) {
    ++digit_count;
  }
  return digit_count;
}
344 
345 std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
346  const import_export::CopyParams& s3_copy_params,
347  std::vector<DownloadedObjectToProcess>& objects_to_process,
348  const size_t begin_object_index,
349  const size_t end_object_index) {
350  import_export::CopyParams local_copy_params = s3_copy_params;
351  // remove any members from `local_copy_params` that are only intended to be used at a
352  // higher level
353  local_copy_params.s3_access_key.clear();
354  local_copy_params.s3_secret_key.clear();
355  local_copy_params.s3_session_token.clear();
356  local_copy_params.s3_region.clear();
357  local_copy_params.s3_endpoint.clear();
358 
359  local_copy_params.regex_path_filter = std::nullopt;
360  local_copy_params.file_sort_order_by = "PATHNAME"; // see comment below
361  local_copy_params.file_sort_regex = std::nullopt;
362 
363  CHECK_GT(end_object_index, begin_object_index);
364  CHECK_LT(begin_object_index, objects_to_process.size());
365 
366  size_t num_objects = end_object_index - begin_object_index;
367  auto& first_object = objects_to_process[begin_object_index];
368  std::string first_path = first_object.download_file_path;
369  std::string temp_dir = first_path + "_import";
370 
371  if (!std::filesystem::create_directory(temp_dir)) {
372  throw std::runtime_error("failed to create temporary directory for import: " +
373  temp_dir);
374  }
375 
376  // construct a directory with files to import
377  //
378  // NOTE:
379  // * files are moved into `temp_dir` in the exact order that they appear in
380  // `objects_to_process`
381  //
382  // * the `PATHNAME` option is set for `file_sort_order_by` in order to
383  // guarantee that import occurs in the order specified by user, provided the
384  // data wrapper correctly supports the `PATHNAME` option
385  //
386  // * filenames are chosen such that they appear in lexicographical order by
387  // pathname, thus require padding by appropriate number of zeros
388  //
389  // * filenames must maintain their suffixes for some features of data
390  // wrappers to work correctly, especially in the case of archived data (such
391  // as `.zip` or `.gz` or any variant.)
392  std::filesystem::path temp_dir_path{temp_dir};
393  size_t counter = 0;
394  size_t num_zero = get_number_of_digits(num_objects);
395  for (size_t i = begin_object_index; i < end_object_index; ++i) {
396  auto& object = objects_to_process[i];
397  std::filesystem::path old_path = object.download_file_path;
398  auto counter_str = std::to_string(counter++);
399  auto zero_padded_counter_str =
400  std::string(num_zero - counter_str.length(), '0') + counter_str;
401  auto new_path = (temp_dir_path / zero_padded_counter_str).string() +
402  std::filesystem::path{object.object_key}.extension().string();
403  std::filesystem::rename(old_path, new_path);
404  object.import_file_path = new_path;
405  }
406  return {temp_dir, local_copy_params};
407 }
408 #endif
409 
410 } // namespace
411 
412 namespace import_export {
413 
414 ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
415  const CopyParams& copy_params,
416  const TableDescriptor* table)
417  : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
418  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
419 }
420 
422  const Catalog_Namespace::SessionInfo& parent_session_info,
423  ImportStatus& import_status,
424  const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
425  string_dictionaries) {
426  if (table_->persistenceLevel ==
427  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
428  // tables
429  if (!import_status.load_failed) {
430  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
431  for (const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
432  if (!string_dictionary->checkpoint()) {
433  LOG(ERROR) << "Checkpointing Dictionary for Column "
434  << column_desciptor->columnName << " failed.";
435  import_status.load_failed = true;
436  import_status.load_msg = "Dictionary checkpoint failed";
437  break;
438  }
439  }
440  }
441  }
442  if (import_status.load_failed) {
443  connector_->rollback(parent_session_info, table_->tableId);
444  } else {
445  connector_->checkpoint(parent_session_info, table_->tableId);
446  }
447 }
448 
450  const Catalog_Namespace::SessionInfo& parent_session_info,
451  ImportStatus& import_status,
452  const int32_t table_id) {
453  auto& catalog = parent_session_info.getCatalog();
454 
455  auto logical_columns =
456  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
457 
458  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
459  for (const auto& column_descriptor : logical_columns) {
460  if (!column_descriptor->columnType.is_dict_encoded_string()) {
461  continue;
462  }
463  auto dict_descriptor =
464  catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
465  string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
466  }
467 
468  finalize(parent_session_info, import_status, string_dictionaries);
469 }
470 
471 // This value is used only if it is non-zero; it is for testing purposes only
473 
474 namespace {
476  const size_t maximum_num_fragments_buffered,
477  const size_t max_import_batch_row_count,
478  const Catalog_Namespace::SessionInfo& parent_session_info,
479  const int32_t table_id) {
482  }
483 
484  if (max_import_batch_row_count != 0) {
485  return max_import_batch_row_count;
486  }
487 
488  // This number is chosen as a reasonable default value to reserve for
489  // intermediate buffering during import, it is about 2GB of memory. Note,
490  // depending on the acutal size of var len values, this heuristic target may
491  // be off. NOTE: `maximum_num_fragments_buffered` scales the allowed buffer
492  // size with the assumption that in the worst case all buffers may be
493  // buffered at once.
494  const size_t max_buffer_byte_size =
495  2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;
496 
497  auto& catalog = parent_session_info.getCatalog();
498 
499  auto logical_columns =
500  catalog.getAllColumnMetadataForTable(table_id, false, false, false);
501 
502  size_t row_byte_size = 0;
503  for (const auto& column_descriptor : logical_columns) {
504  auto type = column_descriptor->columnType;
505  size_t field_byte_length = 0;
506  if (type.is_varlen_indeed()) {
507  // use a heuristic where varlen types are assumed to be 256 bytes in length
508  field_byte_length = 256;
509  } else {
510  field_byte_length = type.get_size();
511  }
512  row_byte_size += field_byte_length;
513  }
514 
515  return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
517 }
518 } // namespace
519 
521  const Catalog_Namespace::SessionInfo* session_info,
522  const std::string& copy_from_source,
523  const CopyParams& copy_params) {
524  auto& catalog = session_info->getCatalog();
525 
527 
528  // validate copy params before import in order to print user friendly messages
529  validate_copy_params(copy_params);
530 
531  ImportStatus import_status;
532  {
533  auto& current_user = session_info->get_currentUser();
534  auto [server, user_mapping, foreign_table] =
536  copy_params,
537  catalog.getDatabaseId(),
538  table_,
539  current_user.userId);
540 
541  // maximum number of fragments buffered in memory at any one time, affects
542  // `maxFragRows` heuristic below
543  const size_t maximum_num_fragments_buffered = 3;
544  // set fragment size for proxy foreign table during import
545  foreign_table->maxFragRows =
546  get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
547  copy_params.max_import_batch_row_count,
548  *session_info,
549  table_->tableId);
550 
551  // log for debugging purposes
552  LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
553  << " for table " << table_->tableName;
554 
555  auto data_wrapper =
557  copy_params,
558  catalog.getDatabaseId(),
559  foreign_table.get(),
560  user_mapping.get());
561 
562  int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
563  if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
564  ChunkMetadataVector metadata_vector =
565  metadata_scan(data_wrapper.get(), foreign_table.get());
566  if (metadata_vector.empty()) { // an empty data source
567  return {};
568  }
569  max_fragment_id = 0;
570  for (const auto& [key, _] : metadata_vector) {
571  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
572  }
573  CHECK_GE(max_fragment_id, 0);
574  }
575 
576  import_status = import_foreign_data(max_fragment_id,
577  connector_.get(),
578  catalog,
579  table_,
580  data_wrapper.get(),
581  session_info,
582  copy_params,
583  copy_from_source,
584  maximum_num_fragments_buffered);
585 
586  } // this scope ensures that fsi proxy objects are destroyed prior to checkpoint
587 
588  finalize(*session_info, import_status, table_->tableId);
589 
590  return import_status;
591 }
592 
594  const Catalog_Namespace::SessionInfo* session_info) {
595  return importGeneral(session_info, copy_from_source_, copy_params_);
596 }
597 
598 void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
599  auto data_dir_path = boost::filesystem::canonical(base_path);
600  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
601 }
602 
604  const Catalog_Namespace::SessionInfo* session_info) {
606 
608 #if ENABLE_IMPORT_PARQUET
610 #endif
612  throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
613  "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
614 #if ENABLE_IMPORT_PARQUET
615  ", 'PARQUET_FILE'"
616 #endif
617  " or 'REGEX_PARSED_FILE'");
618  }
619 
624 
625 #ifdef HAVE_AWS_S3
626 
627  auto uuid = boost::uuids::random_generator()();
628  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
629  auto import_path = std::filesystem::path(default_import_path_) / base_path;
630 
631  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
641  import_path);
642  s3_archive->init_for_read();
643 
644  const auto bucket_name = s3_archive->url_part(4);
645 
646  auto object_keys = s3_archive->get_objkeys();
647  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
648  size_t object_count = 0;
649  for (const auto& objkey : object_keys) {
650  auto& object = objects_to_process[object_count++];
651  object.object_key = objkey;
652  object.is_downloaded = false;
653  }
654 
655  // Ensure files & dirs are cleaned up, regardless of outcome
656  ScopeGuard cleanup_guard = [&] {
657  if (std::filesystem::exists(import_path)) {
658  std::filesystem::remove_all(import_path);
659  }
660  };
661 
662  ImportStatus aggregate_import_status;
663  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;
664 
665  std::mutex communication_mutex;
666  bool continue_downloading = true;
667  bool download_exception_occured = false;
668 
669  std::condition_variable files_download_condition;
670 
671  auto is_downloading_finished = [&] {
672  std::unique_lock communication_lock(communication_mutex);
673  return !continue_downloading || download_exception_occured;
674  };
675 
676  std::function<void(const std::vector<size_t>&)> download_objects =
677  [&](const std::vector<size_t>& partition) {
678  for (const auto& index : partition) {
679  DownloadedObjectToProcess& object = objects_to_process[index];
680  const std::string& obj_key = object.object_key;
681  if (is_downloading_finished()) {
682  return;
683  }
684  std::exception_ptr eptr; // unused
685  std::string local_file_path;
686  std::string exception_what;
687  bool exception_occured = false;
688 
689  try {
690  local_file_path = s3_archive->land(obj_key,
691  eptr,
692  false,
693  /*allow_named_pipe_use=*/false,
694  /*track_file_path=*/false);
695  } catch (const std::exception& e) {
696  exception_what = e.what();
697  exception_occured = true;
698  }
699 
700  if (is_downloading_finished()) {
701  return;
702  }
703  if (exception_occured) {
704  {
705  std::unique_lock communication_lock(communication_mutex);
706  download_exception_occured = true;
707  }
708  files_download_condition.notify_all();
709  throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
710  "': " + exception_what);
711  }
712 
713  object.download_file_path = local_file_path;
714  object.is_downloaded =
715  true; // this variable is atomic and therefore acts as a lock, it must be
716  // set last to ensure no data race
717 
718  files_download_condition.notify_all();
719  }
720  };
721 
722  std::function<void()> import_local_files = [&]() {
723  for (size_t object_index = 0; object_index < object_count;) {
724  {
725  std::unique_lock communication_lock(communication_mutex);
726  files_download_condition.wait(
727  communication_lock,
728  [&download_exception_occured, object_index, &objects_to_process]() {
729  return objects_to_process[object_index].is_downloaded ||
730  download_exception_occured;
731  });
732  if (download_exception_occured) { // do not wait for object index if a download
733  // error has occured
734  return;
735  }
736  }
737 
738  // find largest range of files to import
739  size_t end_object_index = object_count;
740  for (size_t i = object_index + 1; i < object_count; ++i) {
741  if (!objects_to_process[i].is_downloaded) {
742  end_object_index = i;
743  break;
744  }
745  }
746 
747  ImportStatus local_import_status;
748  std::string local_import_dir;
749  try {
750  CopyParams local_copy_params;
751  std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
752  copy_params_, objects_to_process, object_index, end_object_index);
753  local_import_status =
754  importGeneral(session_info, local_import_dir, local_copy_params);
755  // clean up temporary files
756  std::filesystem::remove_all(local_import_dir);
757  } catch (const std::exception& except) {
758  // replace all occurences of file names with the object keys for
759  // users
760  std::string what = except.what();
761 
762  for (size_t i = object_index; i < end_object_index; ++i) {
763  auto& object = objects_to_process[i];
764  what = boost::regex_replace(what,
765  boost::regex{object.import_file_path},
766  bucket_name + "/" + object.object_key);
767  }
768  {
769  std::unique_lock communication_lock(communication_mutex);
770  continue_downloading = false;
771  }
772  // clean up temporary files
773  std::filesystem::remove_all(local_import_dir);
774  throw std::runtime_error(what);
775  }
776  aggregate_import_status += local_import_status;
778  aggregate_import_status);
779  if (aggregate_import_status.load_failed) {
780  {
781  std::unique_lock communication_lock(communication_mutex);
782  continue_downloading = false;
783  }
784  return;
785  }
786 
787  object_index =
788  end_object_index; // all objects in range [object_index,end_object_index)
789  // correctly imported at this point in excecution, move onto
790  // next range
791  }
792  };
793 
794  std::vector<size_t> partition_range(object_count);
795  std::iota(partition_range.begin(), partition_range.end(), 0);
796  auto download_futures = foreign_storage::create_futures_for_workers(
797  partition_range, num_download_threads, download_objects);
798 
799  auto import_future = std::async(std::launch::async, import_local_files);
800 
801  for (auto& future : download_futures) {
802  future.wait();
803  }
804  import_future.get(); // may throw an exception
805 
806  // get any remaining exceptions
807  for (auto& future : download_futures) {
808  future.get();
809  }
810  return aggregate_import_status;
811 
812 #else
813  throw std::runtime_error("AWS S3 support not available");
814 
815  return {};
816 #endif
817 }
818 
819 #ifdef ENABLE_IMPORT_PARQUET
820 ImportStatus ForeignDataImporter::importParquet(
821  const Catalog_Namespace::SessionInfo* session_info) {
822  auto& catalog = session_info->getCatalog();
823 
825 
826  auto& current_user = session_info->get_currentUser();
828  catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);
829 
830  auto user_mapping =
832  catalog.getDatabaseId(),
833  current_user.userId,
835  copy_params_,
836  server.get());
837 
838  auto foreign_table =
840  catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());
841 
842  foreign_table->validateOptionValues();
843 
846  catalog.getDatabaseId(),
847  foreign_table.get(),
848  user_mapping.get());
849 
850  if (auto parquet_import =
851  dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
853 
854  // determine the number of threads to use at each level
855 
856  int max_threads = 0;
857  if (copy_params_.threads == 0) {
858  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
860  } else {
861  max_threads = static_cast<size_t>(copy_params_.threads);
862  }
863  CHECK_GT(max_threads, 0);
864 
865  int num_importer_threads =
866  std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
867  parquet_import->setNumThreads(num_importer_threads);
868  int num_outer_thread = 1;
869  for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
870  if (thread_count * num_importer_threads <= max_threads) {
871  num_outer_thread = thread_count;
872  }
873  }
874 
875  std::shared_mutex import_status_mutex;
876  ImportStatus import_status; // manually update
877 
878  auto import_failed = [&import_status_mutex, &import_status] {
879  std::shared_lock import_status_lock(import_status_mutex);
880  return import_status.load_failed;
881  };
882 
883  std::vector<std::future<void>> futures;
884 
885  for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
886  futures.emplace_back(std::async(std::launch::async, [&] {
887  while (true) {
888  auto batch_result = parquet_import->getNextImportBatch();
889  if (import_failed()) {
890  break;
891  }
892  auto batch = batch_result->getInsertData();
893  if (!batch || import_failed()) {
894  break;
895  }
896  insert_data_loader.insertData(*session_info, *batch);
897 
898  auto batch_import_status = batch_result->getImportStatus();
899  {
900  std::unique_lock import_status_lock(import_status_mutex);
901  import_status.rows_completed += batch_import_status.rows_completed;
902  import_status.rows_rejected += batch_import_status.rows_rejected;
903  if (import_status.rows_rejected > copy_params_.max_reject) {
904  import_status.load_failed = true;
905  import_status.load_msg =
906  "Load was cancelled due to max reject rows being reached";
907  break;
908  }
909  }
910  }
911  }));
912  }
913 
914  for (auto& future : futures) {
915  future.wait();
916  }
917 
918  for (auto& future : futures) {
919  future.get();
920  }
921 
922  if (import_status.load_failed) {
923  foreign_table.reset(); // this is to avoid calling the TableDescriptor dtor after
924  // the rollback in the checkpoint below
925  }
926 
927  finalize(*session_info, import_status, parquet_import->getStringDictionaries());
928 
929  return import_status;
930  }
931 
932  UNREACHABLE();
933  return {};
934 }
935 #endif
936 
938  const Catalog_Namespace::SessionInfo* session_info) {
939  if (foreign_storage::is_s3_uri(copy_from_source_)) {
940  return importGeneralS3(session_info);
941  }
942  return importGeneral(session_info);
943 }
944 
945 } // namespace import_export
std::unique_ptr< foreign_storage::ForeignStorageBuffer > delete_buffer
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::string s3_secret_key
Definition: CopyParams.h:62
std::vector< int > ChunkKey
Definition: types.h:36
void insertChunks(const Catalog_Namespace::SessionInfo &session_info, const InsertChunks &insert_chunks)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::string tableName
std::string get_import_id(const import_export::CopyParams &copy_params, const std::string &copy_from_source)
bool g_enable_legacy_delimited_import
Definition: ParserNode.cpp:81
#define LOG(tag)
Definition: Logger.h:283
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void validate_sort_options(const FilePathOptions &options)
#define UNREACHABLE()
Definition: Logger.h:333
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::unique_ptr< FragmentBuffers > create_fragment_buffers(const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
#define CHECK_GE(x, y)
Definition: Logger.h:302
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
#define CHECK_GT(x, y)
Definition: Logger.h:301
std::string to_string(char const *&&v)
ImportStatus import(const Catalog_Namespace::SessionInfo *session_info) override
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
std::shared_lock< T > shared_lock
std::optional< std::string > regex_path_filter
Definition: CopyParams.h:85
const std::string kDefaultImportDirName
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
future< Result > async(Fn &&fn, Args &&...args)
static void setDefaultImportPath(const std::string &base_path)
bool g_enable_assign_render_groups
Classes representing a parse tree.
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
std::map< ChunkKey, std::unique_ptr< foreign_storage::ForeignStorageBuffer > > fragment_buffers_owner
int32_t s3_max_concurrent_downloads
Definition: CopyParams.h:66
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
int getDatabaseId() const
Definition: Catalog.h:304
ForeignDataImporter(const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
import_export::SourceType source_type
Definition: CopyParams.h:57
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
void load_foreign_data_buffers(Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
Global bool for controlling render group assignment, remove along with legacy poly rendering...
bool is_s3_uri(const std::string &file_path)
virtual void createRenderGroupAnalyzers()
Create RenderGroupAnalyzers for poly columns.
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
#define CHECK_LT(x, y)
Definition: Logger.h:299
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK_LE(x, y)
Definition: Logger.h:300
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
bool g_enable_fsi_regex_import
Definition: ParserNode.cpp:85
Catalog & getCatalog() const
Definition: SessionInfo.h:75
#define DEFAULT_FRAGMENT_ROWS
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2254
Data_Namespace::MemoryLevel persistenceLevel
std::string s3_session_token
Definition: CopyParams.h:63
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:289
#define DEBUG_TIMER(name)
Definition: Logger.h:407
int32_t get_proxy_foreign_table_fragment_size(const size_t maximum_num_fragments_buffered, const size_t max_import_batch_row_count, const Catalog_Namespace::SessionInfo &parent_session_info, const int32_t table_id)
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0
import_export::ImportStatus import_foreign_data(const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
void validate_copy_params(const import_export::CopyParams &copy_params)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
std::shared_timed_mutex shared_mutex
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
Definition: CopyParams.h:61
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88
std::optional< std::string > file_sort_order_by
Definition: CopyParams.h:86
static constexpr char const * PARQUET
size_t g_max_import_threads
Definition: Importer.cpp:106
std::optional< std::string > file_sort_regex
Definition: CopyParams.h:87
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)