3598 auto& catalog = session->getCatalog();
3601 LocalQueryConnector local_connector;
3602 bool populate_table =
false;
3605 populate_table =
true;
3609 populate_table =
true;
3613 auto get_target_column_descriptors = [
this, &catalog](
const TableDescriptor* td) {
3614 std::vector<const ColumnDescriptor*> target_column_descriptors;
3616 auto list = catalog.getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
3617 target_column_descriptors = {std::begin(list), std::end(list)};
3621 if (cd ==
nullptr) {
3622 throw std::runtime_error(
"Column " + *c +
" does not exist.");
3624 target_column_descriptors.push_back(cd);
3628 return target_column_descriptors;
3633 if (validate_table) {
3636 throw std::runtime_error(
"Table " +
table_name_ +
" does not exist.");
3639 throw std::runtime_error(
"Insert to views is not supported yet.");
3645 throw std::runtime_error(
"User has no insert privileges on " +
table_name_ +
".");
3651 auto source_column_descriptors = local_connector.getColumnDescriptors(
result,
false);
3653 std::vector<const ColumnDescriptor*> target_column_descriptors =
3654 get_target_column_descriptors(td);
3656 if (source_column_descriptors.size() != target_column_descriptors.size()) {
3657 throw std::runtime_error(
"The number of source and target columns does not match.");
3660 for (
int i = 0; i < source_column_descriptors.size(); i++) {
3662 &(*std::next(source_column_descriptors.begin(), i));
3666 auto type_cannot_be_cast = [](
const auto& col_type) {
3667 return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
3668 col_type.is_boolean());
3671 if (type_cannot_be_cast(source_cd->
columnType) ||
3672 type_cannot_be_cast(target_cd->
columnType)) {
3673 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3675 "' and target '" + target_cd->
columnName +
" " +
3677 "' column types do not match.");
3682 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3684 "' and target '" + target_cd->
columnName +
" " +
3686 "' array column element types do not match.");
3701 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3703 "' and target '" + target_cd->
columnName +
" " +
3705 "' decimal columns scales do not match.");
3711 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3713 "' and target '" + target_cd->
columnName +
" " +
3715 "' column types do not match.");
3719 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3721 "' and target '" + target_cd->
columnName +
" " +
3723 "' columns string encodings do not match.");
3730 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3732 "' and target '" + target_cd->
columnName +
" " +
3734 "' timestamp column precisions do not match.");
3743 throw std::runtime_error(
"Source '" + source_cd->
columnName +
" " +
3745 "' and target '" + target_cd->
columnName +
" " +
3747 "' column encoding sizes do not match.");
3752 if (!populate_table) {
3756 int64_t total_row_count = 0;
3757 int64_t total_source_query_time_ms = 0;
3758 int64_t total_target_value_translate_time_ms = 0;
3759 int64_t total_data_load_time_ms = 0;
3762 auto target_column_descriptors = get_target_column_descriptors(td);
3763 auto outer_frag_count =
3766 size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
3767 auto query_session = session ? session->get_session_id() :
"";
3769 std::string work_type_str = for_CTAS ?
"CTAS" :
"ITAS";
3771 for (
size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
3772 std::vector<size_t> allowed_outer_fragment_indices;
3774 if (outer_frag_count) {
3775 allowed_outer_fragment_indices.push_back(outer_frag_idx);
3779 std::vector<AggregatedResult> query_results =
3782 allowed_outer_fragment_indices,
3784 total_source_query_time_ms +=
timer_stop(query_clock_begin);
3787 auto query_str =
"INSERT_DATA for " + work_type_str;
3792 executor->enrollQuerySession(query_session,
3796 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3803 executor->clearQuerySessionStatus(query_session,
start_time);
3807 for (
auto&
res : query_results) {
3809 throw std::runtime_error(
3810 "Query execution has been interrupted while performing " + work_type_str);
3812 auto& result_rows =
res.rs;
3814 const auto num_rows = result_rows->rowCount();
3816 if (0 == num_rows) {
3820 total_row_count += num_rows;
3825 const size_t rows_per_block =
3826 std::max(std::min(num_rows / leaf_count,
size_t(64 * 1024)),
size_t(1));
3828 std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
3832 const int num_worker_threads = std::thread::hardware_concurrency();
3834 std::vector<size_t> thread_start_idx(num_worker_threads),
3835 thread_end_idx(num_worker_threads);
3836 bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
3838 std::atomic<size_t> crt_row_idx{0};
3840 auto do_work = [&result_rows, &value_converters, &crt_row_idx](
3842 const size_t block_end,
3843 const size_t num_cols,
3845 bool& stop_convert) {
3846 const auto result_row = result_rows->getRowAtNoTranslations(idx);
3847 if (!result_row.empty()) {
3848 size_t target_row = crt_row_idx.fetch_add(1);
3849 if (target_row >= block_end) {
3850 stop_convert =
true;
3853 for (
unsigned int col = 0; col < num_cols; col++) {
3854 const auto& mapd_variant = result_row[col];
3855 value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3860 auto convert_function = [&thread_start_idx,
3866 &do_work](
const int thread_id,
const size_t block_end) {
3867 const int num_cols = value_converters.size();
3868 const size_t start = thread_start_idx[
thread_id];
3869 const size_t end = thread_end_idx[
thread_id];
3871 bool stop_convert =
false;
3873 size_t local_idx = 0;
3874 for (idx = start; idx < end; ++idx, ++local_idx) {
3875 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
3877 throw std::runtime_error(
3878 "Query execution has been interrupted while performing " +
3881 do_work(idx, block_end, num_cols, thread_id, stop_convert);
3887 for (idx = start; idx < end; ++idx) {
3888 do_work(idx, block_end, num_cols, thread_id, stop_convert);
3897 auto single_threaded_value_converter =
3898 [&crt_row_idx, &value_converters, &result_rows](
const size_t idx,
3899 const size_t block_end,
3900 const size_t num_cols,
3901 bool& stop_convert) {
3902 size_t target_row = crt_row_idx.fetch_add(1);
3903 if (target_row >= block_end) {
3904 stop_convert =
true;
3907 const auto result_row = result_rows->getNextRow(
false,
false);
3908 CHECK(!result_row.empty());
3909 for (
unsigned int col = 0; col < num_cols; col++) {
3910 const auto& mapd_variant = result_row[col];
3911 value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3915 auto single_threaded_convert_function = [&value_converters,
3921 &single_threaded_value_converter](
3923 const size_t block_end) {
3924 const int num_cols = value_converters.size();
3925 const size_t start = thread_start_idx[
thread_id];
3926 const size_t end = thread_end_idx[
thread_id];
3928 bool stop_convert =
false;
3930 size_t local_idx = 0;
3931 for (idx = start; idx < end; ++idx, ++local_idx) {
3932 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
3934 throw std::runtime_error(
3935 "Query execution has been interrupted while performing " +
3938 single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
3944 for (idx = start; idx < end; ++idx) {
3945 single_threaded_value_converter(idx, end, num_cols, stop_convert);
3954 if (can_go_parallel) {
3955 const size_t entry_count = result_rows->entryCount();
3959 stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
3960 i < num_worker_threads && start_entry < entry_count;
3961 ++i, start_entry += stride) {
3962 const auto end_entry = std::min(start_entry + stride, entry_count);
3963 thread_start_idx[i] = start_entry;
3964 thread_end_idx[i] = end_entry;
3967 thread_start_idx[0] = 0;
3968 thread_end_idx[0] = result_rows->entryCount();
3973 for (
size_t block_start = 0; block_start < num_rows;
3974 block_start += rows_per_block) {
3975 const auto num_rows_this_itr = block_start + rows_per_block < num_rows
3977 : num_rows - block_start;
3979 value_converters.clear();
3981 for (
const auto targetDescriptor : target_column_descriptors) {
3982 auto sourceDataMetaInfo =
res.targets_meta[colNum++];
3988 targetDescriptor->columnType,
3989 !targetDescriptor->columnType.get_notnull(),
3990 result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
3992 sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
3993 ? executor->getStringDictionaryProxy(
3994 sourceDataMetaInfo.get_type_info().get_comp_param(),
3995 result_rows->getRowSetMemOwner(),
3998 IS_GEO_POLY(targetDescriptor->columnType.get_type()) &&
4000 ? &render_group_analyzer_map
4002 auto converter = factory.
create(param);
4003 value_converters.push_back(std::move(converter));
4007 if (can_go_parallel) {
4008 std::vector<std::future<void>> worker_threads;
4009 for (
int i = 0; i < num_worker_threads; ++i) {
4010 worker_threads.push_back(
4014 for (
auto& child : worker_threads) {
4017 for (
auto& child : worker_threads) {
4022 single_threaded_convert_function(0, num_rows_this_itr);
4026 auto finalizer_func =
4027 [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
4028 targetValueConverter->finalizeDataBlocksForInsertData();
4031 std::vector<std::future<void>> worker_threads;
4032 for (
auto& converterPtr : value_converters) {
4033 worker_threads.push_back(
4037 for (
auto& child : worker_threads) {
4040 for (
auto& child : worker_threads) {
4045 insert_data.
databaseId = catalog.getCurrentDB().dbId;
4048 insert_data.
numRows = num_rows_this_itr;
4050 for (
int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
4053 throw std::runtime_error(
4054 "Query execution has been interrupted while performing " +
4057 value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
4059 total_target_value_translate_time_ms +=
timer_stop(translate_clock_begin);
4062 auto data_memory_holder =
4064 insertDataLoader.insertData(*session, insert_data);
4065 total_data_load_time_ms +=
timer_stop(data_load_clock_begin);
4072 }
catch (std::exception& e) {
4073 LOG(
ERROR) <<
"An error occurred during ITAS rollback attempt. Table id: "
4074 << td->
tableId <<
", Error: " << e.what();
4079 int64_t total_time_ms = total_source_query_time_ms +
4080 total_target_value_translate_time_ms + total_data_load_time_ms;
4082 VLOG(1) <<
"CTAS/ITAS " << total_row_count <<
" rows loaded in " << total_time_ms
4083 <<
"ms (outer_frag_count=" << outer_frag_count
4084 <<
", query_time=" << total_source_query_time_ms
4085 <<
"ms, translation_time=" << total_target_value_translate_time_ms
4086 <<
"ms, data_load_time=" << total_data_load_time_ms
4089 if (!is_temporary) {
void validate_non_foreign_table_write(const TableDescriptor *table_descriptor)
HOST DEVICE SQLTypes get_subtype() const
HOST DEVICE int get_size() const
bool is_timestamp() const
HOST DEVICE int get_scale() const
static const AccessPrivileges INSERT_INTO_TABLE
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
TypeR::rep timer_stop(Type clock_begin)
virtual size_t leafCount()=0
bool g_enable_non_kernel_time_query_interrupt
HOST DEVICE SQLTypes get_type() const
bool g_enable_string_functions
int tableId
identifies the database into which the data is being inserted
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
size_t numRows
a vector of column ids for the row(s) being inserted
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
QueryState & getQueryState()
specifies the content in-memory of a row in the column metadata table
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
QueryConnector * leafs_connector_
HOST DEVICE EncodingType get_compression() const
bool table_is_temporary(const TableDescriptor *const td)
HOST DEVICE int get_dimension() const
std::string select_query_
std::string get_type_name() const
virtual size_t getOuterFragmentCount(QueryStateProxy, std::string &sql_query_string)=0
std::vector< std::unique_ptr< std::string > > column_list_
virtual std::vector< AggregatedResult > query(QueryStateProxy, std::string &sql_query_string, std::vector< size_t > outer_frag_indices, bool allow_interrupt)=0
The data to be inserted using the fragment manager.
std::map< int, import_export::RenderGroupAnalyzer > RenderGroupAnalyzerMap
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
SQLTypeInfo get_elem_type() const
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
const std::string getQuerySubmittedTime() const
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
static const ExecutorId UNITARY_EXECUTOR_ID