#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#ifdef ENABLE_IMPORT_PARQUET
extern bool g_enable_legacy_parquet_import;
#endif
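// get_data_wrapper_type(): maps copy_params.source_type to the name of the foreign
// storage data wrapper used for the import (CSV, REGEX_PARSER, or, when
// ENABLE_IMPORT_PARQUET is defined, PARQUET). Excerpt below.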
  std::string data_wrapper_type;
  // ...
#ifdef ENABLE_IMPORT_PARQUET
  // ...
  return data_wrapper_type;
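// metadata_scan(): asks the data wrapper to populate chunk metadata; if the wrapper
// reports an infeasible fragment size, the foreign table's fragment size is lowered to
// the minimum feasible value from the exception and the scan is retried. Excerpt below.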
                 metadata_scan_exception) {
    // ...
    if (min_feasible_fragment_size < 0) {
      // ...
    }
    foreign_table->maxFragRows = min_feasible_fragment_size;
    // ...
  }
  return metadata_vector;
}
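// import_foreign_data(): for each fragment described by the metadata vector, gathers the
// fragment's chunk keys, fills chunk buffers through the data wrapper, inserts the
// resulting chunks with InsertDataLoader::insertChunks(), and accumulates completed and
// rejected rows in an ImportStatus. Excerpts below.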
    const std::string& copy_from_source) {
  int32_t max_fragment_id = -1;
  for (const auto& [key, _] : metadata_vector) {
    // ...
  }
  // ...
  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
    // ...
    std::set<ChunkKey> fragment_keys;
    for (const auto& [key, _] : metadata_vector) {
      // ...
      fragment_keys.insert(key);
      // ...
      if (col_desc->columnType.is_varlen_indeed()) {
        // ...
        auto index_key = key;
        // ...
        fragment_keys.insert(index_key);
    // ...
    std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
        fragment_buffers_owner;
    // ...
    auto delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
    for (const auto& key : fragment_keys) {
      fragment_buffers_owner[key] =
          std::make_unique<foreign_storage::ForeignStorageBuffer>();
      fragment_buffers_owner[key]->resetToEmpty();
      // ...
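    // The fragment buffers (and the optional delete buffer) are filled by the data
    // wrapper's populateChunkBuffers(); rows flagged in the delete buffer are skipped
    // when valid_row_indices is built below.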
    // ...
    for (const auto& [key, buffer] : fragment_buffers) {
      // ...
      if (col_desc->columnType.is_varlen_indeed()) {
        // ...
        auto index_key = key;
        // ...
      }
      insert_chunks.chunks[col_id] =
          // ...
    }
    // ...
    auto row_count = fragment_buffers.begin()
                         ->second->getEncoder()
                         ->getNumElems();
    // ...
    insert_chunks.valid_row_indices.reserve(row_count);
    for (size_t irow = 0; irow < row_count; ++irow) {
      if (delete_buffer->size() > 0) {
        CHECK_LE(irow, delete_buffer->size());
        if (delete_buffer->getMemoryPtr()[irow]) {
          continue;
        }
      }
      insert_chunks.valid_row_indices.emplace_back(irow);
    }
    // ...
    insert_data_loader.insertChunks(*session_info, insert_chunks);
    // ...
    CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
    import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
    import_status.rows_completed += insert_chunks.valid_row_indices.size();
    // ...
    import_status.load_msg = "Load was cancelled due to max reject rows being reached";
    // ...
  }
  return import_status;
}
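// Helpers for the S3 import path: each object to download is tracked with its key, an
// atomic download flag, and its local download and import file paths.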
struct DownloadedObjectToProcess {
  std::string object_key;
  std::atomic<bool> is_downloaded;
  std::string download_file_path;
  std::string import_file_path;
};

size_t get_number_of_digits(const size_t number) {
  // ...
}
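// get_local_copy_source_and_params(): moves a contiguous range of downloaded objects
// into a temporary directory, renaming them with zero-padded counters so that a
// directory-based copy preserves the original object order, and returns that directory
// together with copy parameters adjusted for local files. Excerpt below.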
std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
    // ...
    std::vector<DownloadedObjectToProcess>& objects_to_process,
    const size_t begin_object_index,
    const size_t end_object_index) {
  // ...
  CHECK_GT(end_object_index, begin_object_index);
  CHECK_LT(begin_object_index, objects_to_process.size());
  // ...
  size_t num_objects = end_object_index - begin_object_index;
  auto& first_object = objects_to_process[begin_object_index];
  std::string first_path = first_object.download_file_path;
  std::string temp_dir = first_path + "_import";
  // ...
  if (!std::filesystem::create_directory(temp_dir)) {
    throw std::runtime_error("failed to create temporary directory for import: " +
                             temp_dir);
  }
  std::filesystem::path temp_dir_path{temp_dir};
  // ...
  size_t num_zero = get_number_of_digits(num_objects);
  for (size_t i = begin_object_index; i < end_object_index; ++i) {
    auto& object = objects_to_process[i];
    std::filesystem::path old_path = object.download_file_path;
    // ...
    auto zero_padded_counter_str =
        std::string(num_zero - counter_str.length(), '0') + counter_str;
    auto new_path = (temp_dir_path / zero_padded_counter_str).string();
    std::filesystem::rename(old_path, new_path);
    object.import_file_path = new_path;
  }
  return {temp_dir, local_copy_params};
}
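// ForeignDataImporter constructor: stores the copy source, COPY parameters, and target
// table, and creates a LocalInsertConnector used for all inserts. Excerpt below.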
namespace import_export {
// ...
    : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
}
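// ForeignDataImporter::finalize(): checkpoints every string dictionary written to during
// the import and marks the ImportStatus as failed if any checkpoint does not succeed.
// Excerpt below.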
    const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
        string_dictionaries) {
  // ...
  auto timer = DEBUG_TIMER("Dictionary Checkpointing");
  for (const auto& [column_descriptor, string_dictionary] : string_dictionaries) {
    if (!string_dictionary->checkpoint()) {
      LOG(ERROR) << "Checkpointing Dictionary for Column "
                 << column_descriptor->columnName << " failed.";
      // ...
      import_status.load_msg = "Dictionary checkpoint failed";
    }
  }
}
    const int32_t table_id) {
  auto& catalog = parent_session_info.getCatalog();
  // ...
  auto logical_columns =
      // ...
  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
  for (const auto& column_descriptor : logical_columns) {
    if (!column_descriptor->columnType.is_dict_encoded_string()) {
      // ...
    }
    auto dict_descriptor =
        catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
    string_dictionaries.push_back({column_descriptor, dict_descriptor->stringDict.get()});
  }
  // ...
  finalize(parent_session_info, import_status, string_dictionaries);
}
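// ForeignDataImporter::importGeneral(): creates proxy FSI objects (foreign server, user
// mapping, foreign table) for the copy source, scans chunk metadata with the matching
// data wrapper, and hands the result to import_foreign_data(). Excerpts below.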
    const std::string& copy_from_source,
    // ...
  auto [server, user_mapping, foreign_table] =
      // ...
          catalog.getDatabaseId(),
          // ...
          catalog.getDatabaseId(),
          // ...
  if (metadata_vector.empty()) {
    // ...
  }
  // ...
  return import_status;
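// setDefaultImportPath(): resolves the configured base path and stores the default
// import directory. Excerpt below.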
  auto data_dir_path = boost::filesystem::canonical(base_path);
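// ForeignDataImporter::importGeneralS3(): validates that the source type is supported
// for S3, downloads the S3 objects with a pool of download threads, and imports each
// batch of downloaded files through importGeneral(), aggregating the ImportStatus. When
// AWS S3 support is not compiled in, it throws instead. Excerpts below.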
#if ENABLE_IMPORT_PARQUET
  // ...
#endif
  // ...
    throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
                             "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
#if ENABLE_IMPORT_PARQUET
                             // ...
#endif
                             " or 'REGEX_PARSED_FILE'");
  auto uuid = boost::uuids::random_generator()();
  // ...
  s3_archive->init_for_read();
  // ...
  const auto bucket_name = s3_archive->url_part(4);
  // ...
  auto object_keys = s3_archive->get_objkeys();
  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
  size_t object_count = 0;
  for (const auto& objkey : object_keys) {
    auto& object = objects_to_process[object_count++];
    object.object_key = objkey;
    object.is_downloaded = false;
  }
  // ...
  if (std::filesystem::exists(import_path)) {
    std::filesystem::remove_all(import_path);
  }
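  // Download and import run concurrently: download threads set is_downloaded on each
  // object and signal files_download_condition; the import loop waits on that condition,
  // imports the contiguous range of objects that are ready, and stops further downloads
  // if an import error occurs or the max-reject limit is reached.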
  std::mutex communication_mutex;
  bool continue_downloading = true;
  bool download_exception_occured = false;
  std::condition_variable files_download_condition;

  auto is_downloading_finished = [&] {
    std::unique_lock communication_lock(communication_mutex);
    return !continue_downloading || download_exception_occured;
  };
  std::function<void(const std::vector<size_t>&)> download_objects =
      [&](const std::vector<size_t>& partition) {
        for (const auto& index : partition) {
          DownloadedObjectToProcess& object = objects_to_process[index];
          const std::string& obj_key = object.object_key;
          if (is_downloading_finished()) {
            // ...
          }
          std::exception_ptr eptr;
          std::string local_file_path;
          std::string exception_what;
          bool exception_occured = false;
          // ...
            local_file_path = s3_archive->land(obj_key,
                                               // ...
          } catch (const std::exception& e) {
            exception_what = e.what();
            exception_occured = true;
          }
          // ...
          if (is_downloading_finished()) {
            // ...
          }
          if (exception_occured) {
            // ...
            std::unique_lock communication_lock(communication_mutex);
            download_exception_occured = true;
            // ...
            files_download_condition.notify_all();
            throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
                                     "': " + exception_what);
          }
          // ...
          object.download_file_path = local_file_path;
          object.is_downloaded = true;
          // ...
          files_download_condition.notify_all();
        }
      };
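  // import_local_files: waits until a contiguous run of objects starting at object_index
  // has been downloaded, stages those files in a temporary directory via
  // get_local_copy_source_and_params(), imports that directory with importGeneral(), and
  // folds the result into aggregate_import_status. On failure it rewrites local paths in
  // the error message back to S3 object names and stops further downloads.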
  std::function<void()> import_local_files = [&]() {
    for (size_t object_index = 0; object_index < object_count;) {
      // ...
      std::unique_lock communication_lock(communication_mutex);
      files_download_condition.wait(
          communication_lock,
          [&download_exception_occured, object_index, &objects_to_process]() {
            return objects_to_process[object_index].is_downloaded ||
                   download_exception_occured;
          });
      if (download_exception_occured) {
        // ...
      }
      // ...
      size_t end_object_index = object_count;
      for (size_t i = object_index + 1; i < object_count; ++i) {
        if (!objects_to_process[i].is_downloaded) {
          end_object_index = i;
          // ...
        }
      }
      // ...
      std::string local_import_dir;
      // ...
        std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
            copy_params_, objects_to_process, object_index, end_object_index);
        local_import_status =
            importGeneral(session_info, local_import_dir, local_copy_params);
        // ...
        std::filesystem::remove_all(local_import_dir);
      } catch (const std::exception& except) {
        // ...
        std::string what = except.what();
        // ...
        for (size_t i = object_index; i < end_object_index; ++i) {
          auto& object = objects_to_process[i];
          what = boost::regex_replace(what,
                                      boost::regex{object.import_file_path},
                                      bucket_name + "/" + object.object_key);
        }
        // ...
        std::unique_lock communication_lock(communication_mutex);
        continue_downloading = false;
        // ...
        std::filesystem::remove_all(local_import_dir);
        throw std::runtime_error(what);
      }
      // ...
      aggregate_import_status += local_import_status;
      // ...
          aggregate_import_status);
      // ...
      std::unique_lock communication_lock(communication_mutex);
      continue_downloading = false;
      // ...
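  // The object indices are partitioned across the download workers (see
  // create_futures_for_workers); the two loops over download_futures below appear to
  // wait on and then collect each future so that any download exception is rethrown
  // before the aggregate status is returned.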
  // ...
  std::vector<size_t> partition_range(object_count);
  std::iota(partition_range.begin(), partition_range.end(), 0);
  // ...
      partition_range, num_download_threads, download_objects);
  // ...
  for (auto& future : download_futures) {
    // ...
  }
  for (auto& future : download_futures) {
    // ...
  }
  // ...
  return aggregate_import_status;
  // ...
  throw std::runtime_error("AWS S3 support not available");
#ifdef ENABLE_IMPORT_PARQUET
// ...
      catalog.getDatabaseId(),
      // ...
  foreign_table->validateOptionValues();
  // ...
      catalog.getDatabaseId(),
      // ...
  if (auto parquet_import =
          dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
    // ...
    if (copy_params_.threads == 0) {
      max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
                             // ...
    } else {
      max_threads = static_cast<size_t>(copy_params_.threads);
    }
    // ...
    int num_importer_threads =
        std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
    parquet_import->setNumThreads(num_importer_threads);
    int num_outer_thread = 1;
    for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
      if (thread_count * num_importer_threads <= max_threads) {
        num_outer_thread = thread_count;
      }
    }
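    // num_outer_thread ends up as the largest batch-level thread count whose product
    // with num_importer_threads still fits within max_threads, so the total number of
    // worker threads stays bounded by max_threads.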
    std::shared_mutex import_status_mutex;
    ImportStatus import_status;
    // ...
    auto import_failed = [&import_status_mutex, &import_status] {
      std::shared_lock import_status_lock(import_status_mutex);
      return import_status.load_failed;
    };

    std::vector<std::future<void>> futures;
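    // Each outer thread repeatedly pulls the next import batch from the ParquetImporter,
    // inserts it, and updates the shared ImportStatus under import_status_mutex; the
    // loop stops early once import_failed() reports that the max-reject limit was hit.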
    for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
      // ...
        auto batch_result = parquet_import->getNextImportBatch();
        if (import_failed()) {
          // ...
        }
        auto batch = batch_result->getInsertData();
        if (!batch || import_failed()) {
          // ...
        }
        insert_data_loader.insertData(*session_info, *batch);
        // ...
        auto batch_import_status = batch_result->getImportStatus();
        // ...
        std::unique_lock import_status_lock(import_status_mutex);
        import_status.rows_completed += batch_import_status.rows_completed;
        import_status.rows_rejected += batch_import_status.rows_rejected;
        if (import_status.rows_rejected > copy_params_.max_reject) {
          import_status.load_failed = true;
          import_status.load_msg =
              "Load was cancelled due to max reject rows being reached";
        }
    for (auto& future : futures) {
      // ...
    }
    for (auto& future : futures) {
      // ...
    }

    if (import_status.load_failed) {
      foreign_table.reset();
      // ...
    }
    // ...
    finalize(*session_info, import_status, parquet_import->getStringDictionaries());
    // ...
    return import_status;
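// A rough usage sketch, for orientation only. The construction arguments and the calling
// context are assumptions (the real call site is presumably the COPY FROM handler, which
// is not part of this excerpt); the constructor and import() signatures match the
// declarations referenced above.
//
//   import_export::ForeignDataImporter importer(copy_from_source, copy_params, table);
//   auto import_status = importer.import(&session_info);  // or importGeneralS3(&session_info)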