#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#ifdef ENABLE_IMPORT_PARQUET
extern bool g_enable_legacy_parquet_import;
#endif
namespace {

ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper* data_wrapper,
                                  foreign_storage::ForeignTable* foreign_table) {
  ChunkMetadataVector metadata_vector;
  try {
    data_wrapper->populateChunkMetadata(metadata_vector);
  } catch (const foreign_storage::MetadataScanInfeasibleFragmentSizeException&
               metadata_scan_exception) {
    // Retry with the minimum feasible fragment size reported by the exception.
    auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
    if (min_feasible_fragment_size < 0) {
      throw;  // no feasible fragment size was reported
    }
    foreign_table->maxFragRows = min_feasible_fragment_size;
    data_wrapper->populateChunkMetadata(metadata_vector);  // retry the scan
  }
  return metadata_vector;
}
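// NOTE: populateChunkMetadata() is assumed to be re-entrant here: when the
// first scan throws, the retry starts a fresh scan under the adjusted
// maxFragRows rather than resuming the failed one. A negative
// min_feasible_fragment_size_ signals that no fragment size can make the scan
// feasible, so the exception is rethrown to the caller.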
std::string get_import_id(const import_export::CopyParams& copy_params,
                          const std::string& copy_from_source) {
  if (copy_params.source_type == import_export::SourceType::kDelimitedFile ||
#ifdef ENABLE_IMPORT_PARQUET
      copy_params.source_type == import_export::SourceType::kParquetFile ||
#endif
      copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
    return boost::filesystem::path(copy_from_source).filename().string();
  }
  return copy_from_source;
}
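// Illustrative example: a file-based source such as "/tmp/staging/trips.csv"
// yields the import id "trips.csv", while any other source type falls through
// and uses the full source string. This id is the key under which
// Importer::set_import_status() publishes progress below.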
struct FragmentBuffers {
  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
      fragment_buffers_owner;
  foreign_storage::ChunkToBufferMap fragment_buffers;
  std::unique_ptr<foreign_storage::ForeignStorageBuffer> delete_buffer;
};

std::unique_ptr<FragmentBuffers> create_fragment_buffers(
    const int32_t fragment_id,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table) {
  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);

  // Build the set of chunk keys for this fragment. Variable-length columns are
  // stored as two chunks: a data chunk (sub-key 1) and an index chunk (sub-key 2).
  std::set<ChunkKey> fragment_keys;
  for (const auto col_desc : columns) {
    ChunkKey key{
        catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
    if (col_desc->columnType.is_varlen_indeed()) {
      auto data_key = key;
      data_key.push_back(1);
      fragment_keys.insert(data_key);
      auto index_key = key;
      index_key.push_back(2);
      fragment_keys.insert(index_key);
    } else {
      fragment_keys.insert(key);
    }
  }

  // Allocate one buffer per chunk key plus a delete buffer for rejected rows;
  // the owner map keeps the buffers alive, the non-owning map is what the data
  // wrapper populates.
  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
  for (const auto& key : fragment_keys) {
    frag_buffers->fragment_buffers_owner[key] =
        std::make_unique<foreign_storage::ForeignStorageBuffer>();
    frag_buffers->fragment_buffers[key] =
        shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
  }
  return frag_buffers;
}
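// ChunkKey layout used above (per the CHUNK_KEY_* index macros):
//   {db_id, table_id, column_id, fragment_id}      -- fixed-width column
//   {db_id, table_id, column_id, fragment_id, 1}   -- varlen data chunk
//   {db_id, table_id, column_id, fragment_id, 2}   -- varlen index chunk
// So a TEXT column with columnId 3 in fragment 0 of table 7, database 1,
// contributes keys {1, 7, 3, 0, 1} and {1, 7, 3, 0, 2} (illustrative values).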
void load_foreign_data_buffers(
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    import_export::ImportStatus& import_status,
    std::mutex& communication_mutex,
    bool& continue_loading,
    bool& load_failed,
    bool& data_wrapper_error_occured,
    std::condition_variable& buffers_to_load_condition,
    std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
  while (true) {
    {
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return !buffers_to_load.empty() || !continue_loading ||
               data_wrapper_error_occured;
      });
      if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
        return;
      }
    }

    CHECK(!buffers_to_load.empty());

    // Take ownership of the next group of fragment buffers off the queue.
    std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
    {
      std::unique_lock communication_lock(communication_mutex);
      grouped_fragment_buffers.reset(buffers_to_load.front().release());
      buffers_to_load.pop_front();
      buffers_to_load_condition.notify_all();
    }
    auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
    auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

    // Wrap the populated buffers in chunks; varlen columns are reassembled
    // from their data (sub-key 1) and index (sub-key 2) buffers.
    Fragmenter_Namespace::InsertChunks insert_chunks{
        table->tableId, catalog.getDatabaseId(), {}, {}};
    for (const auto& [key, buffer] : fragment_buffers) {
      const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
      const auto col_desc = catalog.getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX], col_id);
      if (col_desc->columnType.is_varlen_indeed()) {
        if (key[CHUNK_KEY_VARLEN_IDX] == 1) {  // data sub-chunk
          auto index_key = key;
          index_key[CHUNK_KEY_VARLEN_IDX] = 2;
          insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
              col_desc, buffer, shared::get_from_map(fragment_buffers, index_key), false);
        }
      } else {
        insert_chunks.chunks[col_id] =
            Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
      }
    }

    // Mark valid row indices, skipping rows flagged in the delete buffer.
    auto row_count = fragment_buffers.begin()
                         ->second->getEncoder()
                         ->getNumElems();  // all chunks have the same row count
    insert_chunks.valid_row_indices.reserve(row_count);
    for (size_t irow = 0; irow < row_count; ++irow) {
      if (delete_buffer->size() > 0) {
        CHECK_LE(irow, delete_buffer->size());
        if (delete_buffer->getMemoryPtr()[irow]) {
          continue;  // row rejected by the data wrapper
        }
      }
      insert_chunks.valid_row_indices.emplace_back(irow);
    }

    // Import the chunks, then publish updated status.
    insert_data_loader.insertChunks(*session_info, insert_chunks);

    CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
    import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
    import_status.rows_completed += insert_chunks.valid_row_indices.size();
    if (import_status.rows_rejected > copy_params.max_reject) {
      import_status.load_failed = true;
      import_status.load_msg = "Load was cancelled due to max reject rows being reached";
      import_export::Importer::set_import_status(
          get_import_id(copy_params, copy_from_source), import_status);
      std::unique_lock communication_lock(communication_mutex);
      load_failed = true;
      buffers_to_load_condition.notify_all();
      return;
    }
    import_export::Importer::set_import_status(
        get_import_id(copy_params, copy_from_source), import_status);
  }
}
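// This function is the consumer half of a producer/consumer pipeline:
// import_foreign_data() (below) populates fragment buffers and queues them on
// buffers_to_load, while this loader drains the queue and inserts the chunks.
// The shared flags (continue_loading, load_failed, data_wrapper_error_occured)
// plus the condition variable coordinate shutdown from either side.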
import_export::ImportStatus import_foreign_data(
    const int32_t max_fragment_id,
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    foreign_storage::ForeignDataWrapper* data_wrapper,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    const size_t maximum_num_fragments_buffered) {
  import_export::ImportStatus import_status;

  std::mutex communication_mutex;
  bool continue_loading = true;             // producer is still creating fragments
  bool load_failed = false;                 // loader failed; stop producing
  bool data_wrapper_error_occured = false;  // producer failed; stop loading
  std::condition_variable buffers_to_load_condition;
  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;

  // Launch a separate thread to load fragments as they become ready.
  auto load_future =
      std::async(std::launch::async, load_foreign_data_buffers, connector,
                 std::ref(catalog), table, session_info, std::cref(copy_params),
                 std::cref(copy_from_source), std::ref(import_status),
                 std::ref(communication_mutex), std::ref(continue_loading),
                 std::ref(load_failed), std::ref(data_wrapper_error_occured),
                 std::ref(buffers_to_load_condition), std::ref(buffers_to_load));

  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
    {
      // Backpressure: wait for room in the queue (or a loader failure).
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
      });
      if (load_failed) {
        break;
      }
    }
    auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
    auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
    auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

    try {
      data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
    } catch (foreign_storage::RequestedFragmentIdOutOfBoundsException&) {
      break;  // lazily fetched data source is exhausted
    } catch (...) {
      std::unique_lock communication_lock(communication_mutex);
      data_wrapper_error_occured = true;
      buffers_to_load_condition.notify_all();
      throw;
    }

    std::unique_lock communication_lock(communication_mutex);
    buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
    buffers_to_load_condition.notify_all();
  }

  {  // fragment production is done; let the loading thread drain and exit
    std::unique_lock communication_lock(communication_mutex);
    continue_loading = false;
    buffers_to_load_condition.notify_all();
  }

  load_future.get();  // surface any exception thrown during loading

  return import_status;
}
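// Backpressure note: at most maximum_num_fragments_buffered FragmentBuffers
// exist at once (the producer waits above when the queue is full), which
// bounds peak import memory together with the fragment size heuristic in
// get_proxy_foreign_table_fragment_size() below.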
struct DownloadedObjectToProcess {
  std::string object_key;
  std::atomic<bool> is_downloaded;
  std::string download_file_path;
  std::string import_file_path;
};

size_t get_number_of_digits(const size_t number) {
  return std::to_string(number).length();
}
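// get_number_of_digits() supplies the zero-padding width used below when
// renaming downloaded objects, e.g. 12 objects pad to width 2: "00", "01",
// ... "11". The padding keeps lexicographic order identical to download order.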
std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
    const import_export::CopyParams& s3_copy_params,
    std::vector<DownloadedObjectToProcess>& objects_to_process,
    const size_t begin_object_index,
    const size_t end_object_index) {
  import_export::CopyParams local_copy_params = s3_copy_params;
  // S3-specific options (credentials, path filters, sort options) apply only at
  // the higher level and are cleared from the local copy params.

  CHECK_GT(end_object_index, begin_object_index);
  CHECK_LT(begin_object_index, objects_to_process.size());

  size_t num_objects = end_object_index - begin_object_index;
  auto& first_object = objects_to_process[begin_object_index];
  std::string first_path = first_object.download_file_path;
  std::string temp_dir = first_path + "_import";

  if (!std::filesystem::create_directory(temp_dir)) {
    throw std::runtime_error("failed to create temporary directory for import: " +
                             temp_dir);
  }

  // Move the downloaded files into the temporary directory under zero-padded
  // counter names so that a pathname sort of the directory preserves the
  // original object order.
  std::filesystem::path temp_dir_path{temp_dir};
  size_t counter = 0;
  size_t num_zero = get_number_of_digits(num_objects);
  for (size_t i = begin_object_index; i < end_object_index; ++i) {
    auto& object = objects_to_process[i];
    std::filesystem::path old_path = object.download_file_path;
    auto counter_str = std::to_string(counter++);
    auto zero_padded_counter_str =
        std::string(num_zero - counter_str.length(), '0') + counter_str;
    auto new_path = (temp_dir_path / zero_padded_counter_str).string() +
                    std::filesystem::path{object.object_key}.extension().string();
    std::filesystem::rename(old_path, new_path);
    object.import_file_path = new_path;
  }
  return {temp_dir, local_copy_params};
}
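// Illustrative walk-through (hypothetical values): importing objects
// "a/part-2.csv" and "a/part-10.csv" downloaded to /tmp/x and /tmp/y produces
// /tmp/x_import/0.csv and /tmp/x_import/1.csv, so "part-10" cannot sort ahead
// of "part-2" during the local import.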
}  // namespace

namespace import_export {

ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
                                         const CopyParams& copy_params,
                                         const TableDescriptor* table)
    : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
}
void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
        string_dictionaries) {
  if (table_->persistenceLevel ==
      Data_Namespace::MemoryLevel::DISK_LEVEL) {  // only checkpoint disk-resident tables
    if (!import_status.load_failed) {
      auto timer = DEBUG_TIMER("Dictionary Checkpointing");
      for (const auto& [column_descriptor, string_dictionary] : string_dictionaries) {
        if (!string_dictionary->checkpoint()) {
          LOG(ERROR) << "Checkpointing Dictionary for Column "
                     << column_descriptor->columnName << " failed.";
          import_status.load_failed = true;
          import_status.load_msg = "Dictionary checkpoint failed";
        }
      }
    }
  }
  // checkpoint table data, or roll back on failure
  if (import_status.load_failed) {
    connector_->rollback(parent_session_info, table_->tableId);
  } else {
    connector_->checkpoint(parent_session_info, table_->tableId);
  }
}
void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const int32_t table_id) {
  auto& catalog = parent_session_info.getCatalog();

  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  // Collect the string dictionary for every dictionary-encoded column, then
  // delegate to the overload above.
  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
  for (const auto& column_descriptor : logical_columns) {
    if (!column_descriptor->columnType.is_dict_encoded_string()) {
      continue;
    }
    auto dict_descriptor =
        catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
    string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
  }

  finalize(parent_session_info, import_status, string_dictionaries);
}
int32_t get_proxy_foreign_table_fragment_size(
    const size_t maximum_num_fragments_buffered,
    const size_t max_import_batch_row_count,
    const Catalog_Namespace::SessionInfo& parent_session_info,
    const int32_t table_id) {
  // A test-only override takes precedence, then an explicit batch row count.
  if (ForeignDataImporter::proxy_foreign_table_fragment_size_ != 0) {
    return ForeignDataImporter::proxy_foreign_table_fragment_size_;
  }
  if (max_import_batch_row_count != 0) {
    return max_import_batch_row_count;
  }

  // Heuristic: reserve roughly 2GB for intermediate buffering during import,
  // split across the fragments buffered at any one time. Depending on the
  // actual size of varlen values, this target may be off.
  const size_t max_buffer_byte_size =
      2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;

  auto& catalog = parent_session_info.getCatalog();
  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  // Estimate the byte width of one row; varlen fields are assumed to average
  // 256 bytes.
  size_t row_byte_size = 0;
  for (const auto& column_descriptor : logical_columns) {
    auto type = column_descriptor->columnType;
    size_t field_byte_length = 0;
    if (type.is_varlen_indeed()) {
      field_byte_length = 256;
    } else {
      field_byte_length = type.get_size();
    }
    row_byte_size += field_byte_length;
  }

  return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
                          DEFAULT_FRAGMENT_ROWS);
}
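// Worked example (hypothetical schema): with maximum_num_fragments_buffered = 3
// the per-fragment byte budget is 2GB / 3 ~= 715MB. Four BIGINT columns give
// row_byte_size = 32, so ~22.4M rows per fragment; a single TEXT column
// (assumed 256 bytes) gives ~2.8M rows. The result is capped at
// DEFAULT_FRAGMENT_ROWS.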
ImportStatus ForeignDataImporter::importGeneral(
    const Catalog_Namespace::SessionInfo* session_info,
    const std::string& copy_from_source,
    const CopyParams& copy_params) {
  auto& catalog = session_info->getCatalog();
  CHECK(foreign_storage::is_valid_source_type(copy_params));
  validate_copy_params(copy_params);  // fail early with user-friendly messages

  ImportStatus import_status;
  {
    // Proxy FSI objects (server, user mapping, foreign table) let the foreign
    // storage machinery be driven outside FSI.
    auto [server, user_mapping, foreign_table] = foreign_storage::create_proxy_fsi_objects(
        copy_from_source, copy_params, catalog.getDatabaseId(), table_,
        session_info->get_currentUser().userId);

    // Maximum number of fragments buffered in memory at any one time; feeds
    // the fragment size heuristic above.
    const size_t maximum_num_fragments_buffered = 3;
    foreign_table->maxFragRows = get_proxy_foreign_table_fragment_size(
        maximum_num_fragments_buffered, copy_params.max_import_batch_row_count,
        *session_info, table_->tableId);
    LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
              << " for table " << table_->tableName;

    auto data_wrapper = foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(
        copy_params, catalog.getDatabaseId(), foreign_table.get(), user_mapping.get());

    // Unless fragments are fetched lazily, a metadata scan determines the
    // highest fragment id up front.
    int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
    if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
      auto metadata_vector = metadata_scan(data_wrapper.get(), foreign_table.get());
      if (metadata_vector.empty()) {
        return {};  // empty data source
      }
      max_fragment_id = 0;
      for (const auto& [key, _] : metadata_vector) {
        max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
      }
    }

    import_status = import_foreign_data(max_fragment_id, connector_.get(), catalog,
                                        table_, data_wrapper.get(), session_info,
                                        copy_params, copy_from_source,
                                        maximum_num_fragments_buffered);
  }  // proxy FSI objects are destroyed before finalize/checkpoint
  finalize(*session_info, import_status, table_->tableId);
  return import_status;
}
void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
  auto data_dir_path = boost::filesystem::canonical(base_path);
  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
}
609 #
if ENABLE_IMPORT_PARQUET
613 throw std::runtime_error(
"Attempting to load S3 resource '" +
copy_from_source_ +
614 "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
615 #
if ENABLE_IMPORT_PARQUET
618 " or 'REGEX_PARSED_FILE'");
#ifdef HAVE_AWS_S3
  // Stage downloads under a unique directory in the default import path.
  auto uuid = boost::uuids::random_generator()();
  const std::string import_path =
      (std::filesystem::path(default_import_path_) / boost::uuids::to_string(uuid))
          .string();

  auto s3_archive = std::make_unique<S3Archive>(
      copy_from_source_, copy_params_.s3_access_key, copy_params_.s3_secret_key,
      copy_params_.s3_session_token, copy_params_.s3_region, copy_params_.s3_endpoint,
      copy_params_.plain_text, copy_params_.regex_path_filter,
      copy_params_.file_sort_order_by, copy_params_.file_sort_regex, import_path);
  s3_archive->init_for_read();

  const auto bucket_name = s3_archive->url_part(4);

  auto object_keys = s3_archive->get_objkeys();
  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
  size_t object_count = 0;
  for (const auto& objkey : object_keys) {
    auto& object = objects_to_process[object_count++];
    object.object_key = objkey;
    object.is_downloaded = false;
  }

  // Ensure the staging directory is cleaned up regardless of outcome.
  ScopeGuard remove_temp_dir = [&] {
    if (std::filesystem::exists(import_path)) {
      std::filesystem::remove_all(import_path);
    }
  };
  ImportStatus aggregate_import_status;

  std::mutex communication_mutex;
  bool continue_downloading = true;
  bool download_exception_occured = false;

  std::condition_variable files_download_condition;

  auto is_downloading_finished = [&] {
    std::unique_lock communication_lock(communication_mutex);
    return !continue_downloading || download_exception_occured;
  };
  std::function<void(const std::vector<size_t>&)> download_objects =
      [&](const std::vector<size_t>& partition) {
        for (const auto& index : partition) {
          DownloadedObjectToProcess& object = objects_to_process[index];
          const std::string& obj_key = object.object_key;
          if (is_downloading_finished()) {
            return;
          }
          std::exception_ptr eptr;  // unused
          std::string local_file_path;
          std::string exception_what;
          bool exception_occured = false;

          try {
            local_file_path = s3_archive->land(obj_key,
                                               eptr,
                                               false,
                                               /*allow_named_pipe_use=*/false,
                                               /*track_file_path=*/false);
          } catch (const std::exception& e) {
            exception_what = e.what();
            exception_occured = true;
          }

          if (is_downloading_finished()) {
            return;
          }
          if (exception_occured) {
            {
              std::unique_lock communication_lock(communication_mutex);
              download_exception_occured = true;
            }
            files_download_condition.notify_all();
            throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
                                     "': " + exception_what);
          }

          object.download_file_path = local_file_path;
          // is_downloaded is atomic and acts as a release flag: it is set last
          // so that a reader observing it as true sees all other fields set.
          object.is_downloaded = true;

          files_download_condition.notify_all();
        }
      };
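// Download/import overlap: each downloader thread lands whole objects and
// flips is_downloaded, while the single importer thread below waits on
// files_download_condition and consumes maximal runs of consecutively
// downloaded objects. Ordering matters, which is why is_downloaded is set
// only after download_file_path is in place.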
  std::function<void()> import_local_files = [&]() {
    for (size_t object_index = 0; object_index < object_count;) {
      {
        std::unique_lock communication_lock(communication_mutex);
        files_download_condition.wait(
            communication_lock,
            [&download_exception_occured, object_index, &objects_to_process]() {
              return objects_to_process[object_index].is_downloaded ||
                     download_exception_occured;
            });
        if (download_exception_occured) {
          return;  // do not keep waiting on this object if a download failed
        }
      }

      // Find the largest contiguous range of downloaded files to import together.
      size_t end_object_index = object_count;
      for (size_t i = object_index + 1; i < object_count; ++i) {
        if (!objects_to_process[i].is_downloaded) {
          end_object_index = i;
          break;
        }
      }

      ImportStatus local_import_status;
      std::string local_import_dir;
      try {
        CopyParams local_copy_params;
        std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
            copy_params_, objects_to_process, object_index, end_object_index);
        local_import_status =
            importGeneral(session_info, local_import_dir, local_copy_params);
        // clean up the staged files and directory only on success
        std::filesystem::remove_all(local_import_dir);
      } catch (const std::exception& except) {
        // Replace local file names with the S3 object keys so error messages
        // are meaningful to users.
        std::string what = except.what();
        for (size_t i = object_index; i < end_object_index; ++i) {
          auto& object = objects_to_process[i];
          what = boost::regex_replace(what,
                                      boost::regex{object.import_file_path},
                                      bucket_name + "/" + object.object_key);
        }
        {
          std::unique_lock communication_lock(communication_mutex);
          continue_downloading = false;
        }
        // remove any partially processed files
        std::filesystem::remove_all(local_import_dir);
        throw std::runtime_error(what);
      }
      aggregate_import_status += local_import_status;
      Importer::set_import_status(copy_from_source_, aggregate_import_status);
      if (aggregate_import_status.load_failed) {
        std::unique_lock communication_lock(communication_mutex);
        continue_downloading = false;
        return;
      }
      // all objects in [object_index, end_object_index) are processed here
      object_index = end_object_index;
    }
  };
  const size_t num_download_threads = copy_params_.s3_max_concurrent_downloads;

  std::vector<size_t> partition_range(object_count);
  std::iota(partition_range.begin(), partition_range.end(), 0);
  auto download_futures = foreign_storage::create_futures_for_workers(
      partition_range, num_download_threads, download_objects);

  auto import_future = std::async(std::launch::async, import_local_files);

  for (auto& future : download_futures) {
    future.wait();
  }
  import_future.get();  // may rethrow an import exception

  // surface any remaining download exceptions
  for (auto& future : download_futures) {
    future.get();
  }
  return aggregate_import_status;
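// Thread layout for the S3 path: s3_max_concurrent_downloads worker threads
// run download_objects over a partition of the object indices, while one
// async task runs import_local_files; aggregate_import_status accumulates the
// per-batch results returned by importGeneral().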
#else
  throw std::runtime_error("AWS S3 support not available");
#endif
}
#ifdef ENABLE_IMPORT_PARQUET
ImportStatus ForeignDataImporter::importParquet(
    const Catalog_Namespace::SessionInfo* session_info) {
  auto& catalog = session_info->getCatalog();
  CHECK(copy_params_.source_type == import_export::SourceType::kParquetFile);

  auto& current_user = session_info->get_currentUser();
  auto server = foreign_storage::ForeignDataWrapperFactory::createForeignServerProxy(
      catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);
  auto user_mapping =
      foreign_storage::ForeignDataWrapperFactory::createUserMappingProxyIfApplicable(
          catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_,
          server.get());
  auto foreign_table = foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(
      catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());

  foreign_table->validateOptionValues();

  auto data_wrapper = foreign_storage::ForeignDataWrapperFactory::createForImport(
      foreign_storage::DataWrapperType::PARQUET,
      catalog.getDatabaseId(),
      foreign_table.get(),
      user_mapping.get());

  if (auto parquet_import =
          dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
    Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector_);

    // Determine the thread budget: inner (importer) threads decode Parquet,
    // outer threads run concurrent batch pipelines.
    size_t max_threads = 0;
    if (copy_params_.threads == 0) {
      max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
                             g_max_import_threads);
    } else {
      max_threads = static_cast<size_t>(copy_params_.threads);
    }

    int num_importer_threads =
        std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
    parquet_import->setNumThreads(num_importer_threads);
    int num_outer_thread = 1;
    for (int thread_count = 1; thread_count <= static_cast<int>(max_threads);
         ++thread_count) {
      if (thread_count * num_importer_threads <= static_cast<int>(max_threads)) {
        num_outer_thread = thread_count;
      }
    }
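    // Example of the thread split (hypothetical counts): with max_threads = 16
    // and getMaxNumUsefulThreads() = 4, num_importer_threads = 4 and the loop
    // settles on num_outer_thread = 4, i.e. four outer batch pipelines each
    // using four Parquet importer threads, never exceeding the 16-thread budget.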
    std::shared_timed_mutex import_status_mutex;
    ImportStatus import_status;  // written only under import_status_mutex

    auto import_failed = [&import_status_mutex, &import_status] {
      std::shared_lock import_status_lock(import_status_mutex);
      return import_status.load_failed;
    };

    std::vector<std::future<void>> futures;

    for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
      futures.emplace_back(std::async(std::launch::async, [&] {
        while (true) {
          auto batch_result = parquet_import->getNextImportBatch();
          if (import_failed()) {
            break;
          }
          auto batch = batch_result->getInsertData();
          if (!batch || import_failed()) {
            break;
          }
          insert_data_loader.insertData(*session_info, *batch);

          auto batch_import_status = batch_result->getImportStatus();
          {
            std::unique_lock import_status_lock(import_status_mutex);
            import_status.rows_completed += batch_import_status.rows_completed;
            import_status.rows_rejected += batch_import_status.rows_rejected;
            if (import_status.rows_rejected > copy_params_.max_reject) {
              import_status.load_failed = true;
              import_status.load_msg =
                  "Load was cancelled due to max reject rows being reached";
              break;
            }
          }
        }
      }));
    }

    for (auto& future : futures) {
      future.wait();
    }
    for (auto& future : futures) {
      future.get();  // surface any exceptions from the outer threads
    }

    if (import_status.load_failed) {
      foreign_table.reset();  // do not checkpoint the proxy table after a failed load
    }

    finalize(*session_info, import_status, parquet_import->getStringDictionaries());

    return import_status;
  }