ForeignDataImporter.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ForeignDataImporter.h"

#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <filesystem>

#include "Archive/S3Archive.h"
#include "DataMgr/ForeignStorage/ForeignDataWrapperFactory.h"
#include "DataMgr/ForeignStorage/ForeignStorageException.h"
#include "DataMgr/ForeignStorage/FsiChunkUtils.h"
#include "DataMgr/ForeignStorage/ParquetImporter.h"
#include "Importer.h"
#include "Parser/ParserNode.h"
#include "Shared/file_path_util.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "UserMapping.h"

extern bool g_enable_legacy_delimited_import;
#ifdef ENABLE_IMPORT_PARQUET
extern bool g_enable_legacy_parquet_import;
#endif
extern bool g_enable_fsi_regex_import;

namespace {
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper* data_wrapper,
                                  foreign_storage::ForeignTable* foreign_table) {
  ChunkMetadataVector metadata_vector;
  try {
    data_wrapper->populateChunkMetadata(
        metadata_vector);  // explicitly invoke a metadata scan on data wrapper
  } catch (const foreign_storage::MetadataScanInfeasibleFragmentSizeException&
               metadata_scan_exception) {
    // if a metadata scan exception is thrown, check to see if we can adjust
    // the fragment size and retry

    auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
    if (min_feasible_fragment_size < 0) {
      throw;  // no valid fragment size returned by exception
    }
    foreign_table->maxFragRows = min_feasible_fragment_size;
    data_wrapper->populateChunkMetadata(
        metadata_vector);  // attempt another metadata scan; note, we assume that the
                           // metadata scan can be reentered safely after throwing the
                           // exception
  }
  return metadata_vector;
}

void validate_copy_params(const import_export::CopyParams& copy_params) {
  if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
    foreign_storage::validate_regex_parser_options(copy_params);
  }
}

struct FragmentBuffers {
  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
      fragment_buffers_owner;
  ChunkToBufferMap fragment_buffers;
  std::unique_ptr<foreign_storage::ForeignStorageBuffer> delete_buffer;
};

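// Note: a ChunkKey below is {db_id, table_id, column_id, fragment_id}; varlen
// columns append a fifth component, 1 for the data buffer and 2 for the index
// buffer (see the CHUNK_KEY_* defines)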
std::unique_ptr<FragmentBuffers> create_fragment_buffers(
    const int32_t fragment_id,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table) {
  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);

  std::set<ChunkKey> fragment_keys;
  for (const auto col_desc : columns) {
    ChunkKey key{
        catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
    if (col_desc->columnType.is_varlen_indeed()) {
      auto data_key = key;
      data_key.push_back(1);
      fragment_keys.insert(data_key);
      auto index_key = key;
      index_key.push_back(2);
      fragment_keys.insert(index_key);
    } else {
      fragment_keys.insert(key);
    }
  }

  // create buffers
  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
  for (const auto& key : fragment_keys) {
    frag_buffers->fragment_buffers_owner[key] =
        std::make_unique<foreign_storage::ForeignStorageBuffer>();
    frag_buffers->fragment_buffers[key] =
        shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
  }

  return frag_buffers;
}

void load_foreign_data_buffers(
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    import_export::ImportStatus& import_status,
    std::mutex& communication_mutex,
    bool& continue_loading,
    bool& load_failed,
    bool& data_wrapper_error_occurred,
    std::condition_variable& buffers_to_load_condition,
    std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
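  // Consumer half of the pipeline: import_foreign_data (below) produces
  // FragmentBuffers into buffers_to_load; this loop drains and inserts them
  // until signalled that production has finished or failed.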
  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
  while (true) {
    {
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return !buffers_to_load.empty() || !continue_loading ||
               data_wrapper_error_occurred;
      });
      if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occurred) {
        return;
      }
    }

    CHECK(!buffers_to_load.empty());

    try {
      std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
      {
        std::unique_lock communication_lock(communication_mutex);
        grouped_fragment_buffers.reset(buffers_to_load.front().release());
        buffers_to_load.pop_front();
        buffers_to_load_condition.notify_all();
      }
      auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
      auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

      // get chunks for import
      Fragmenter_Namespace::InsertChunks insert_chunks{
          table->tableId, catalog.getDatabaseId(), {}, {}};

      // create chunks from buffers
      for (const auto& [key, buffer] : fragment_buffers) {
        const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
        const auto table_id = key[CHUNK_KEY_TABLE_IDX];
        const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);

        if (col_desc->columnType.is_varlen_indeed()) {
          CHECK(key.size() > CHUNK_KEY_VARLEN_IDX);  // check for varlen key
          if (key[CHUNK_KEY_VARLEN_IDX] == 1) {  // data key
            auto index_key = key;
            index_key[CHUNK_KEY_VARLEN_IDX] = 2;
            insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
                col_desc,
                buffer,
                shared::get_from_map(fragment_buffers, index_key),
                false);
          }
        } else {  // regular non-varlen case with no index buffer
          insert_chunks.chunks[col_id] =
              Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
        }
      }

      // mark which row indices are valid for import
      auto row_count = fragment_buffers.begin()
                           ->second->getEncoder()
                           ->getNumElems();  // assume all chunks have the same number
                                             // of rows; this is validated at a lower
                                             // level
      insert_chunks.valid_row_indices.reserve(row_count);
      for (size_t irow = 0; irow < row_count; ++irow) {
        if (delete_buffer->size() > 0) {
          CHECK_LE(irow, delete_buffer->size());
          if (delete_buffer->getMemoryPtr()[irow]) {
            continue;
          }
        }
        insert_chunks.valid_row_indices.emplace_back(irow);
      }

      // import chunks
      insert_data_loader.insertChunks(*session_info, insert_chunks);

      CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
      import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
      import_status.rows_completed += insert_chunks.valid_row_indices.size();
      if (import_status.rows_rejected > copy_params.max_reject) {
        import_status.load_failed = true;
        import_status.load_msg =
            "Load was cancelled due to max reject rows being reached";
        import_export::Importer::set_import_status(copy_from_source, import_status);
        std::unique_lock communication_lock(communication_mutex);
        load_failed = true;
        buffers_to_load_condition.notify_all();
        return;
      }
      import_export::Importer::set_import_status(copy_from_source, import_status);
    } catch (...) {
      {
        std::unique_lock communication_lock(communication_mutex);
        load_failed = true;
        buffers_to_load_condition.notify_all();
      }
      throw;
    }
  }
}

import_export::ImportStatus import_foreign_data(
    const int32_t max_fragment_id,
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    foreign_storage::ForeignDataWrapper* data_wrapper,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    const size_t maximum_num_fragments_buffered) {
  if (g_enable_assign_render_groups) {
    // if render group assignment is enabled, tell the wrapper to create any
    // RenderGroupAnalyzers it may need for any poly columns in the table, if
    // that wrapper type supports it
    data_wrapper->createRenderGroupAnalyzers();
  }

  import_export::ImportStatus import_status;

  std::mutex communication_mutex;
  bool continue_loading =
      true;  // when false, signals that the last buffer to load has been added, and
             // loading should cease after loading remaining buffers
  bool load_failed =
      false;  // signals loading has failed and buffer population should cease
  bool data_wrapper_error_occurred = false;  // signals an error occurred during buffer
                                             // population and loading should cease
  std::condition_variable buffers_to_load_condition;
  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;

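  // The loop below produces fragment buffers while the thread launched next
  // consumes them; buffers_to_load is bounded by maximum_num_fragments_buffered,
  // so at most that many fragments are held in memory at once.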
  // launch separate thread to load processed fragment buffers
  auto load_future = std::async(std::launch::async,
                                load_foreign_data_buffers,
                                connector,
                                std::ref(catalog),
                                table,
                                session_info,
                                std::cref(copy_params),
                                std::cref(copy_from_source),
                                std::ref(import_status),
                                std::ref(communication_mutex),
                                std::ref(continue_loading),
                                std::ref(load_failed),
                                std::ref(data_wrapper_error_occurred),
                                std::ref(buffers_to_load_condition),
                                std::ref(buffers_to_load));

  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
    {
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
      });
      if (load_failed) {
        break;
      }
    }
    auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
    auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
    auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

    // get the buffers, accounting for the possibility of the requested fragment id
    // being out of bounds
    try {
      data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
    } catch (const foreign_storage::RequestedFragmentIdOutOfBoundsException&) {
      break;
    } catch (...) {
      std::unique_lock communication_lock(communication_mutex);
      data_wrapper_error_occurred = true;
      buffers_to_load_condition.notify_all();
      throw;
    }

    std::unique_lock communication_lock(communication_mutex);
    buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
    buffers_to_load_condition.notify_all();
  }

  {  // data wrapper processing has finished, notify loading thread
    std::unique_lock communication_lock(communication_mutex);
    continue_loading = false;
    buffers_to_load_condition.notify_all();
  }

  // any exceptions in separate loading thread will occur here
  load_future.get();

  return import_status;
}

#ifdef HAVE_AWS_S3
struct DownloadedObjectToProcess {
  std::string object_key;
  std::atomic<bool> is_downloaded;
  std::string download_file_path;
  std::string import_file_path;
};

size_t get_number_of_digits(const size_t number) {
  return std::to_string(number).length();
}
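// e.g. get_number_of_digits(100) == 3; used below to zero-pad generated filenames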

std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
    const import_export::CopyParams& s3_copy_params,
    std::vector<DownloadedObjectToProcess>& objects_to_process,
    const size_t begin_object_index,
    const size_t end_object_index) {
  import_export::CopyParams local_copy_params = s3_copy_params;
  // remove any members from `local_copy_params` that are only intended to be used at a
  // higher level
  local_copy_params.s3_access_key.clear();
  local_copy_params.s3_secret_key.clear();
  local_copy_params.s3_session_token.clear();
  local_copy_params.s3_region.clear();
  local_copy_params.s3_endpoint.clear();

  local_copy_params.regex_path_filter = std::nullopt;
  local_copy_params.file_sort_order_by = "PATHNAME";  // see comment below
  local_copy_params.file_sort_regex = std::nullopt;

  CHECK_GT(end_object_index, begin_object_index);
  CHECK_LT(begin_object_index, objects_to_process.size());

  size_t num_objects = end_object_index - begin_object_index;
  auto& first_object = objects_to_process[begin_object_index];
  std::string first_path = first_object.download_file_path;
  std::string temp_dir = first_path + "_import";

  if (!std::filesystem::create_directory(temp_dir)) {
    throw std::runtime_error("failed to create temporary directory for import: " +
                             temp_dir);
  }

  // construct a directory with files to import
  //
  // NOTE:
  // * files are moved into `temp_dir` in the exact order that they appear in
  // `objects_to_process`
  //
  // * the `PATHNAME` option is set for `file_sort_order_by` in order to
  // guarantee that import occurs in the order specified by the user, provided
  // the data wrapper correctly supports the `PATHNAME` option
  //
  // * filenames are chosen such that they appear in lexicographical order by
  // pathname, and thus require padding with an appropriate number of zeros
  //
  // * filenames must maintain their suffixes for some features of data
  // wrappers to work correctly, especially in the case of archived data (such
  // as `.zip` or `.gz` or any variant)
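  //
  // For illustration: with 12 objects the generated names would be "00<ext>",
  // "01<ext>", ..., "11<ext>", where <ext> is each object key's original file
  // extension (e.g. ".csv" or ".gz")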
  std::filesystem::path temp_dir_path{temp_dir};
  size_t counter = 0;
  size_t num_zero = get_number_of_digits(num_objects);
  for (size_t i = begin_object_index; i < end_object_index; ++i) {
    auto& object = objects_to_process[i];
    std::filesystem::path old_path = object.download_file_path;
    auto counter_str = std::to_string(counter++);
    auto zero_padded_counter_str =
        std::string(num_zero - counter_str.length(), '0') + counter_str;
    auto new_path = (temp_dir_path / zero_padded_counter_str).string() +
                    std::filesystem::path{object.object_key}.extension().string();
    std::filesystem::rename(old_path, new_path);
    object.import_file_path = new_path;
  }
  return {temp_dir, local_copy_params};
}
#endif

}  // namespace

namespace import_export {

ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
                                         const CopyParams& copy_params,
                                         const TableDescriptor* table)
    : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
}

void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
        string_dictionaries) {
  if (table_->persistenceLevel ==
      Data_Namespace::MemoryLevel::DISK_LEVEL) {  // only checkpoint disk-resident
                                                  // tables
    if (!import_status.load_failed) {
      auto timer = DEBUG_TIMER("Dictionary Checkpointing");
      for (const auto& [column_descriptor, string_dictionary] : string_dictionaries) {
        if (!string_dictionary->checkpoint()) {
          LOG(ERROR) << "Checkpointing Dictionary for Column "
                     << column_descriptor->columnName << " failed.";
          import_status.load_failed = true;
          import_status.load_msg = "Dictionary checkpoint failed";
          break;
        }
      }
    }
  }
  if (import_status.load_failed) {
    connector_->rollback(parent_session_info, table_->tableId);
  } else {
    connector_->checkpoint(parent_session_info, table_->tableId);
  }
}

void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const int32_t table_id) {
  auto& catalog = parent_session_info.getCatalog();

  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
  for (const auto& column_descriptor : logical_columns) {
    if (!column_descriptor->columnType.is_dict_encoded_string()) {
      continue;
    }
    auto dict_descriptor =
        catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
    string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
  }

  finalize(parent_session_info, import_status, string_dictionaries);
}

// This value is used only if it is non-zero; it is for testing purposes only
int32_t ForeignDataImporter::proxy_foreign_table_fragment_size_ = 0;

namespace {
int32_t get_proxy_foreign_table_fragment_size(
    const size_t maximum_num_fragments_buffered,
    const size_t max_import_batch_row_count,
    const Catalog_Namespace::SessionInfo& parent_session_info,
    const int32_t table_id) {
  if (ForeignDataImporter::proxy_foreign_table_fragment_size_ != 0) {
    return ForeignDataImporter::proxy_foreign_table_fragment_size_;
  }

  if (max_import_batch_row_count != 0) {
    return max_import_batch_row_count;
  }

  // This number is chosen as a reasonable default value to reserve for
  // intermediate buffering during import; it is about 2GB of memory. Note,
  // depending on the actual size of var len values, this heuristic target may
  // be off. NOTE: `maximum_num_fragments_buffered` scales the allowed buffer
  // size with the assumption that in the worst case all buffers may be
  // buffered at once.
  const size_t max_buffer_byte_size =
      2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;
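  // e.g. with maximum_num_fragments_buffered = 3 this works out to a budget of
  // 2GB / 3, roughly 0.7GB per buffered fragment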

  auto& catalog = parent_session_info.getCatalog();

  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  size_t row_byte_size = 0;
  for (const auto& column_descriptor : logical_columns) {
    auto type = column_descriptor->columnType;
    size_t field_byte_length = 0;
    if (type.is_varlen_indeed()) {
      // use a heuristic of 25% of the maximum string length size, which is likely
      // conservative
      field_byte_length = std::max<size_t>(StringDictionary::MAX_STRLEN / 4, 1);
    } else {
      field_byte_length = type.get_size();
    }
    row_byte_size += field_byte_length;
  }

  return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
                          DEFAULT_FRAGMENT_ROWS);
}
}  // namespace

ImportStatus ForeignDataImporter::importGeneral(
    const Catalog_Namespace::SessionInfo* session_info,
    const std::string& copy_from_source,
    const CopyParams& copy_params) {
  auto& catalog = session_info->getCatalog();
  CHECK(foreign_storage::is_valid_source_type(copy_params));

  // validate copy params before import in order to print user friendly messages
  validate_copy_params(copy_params);

  ImportStatus import_status;
  {
    auto& current_user = session_info->get_currentUser();
    auto [server, user_mapping, foreign_table] =
        foreign_storage::create_proxy_fsi_objects(copy_from_source,
                                                  copy_params,
                                                  catalog.getDatabaseId(),
                                                  table_,
                                                  current_user.userId);

    // maximum number of fragments buffered in memory at any one time, affects
    // `maxFragRows` heuristic below
    const size_t maximum_num_fragments_buffered = 3;
    // set fragment size for proxy foreign table during import
    foreign_table->maxFragRows =
        get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
                                              copy_params.max_import_batch_row_count,
                                              *session_info,
                                              table_->tableId);

    // log for debugging purposes
    LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
              << " for table " << table_->tableName;
539 
540  auto data_wrapper =
542  copy_params,
543  catalog.getDatabaseId(),
544  foreign_table.get(),
545  user_mapping.get());
546 
547  int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
548  if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
549  ChunkMetadataVector metadata_vector =
550  metadata_scan(data_wrapper.get(), foreign_table.get());
551  if (metadata_vector.empty()) { // an empty data source
552  return {};
553  }
554  max_fragment_id = 0;
555  for (const auto& [key, _] : metadata_vector) {
556  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
557  }
558  CHECK_GE(max_fragment_id, 0);
559  }
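    // when lazy fragment fetching is enabled, the fragment count is not known
    // up front; max_fragment_id stays at INT32_MAX and the loop in
    // import_foreign_data instead stops on the wrapper's out-of-bounds exception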

    import_status = import_foreign_data(max_fragment_id,
                                        connector_.get(),
                                        catalog,
                                        table_,
                                        data_wrapper.get(),
                                        session_info,
                                        copy_params,
                                        copy_from_source,
                                        maximum_num_fragments_buffered);

  }  // this scope ensures that fsi proxy objects are destroyed prior to checkpoint

  finalize(*session_info, import_status, table_->tableId);

  return import_status;
}

ImportStatus ForeignDataImporter::importGeneral(
    const Catalog_Namespace::SessionInfo* session_info) {
  return importGeneral(session_info, copy_from_source_, copy_params_);
}

void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
  auto data_dir_path = boost::filesystem::canonical(base_path);
  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
}

ImportStatus ForeignDataImporter::importGeneralS3(
    const Catalog_Namespace::SessionInfo* session_info) {
  CHECK(foreign_storage::is_s3_uri(copy_from_source_));

  if (!(copy_params_.source_type == import_export::SourceType::kDelimitedFile ||
#if ENABLE_IMPORT_PARQUET
        copy_params_.source_type == import_export::SourceType::kParquetFile ||
#endif
        copy_params_.source_type == import_export::SourceType::kRegexParsedFile)) {
    throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
                             "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
#if ENABLE_IMPORT_PARQUET
                             ", 'PARQUET_FILE'"
#endif
                             " or 'REGEX_PARSED_FILE'");
  }

  const shared::FilePathOptions options{copy_params_.regex_path_filter,
                                        copy_params_.file_sort_order_by,
                                        copy_params_.file_sort_regex};
  shared::validate_sort_options(options);

#ifdef HAVE_AWS_S3

  auto uuid = boost::uuids::random_generator()();
  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
  auto import_path = std::filesystem::path(default_import_path_) / base_path;

  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
                                                copy_params_.s3_access_key,
                                                copy_params_.s3_secret_key,
                                                copy_params_.s3_session_token,
                                                copy_params_.s3_region,
                                                copy_params_.s3_endpoint,
                                                copy_params_.plain_text,
                                                copy_params_.regex_path_filter,
                                                copy_params_.file_sort_order_by,
                                                copy_params_.file_sort_regex,
                                                import_path);
  s3_archive->init_for_read();

  const auto bucket_name = s3_archive->url_part(4);

  auto object_keys = s3_archive->get_objkeys();
  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
  size_t object_count = 0;
  for (const auto& objkey : object_keys) {
    auto& object = objects_to_process[object_count++];
    object.object_key = objkey;
    object.is_downloaded = false;
  }

  // Ensure files & dirs are cleaned up, regardless of outcome
  ScopeGuard cleanup_guard = [&] {
    if (std::filesystem::exists(import_path)) {
      std::filesystem::remove_all(import_path);
    }
  };

  ImportStatus aggregate_import_status;
  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;

  std::mutex communication_mutex;
  bool continue_downloading = true;
  bool download_exception_occurred = false;

  std::condition_variable files_download_condition;

  auto is_downloading_finished = [&] {
    std::unique_lock communication_lock(communication_mutex);
    return !continue_downloading || download_exception_occurred;
  };

  std::function<void(const std::vector<size_t>&)> download_objects =
      [&](const std::vector<size_t>& partition) {
        for (const auto& index : partition) {
          DownloadedObjectToProcess& object = objects_to_process[index];
          const std::string& obj_key = object.object_key;
          if (is_downloading_finished()) {
            return;
          }
          std::exception_ptr eptr;  // unused
          std::string local_file_path;
          std::string exception_what;
          bool exception_occurred = false;

          try {
            local_file_path = s3_archive->land(obj_key,
                                               eptr,
                                               false,
                                               /*allow_named_pipe_use=*/false,
                                               /*track_file_path=*/false);
          } catch (const std::exception& e) {
            exception_what = e.what();
            exception_occurred = true;
          }

          if (is_downloading_finished()) {
            return;
          }
          if (exception_occurred) {
            {
              std::unique_lock communication_lock(communication_mutex);
              download_exception_occurred = true;
            }
            files_download_condition.notify_all();
            throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
                                     "': " + exception_what);
          }

          object.download_file_path = local_file_path;
          object.is_downloaded =
              true;  // this variable is atomic and therefore acts as a lock; it must
                     // be set last to ensure no data race

          files_download_condition.notify_all();
        }
      };

  std::function<void()> import_local_files = [&]() {
    for (size_t object_index = 0; object_index < object_count;) {
      {
        std::unique_lock communication_lock(communication_mutex);
        files_download_condition.wait(
            communication_lock,
            [&download_exception_occurred, object_index, &objects_to_process]() {
              return objects_to_process[object_index].is_downloaded ||
                     download_exception_occurred;
            });
        if (download_exception_occurred) {  // do not wait for object index if a
                                            // download error has occurred
          return;
        }
      }

      // find largest range of files to import
      size_t end_object_index = object_count;
      for (size_t i = object_index + 1; i < object_count; ++i) {
        if (!objects_to_process[i].is_downloaded) {
          end_object_index = i;
          break;
        }
      }
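      // [object_index, end_object_index) is now a contiguous run of objects that
      // have already finished downloading; it is imported as one batch below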

      ImportStatus local_import_status;
      std::string local_import_dir;
      try {
        CopyParams local_copy_params;
        std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
            copy_params_, objects_to_process, object_index, end_object_index);
        local_import_status =
            importGeneral(session_info, local_import_dir, local_copy_params);
        // clean up temporary files
        std::filesystem::remove_all(local_import_dir);
      } catch (const std::exception& except) {
        // replace all occurrences of file names with the object keys for
        // users
        std::string what = except.what();

        for (size_t i = object_index; i < end_object_index; ++i) {
          auto& object = objects_to_process[i];
          what = boost::regex_replace(what,
                                      boost::regex{object.import_file_path},
                                      bucket_name + "/" + object.object_key);
        }
        {
          std::unique_lock communication_lock(communication_mutex);
          continue_downloading = false;
        }
        // clean up temporary files
        std::filesystem::remove_all(local_import_dir);
        throw std::runtime_error(what);
      }
      aggregate_import_status += local_import_status;
      import_export::Importer::set_import_status(copy_from_source_,
                                                 aggregate_import_status);
      if (aggregate_import_status.load_failed) {
        {
          std::unique_lock communication_lock(communication_mutex);
          continue_downloading = false;
        }
        return;
      }

      object_index =
          end_object_index;  // all objects in range [object_index, end_object_index)
                             // were imported correctly at this point in execution;
                             // move onto the next range
    }
  };

  std::vector<size_t> partition_range(object_count);
  std::iota(partition_range.begin(), partition_range.end(), 0);
  auto download_futures = foreign_storage::create_futures_for_workers(
      partition_range, num_download_threads, download_objects);

  auto import_future = std::async(std::launch::async, import_local_files);

  for (auto& future : download_futures) {
    future.wait();
  }
  import_future.get();  // may throw an exception

  // get any remaining exceptions
  for (auto& future : download_futures) {
    future.get();
  }
  return aggregate_import_status;

#else
  throw std::runtime_error("AWS S3 support not available");

  return {};
#endif
}

#ifdef ENABLE_IMPORT_PARQUET
ImportStatus ForeignDataImporter::importParquet(
    const Catalog_Namespace::SessionInfo* session_info) {
  auto& catalog = session_info->getCatalog();

  CHECK(foreign_storage::is_valid_source_type(copy_params_));

  auto& current_user = session_info->get_currentUser();
  auto server = foreign_storage::ForeignDataWrapperFactory::createForeignServerProxy(
      catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);

  auto user_mapping =
      foreign_storage::ForeignDataWrapperFactory::createUserMappingProxyIfApplicable(
          catalog.getDatabaseId(),
          current_user.userId,
          copy_from_source_,
          copy_params_,
          server.get());

  auto foreign_table =
      foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(
          catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());

  foreign_table->validateOptionValues();

  auto data_wrapper = foreign_storage::ForeignDataWrapperFactory::createForImport(
      foreign_storage::DataWrapperType::PARQUET,
      catalog.getDatabaseId(),
      foreign_table.get(),
      user_mapping.get());

  if (auto parquet_import =
          dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
    Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector_);

    // determine the number of threads to use at each level

    int max_threads = 0;
    if (copy_params_.threads == 0) {
      max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
                             g_max_import_threads);
    } else {
      max_threads = static_cast<size_t>(copy_params_.threads);
    }
    CHECK_GT(max_threads, 0);

    int num_importer_threads =
        std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
    parquet_import->setNumThreads(num_importer_threads);
    int num_outer_thread = 1;
    for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
      if (thread_count * num_importer_threads <= max_threads) {
        num_outer_thread = thread_count;
      }
    }
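    // e.g. with max_threads = 16 and getMaxNumUsefulThreads() = 4, each batch is
    // read with 4 importer threads and floor(16 / 4) = 4 outer threads run
    // batches concurrently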

    std::shared_mutex import_status_mutex;
    ImportStatus import_status;  // manually updated

    auto import_failed = [&import_status_mutex, &import_status] {
      std::shared_lock import_status_lock(import_status_mutex);
      return import_status.load_failed;
    };

    std::vector<std::future<void>> futures;

    for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
      futures.emplace_back(std::async(std::launch::async, [&] {
        while (true) {
          auto batch_result = parquet_import->getNextImportBatch();
          if (import_failed()) {
            break;
          }
          auto batch = batch_result->getInsertData();
          if (!batch || import_failed()) {
            break;
          }
          insert_data_loader.insertData(*session_info, *batch);

          auto batch_import_status = batch_result->getImportStatus();
          {
            std::unique_lock import_status_lock(import_status_mutex);
            import_status.rows_completed += batch_import_status.rows_completed;
            import_status.rows_rejected += batch_import_status.rows_rejected;
            if (import_status.rows_rejected > copy_params_.max_reject) {
              import_status.load_failed = true;
              import_status.load_msg =
                  "Load was cancelled due to max reject rows being reached";
              break;
            }
          }
        }
      }));
    }

    for (auto& future : futures) {
      future.wait();
    }

    for (auto& future : futures) {
      future.get();
    }

    if (import_status.load_failed) {
      foreign_table.reset();  // this is to avoid calling the TableDescriptor dtor after
                              // the rollback in the checkpoint below
    }

    finalize(*session_info, import_status, parquet_import->getStringDictionaries());

    return import_status;
  }

  UNREACHABLE();
  return {};
}
#endif

ImportStatus ForeignDataImporter::import(
    const Catalog_Namespace::SessionInfo* session_info) {
  if (foreign_storage::is_s3_uri(copy_from_source_)) {
    return importGeneralS3(session_info);
  }
  return importGeneral(session_info);
}

}  // namespace import_export