OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Public Member Functions

 InsertIntoTableAsSelectStmt (const rapidjson::Value &payload)
 
 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

QueryConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1126 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const rapidjson::Value &  payload)

Definition at line 3573 of file ParserNode.cpp.

References CHECK, column_list_, json_str(), select_query_, and table_name_.

3574  {
3575  CHECK(payload.HasMember("name"));
3576  table_name_ = json_str(payload["name"]);
3577 
3578  CHECK(payload.HasMember("query"));
3579  select_query_ = json_str(payload["query"]);
3580 
3581  boost::replace_all(select_query_, "\n", " ");
3582  select_query_ = "(" + select_query_ + ")";
3583 
3584  if (payload.HasMember("columns")) {
3585  CHECK(payload["columns"].IsArray());
3586  for (auto& column : payload["columns"].GetArray()) {
3587  std::string s = json_str(column);
3588  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
3589  }
3590  }
3591 }
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1159
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1130 of file ParserNode.h.

References column_list_.

1133  : table_name_(*table_name), select_query_(*select_query) {
1134  if (c) {
1135  for (auto e : *c) {
1136  column_list_.emplace_back(e);
1137  }
1138  delete c;
1139  }
1140 
1141  delete table_name;
1142  delete select_query;
1143  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1159

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode 
)
override virtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 4152 of file ParserNode.cpp.

References Parser::anonymous_namespace{ParserNode.cpp}::acquire_query_table_locks(), Executor::clearExternalCaches(), query_state::QueryState::create(), Catalog_Namespace::DBMetadata::dbId, legacylockmgr::ExecutorOuterLock, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForTable(), legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), populateData(), select_query_, STDLOG, and table_name_.

Referenced by heavydb.cursor.Cursor::executemany().

4153  {
4154  if (read_only_mode) {
4155  throw std::runtime_error("INSERT INTO TABLE invalid in read only mode.");
4156  }
4157  auto session_copy = session;
4158  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4159  &session_copy, boost::null_deleter());
4160  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4161  auto stdlog = STDLOG(query_state);
4162  auto& catalog = session_ptr->getCatalog();
4163 
4164  const auto execute_read_lock =
4168 
4169  if (catalog.getMetadataForTable(table_name_) == nullptr) {
4170  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
4171  }
4172 
4173  auto locks = acquire_query_table_locks(
4174  catalog, select_query_, query_state->createQueryStateProxy(), table_name_);
4175  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4176 
4177  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
4178 
4179  try {
4180  populateData(query_state->createQueryStateProxy(), td, true, false);
4181  } catch (...) {
4182  throw;
4183  }
4184 }
static std::shared_ptr< WrapperType< MutexType > > getMutex(const LockType lockType, const KeyType &key)
void populateData(QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:145
std::shared_lock< T > shared_lock
lockmgr::LockedTableDescriptors acquire_query_table_locks(Catalog_Namespace::Catalog &catalog, const std::string &query_str, const QueryStateProxy &query_state_proxy, const std::optional< std::string > &insert_table_name={})
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:391
#define STDLOG(...)
Definition: QueryState.h:235

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1154 of file ParserNode.h.

References select_query_.

1154 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1152 of file ParserNode.h.

References table_name_.

1152 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
const TableDescriptor *  td,
bool  validate_table,
bool  for_CTAS = false 
)

Definition at line 3593 of file ParserNode.cpp.

References threading_serial::async(), CHECK, Parser::check_session_interrupted(), Fragmenter_Namespace::InsertDataLoader::InsertConnector::checkpoint(), column_list_, ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, logger::ERROR, import_export::fill_missing_columns(), g_cluster, g_enable_assign_render_groups, g_enable_non_kernel_time_query_interrupt, g_enable_string_functions, ResultSet::GeoTargetValue, SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), Parser::LocalQueryConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), Parser::QueryConnector::getOuterFragmentCount(), query_state::QueryStateProxy::getQueryState(), query_state::QueryState::getQuerySubmittedTime(), AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfo::is_array(), SQLTypeInfo::is_date(), SQLTypeInfo::is_decimal(), IS_GEO_POLY, SQLTypeInfo::is_geometry(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), SQLTypeInfo::is_timestamp(), TableDescriptor::isView, Fragmenter_Namespace::InsertDataLoader::InsertConnector::leafCount(), leafs_connector_, LOG, Fragmenter_Namespace::InsertData::numRows, Parser::QueryConnector::query(), Parser::LocalQueryConnector::query(), run_benchmark_import::res, run_benchmark_import::result, Fragmenter_Namespace::InsertDataLoader::InsertConnector::rollback(), select_query_, run_benchmark_import::start_time, table_is_temporary(), table_name_, TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, logger::thread_id(), timer_start(), timer_stop(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, foreign_storage::validate_non_foreign_table_write(), and VLOG.

Referenced by execute(), and Parser::CreateTableAsSelectStmt::execute().

3596  {
3597  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
3598  auto& catalog = session->getCatalog();
3600 
3601  LocalQueryConnector local_connector;
3602  bool populate_table = false;
3603 
3604  if (leafs_connector_) {
3605  populate_table = true;
3606  } else {
3607  leafs_connector_ = &local_connector;
3608  if (!g_cluster) {
3609  populate_table = true;
3610  }
3611  }
3612 
3613  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
3614  std::vector<const ColumnDescriptor*> target_column_descriptors;
3615  if (column_list_.empty()) {
3616  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
3617  target_column_descriptors = {std::begin(list), std::end(list)};
3618  } else {
3619  for (auto& c : column_list_) {
3620  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
3621  if (cd == nullptr) {
3622  throw std::runtime_error("Column " + *c + " does not exist.");
3623  }
3624  target_column_descriptors.push_back(cd);
3625  }
3626  }
3627 
3628  return target_column_descriptors;
3629  };
3630 
3631  bool is_temporary = table_is_temporary(td);
3632 
3633  if (validate_table) {
3634  // check access privileges
3635  if (!td) {
3636  throw std::runtime_error("Table " + table_name_ + " does not exist.");
3637  }
3638  if (td->isView) {
3639  throw std::runtime_error("Insert to views is not supported yet.");
3640  }
3641 
3642  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
3644  table_name_)) {
3645  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
3646  }
3647 
3648  // only validate the select query so we get the target types
3649  // correctly, but do not populate the result set
3650  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
3651  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
3652 
3653  std::vector<const ColumnDescriptor*> target_column_descriptors =
3654  get_target_column_descriptors(td);
3655 
3656  if (source_column_descriptors.size() != target_column_descriptors.size()) {
3657  throw std::runtime_error("The number of source and target columns does not match.");
3658  }
3659 
3660  for (int i = 0; i < source_column_descriptors.size(); i++) {
3661  const ColumnDescriptor* source_cd =
3662  &(*std::next(source_column_descriptors.begin(), i));
3663  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
3664 
3665  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
3666  auto type_cannot_be_cast = [](const auto& col_type) {
3667  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
3668  col_type.is_boolean());
3669  };
3670 
3671  if (type_cannot_be_cast(source_cd->columnType) ||
3672  type_cannot_be_cast(target_cd->columnType)) {
3673  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3674  source_cd->columnType.get_type_name() +
3675  "' and target '" + target_cd->columnName + " " +
3676  target_cd->columnType.get_type_name() +
3677  "' column types do not match.");
3678  }
3679  }
3680  if (source_cd->columnType.is_array()) {
3681  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
3682  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3683  source_cd->columnType.get_type_name() +
3684  "' and target '" + target_cd->columnName + " " +
3685  target_cd->columnType.get_type_name() +
3686  "' array column element types do not match.");
3687  }
3688  }
3689 
3690  if (source_cd->columnType.is_decimal() ||
3691  source_cd->columnType.get_elem_type().is_decimal()) {
3692  SQLTypeInfo sourceType = source_cd->columnType;
3693  SQLTypeInfo targetType = target_cd->columnType;
3694 
3695  if (source_cd->columnType.is_array()) {
3696  sourceType = source_cd->columnType.get_elem_type();
3697  targetType = target_cd->columnType.get_elem_type();
3698  }
3699 
3700  if (sourceType.get_scale() != targetType.get_scale()) {
3701  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3702  source_cd->columnType.get_type_name() +
3703  "' and target '" + target_cd->columnName + " " +
3704  target_cd->columnType.get_type_name() +
3705  "' decimal columns scales do not match.");
3706  }
3707  }
3708 
3709  if (source_cd->columnType.is_string()) {
3710  if (!target_cd->columnType.is_string()) {
3711  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3712  source_cd->columnType.get_type_name() +
3713  "' and target '" + target_cd->columnName + " " +
3714  target_cd->columnType.get_type_name() +
3715  "' column types do not match.");
3716  }
3717  if (source_cd->columnType.get_compression() !=
3718  target_cd->columnType.get_compression()) {
3719  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3720  source_cd->columnType.get_type_name() +
3721  "' and target '" + target_cd->columnName + " " +
3722  target_cd->columnType.get_type_name() +
3723  "' columns string encodings do not match.");
3724  }
3725  }
3726 
3727  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
3728  if (source_cd->columnType.get_dimension() !=
3729  target_cd->columnType.get_dimension()) {
3730  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3731  source_cd->columnType.get_type_name() +
3732  "' and target '" + target_cd->columnName + " " +
3733  target_cd->columnType.get_type_name() +
3734  "' timestamp column precisions do not match.");
3735  }
3736  }
3737 
3738  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
3739  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
3740  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
3741  !source_cd->columnType.is_timestamp() &&
3742  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
3743  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3744  source_cd->columnType.get_type_name() +
3745  "' and target '" + target_cd->columnName + " " +
3746  target_cd->columnType.get_type_name() +
3747  "' column encoding sizes do not match.");
3748  }
3749  }
3750  }
3751 
3752  if (!populate_table) {
3753  return;
3754  }
3755 
3756  int64_t total_row_count = 0;
3757  int64_t total_source_query_time_ms = 0;
3758  int64_t total_target_value_translate_time_ms = 0;
3759  int64_t total_data_load_time_ms = 0;
3760 
3762  auto target_column_descriptors = get_target_column_descriptors(td);
3763  auto outer_frag_count =
3765 
3766  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
3767  auto query_session = session ? session->get_session_id() : "";
3769  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
3770  try {
3771  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
3772  std::vector<size_t> allowed_outer_fragment_indices;
3773 
3774  if (outer_frag_count) {
3775  allowed_outer_fragment_indices.push_back(outer_frag_idx);
3776  }
3777 
3778  const auto query_clock_begin = timer_start();
3779  std::vector<AggregatedResult> query_results =
3780  leafs_connector_->query(query_state_proxy,
3781  select_query_,
3782  allowed_outer_fragment_indices,
3784  total_source_query_time_ms += timer_stop(query_clock_begin);
3785 
3786  auto start_time = query_state_proxy.getQueryState().getQuerySubmittedTime();
3787  auto query_str = "INSERT_DATA for " + work_type_str;
3789  // In the clean-up phase of the query execution for collecting aggregated result
3790  // of SELECT query, we remove its query session info, so we need to enroll the
3791  // session info again
3792  executor->enrollQuerySession(query_session,
3793  query_str,
3794  start_time,
3796  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3797  }
3798 
3799  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
3800  // this data population is non-kernel operation, so we manually cleanup
3801  // the query session info in the cleanup phase
3803  executor->clearQuerySessionStatus(query_session, start_time);
3804  }
3805  };
3806 
3807  for (auto& res : query_results) {
3808  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3809  throw std::runtime_error(
3810  "Query execution has been interrupted while performing " + work_type_str);
3811  }
3812  auto& result_rows = res.rs;
3813  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
3814  const auto num_rows = result_rows->rowCount();
3815 
3816  if (0 == num_rows) {
3817  continue;
3818  }
3819 
3820  total_row_count += num_rows;
3821 
3822  size_t leaf_count = leafs_connector_->leafCount();
3823 
3824  // ensure that at least 1 row is processed per block up to a maximum of 65536 rows
3825  const size_t rows_per_block =
3826  std::max(std::min(num_rows / leaf_count, size_t(64 * 1024)), size_t(1));
3827 
3828  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
3829 
3831 
3832  const int num_worker_threads = std::thread::hardware_concurrency();
3833 
3834  std::vector<size_t> thread_start_idx(num_worker_threads),
3835  thread_end_idx(num_worker_threads);
3836  bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
3837 
3838  std::atomic<size_t> crt_row_idx{0};
3839 
3840  auto do_work = [&result_rows, &value_converters, &crt_row_idx](
3841  const size_t idx,
3842  const size_t block_end,
3843  const size_t num_cols,
3844  const size_t thread_id,
3845  bool& stop_convert) {
3846  const auto result_row = result_rows->getRowAtNoTranslations(idx);
3847  if (!result_row.empty()) {
3848  size_t target_row = crt_row_idx.fetch_add(1);
3849  if (target_row >= block_end) {
3850  stop_convert = true;
3851  return;
3852  }
3853  for (unsigned int col = 0; col < num_cols; col++) {
3854  const auto& mapd_variant = result_row[col];
3855  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3856  }
3857  }
3858  };
3859 
3860  auto convert_function = [&thread_start_idx,
3861  &thread_end_idx,
3862  &value_converters,
3863  &executor,
3864  &query_session,
3865  &work_type_str,
3866  &do_work](const int thread_id, const size_t block_end) {
3867  const int num_cols = value_converters.size();
3868  const size_t start = thread_start_idx[thread_id];
3869  const size_t end = thread_end_idx[thread_id];
3870  size_t idx = 0;
3871  bool stop_convert = false;
3873  size_t local_idx = 0;
3874  for (idx = start; idx < end; ++idx, ++local_idx) {
3875  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3876  check_session_interrupted(query_session, executor))) {
3877  throw std::runtime_error(
3878  "Query execution has been interrupted while performing " +
3879  work_type_str);
3880  }
3881  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3882  if (stop_convert) {
3883  break;
3884  }
3885  }
3886  } else {
3887  for (idx = start; idx < end; ++idx) {
3888  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3889  if (stop_convert) {
3890  break;
3891  }
3892  }
3893  }
3894  thread_start_idx[thread_id] = idx;
3895  };
3896 
3897  auto single_threaded_value_converter =
3898  [&crt_row_idx, &value_converters, &result_rows](const size_t idx,
3899  const size_t block_end,
3900  const size_t num_cols,
3901  bool& stop_convert) {
3902  size_t target_row = crt_row_idx.fetch_add(1);
3903  if (target_row >= block_end) {
3904  stop_convert = true;
3905  return;
3906  }
3907  const auto result_row = result_rows->getNextRow(false, false);
3908  CHECK(!result_row.empty());
3909  for (unsigned int col = 0; col < num_cols; col++) {
3910  const auto& mapd_variant = result_row[col];
3911  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3912  }
3913  };
3914 
3915  auto single_threaded_convert_function = [&value_converters,
3916  &thread_start_idx,
3917  &thread_end_idx,
3918  &executor,
3919  &query_session,
3920  &work_type_str,
3921  &single_threaded_value_converter](
3922  const int thread_id,
3923  const size_t block_end) {
3924  const int num_cols = value_converters.size();
3925  const size_t start = thread_start_idx[thread_id];
3926  const size_t end = thread_end_idx[thread_id];
3927  size_t idx = 0;
3928  bool stop_convert = false;
3930  size_t local_idx = 0;
3931  for (idx = start; idx < end; ++idx, ++local_idx) {
3932  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3933  check_session_interrupted(query_session, executor))) {
3934  throw std::runtime_error(
3935  "Query execution has been interrupted while performing " +
3936  work_type_str);
3937  }
3938  single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
3939  if (stop_convert) {
3940  break;
3941  }
3942  }
3943  } else {
3944  for (idx = start; idx < end; ++idx) {
3945  single_threaded_value_converter(idx, end, num_cols, stop_convert);
3946  if (stop_convert) {
3947  break;
3948  }
3949  }
3950  }
3951  thread_start_idx[thread_id] = idx;
3952  };
3953 
3954  if (can_go_parallel) {
3955  const size_t entry_count = result_rows->entryCount();
3956  for (size_t
3957  i = 0,
3958  start_entry = 0,
3959  stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
3960  i < num_worker_threads && start_entry < entry_count;
3961  ++i, start_entry += stride) {
3962  const auto end_entry = std::min(start_entry + stride, entry_count);
3963  thread_start_idx[i] = start_entry;
3964  thread_end_idx[i] = end_entry;
3965  }
3966  } else {
3967  thread_start_idx[0] = 0;
3968  thread_end_idx[0] = result_rows->entryCount();
3969  }
3970 
3971  RenderGroupAnalyzerMap render_group_analyzer_map;
3972 
3973  for (size_t block_start = 0; block_start < num_rows;
3974  block_start += rows_per_block) {
3975  const auto num_rows_this_itr = block_start + rows_per_block < num_rows
3976  ? rows_per_block
3977  : num_rows - block_start;
3978  crt_row_idx = 0; // reset block tracker
3979  value_converters.clear();
3980  int colNum = 0;
3981  for (const auto targetDescriptor : target_column_descriptors) {
3982  auto sourceDataMetaInfo = res.targets_meta[colNum++];
3984  num_rows_this_itr,
3985  catalog,
3986  sourceDataMetaInfo,
3987  targetDescriptor,
3988  targetDescriptor->columnType,
3989  !targetDescriptor->columnType.get_notnull(),
3990  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
3992  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
3993  ? executor->getStringDictionaryProxy(
3994  sourceDataMetaInfo.get_type_info().get_comp_param(),
3995  result_rows->getRowSetMemOwner(),
3996  true)
3997  : nullptr,
3998  IS_GEO_POLY(targetDescriptor->columnType.get_type()) &&
4000  ? &render_group_analyzer_map
4001  : nullptr};
4002  auto converter = factory.create(param);
4003  value_converters.push_back(std::move(converter));
4004  }
4005 
4006  const auto translate_clock_begin = timer_start();
4007  if (can_go_parallel) {
4008  std::vector<std::future<void>> worker_threads;
4009  for (int i = 0; i < num_worker_threads; ++i) {
4010  worker_threads.push_back(
4011  std::async(std::launch::async, convert_function, i, num_rows_this_itr));
4012  }
4013 
4014  for (auto& child : worker_threads) {
4015  child.wait();
4016  }
4017  for (auto& child : worker_threads) {
4018  child.get();
4019  }
4020 
4021  } else {
4022  single_threaded_convert_function(0, num_rows_this_itr);
4023  }
4024 
4025  // finalize the insert data
4026  auto finalizer_func =
4027  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
4028  targetValueConverter->finalizeDataBlocksForInsertData();
4029  };
4030 
4031  std::vector<std::future<void>> worker_threads;
4032  for (auto& converterPtr : value_converters) {
4033  worker_threads.push_back(
4034  std::async(std::launch::async, finalizer_func, converterPtr.get()));
4035  }
4036 
4037  for (auto& child : worker_threads) {
4038  child.wait();
4039  }
4040  for (auto& child : worker_threads) {
4041  child.get();
4042  }
4043 
4045  insert_data.databaseId = catalog.getCurrentDB().dbId;
4046  CHECK(td);
4047  insert_data.tableId = td->tableId;
4048  insert_data.numRows = num_rows_this_itr;
4049 
4050  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
4052  check_session_interrupted(query_session, executor))) {
4053  throw std::runtime_error(
4054  "Query execution has been interrupted while performing " +
4055  work_type_str);
4056  }
4057  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
4058  }
4059  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
4060 
4061  const auto data_load_clock_begin = timer_start();
4062  auto data_memory_holder =
4063  import_export::fill_missing_columns(&catalog, insert_data);
4064  insertDataLoader.insertData(*session, insert_data);
4065  total_data_load_time_ms += timer_stop(data_load_clock_begin);
4066  }
4067  }
4068  }
4069  } catch (...) {
4070  try {
4071  leafs_connector_->rollback(*session, td->tableId);
4072  } catch (std::exception& e) {
4073  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
4074  << td->tableId << ", Error: " << e.what();
4075  }
4076  throw;
4077  }
4078 
4079  int64_t total_time_ms = total_source_query_time_ms +
4080  total_target_value_translate_time_ms + total_data_load_time_ms;
4081 
4082  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
4083  << "ms (outer_frag_count=" << outer_frag_count
4084  << ", query_time=" << total_source_query_time_ms
4085  << "ms, translation_time=" << total_target_value_translate_time_ms
4086  << "ms, data_load_time=" << total_data_load_time_ms
4087  << "ms)\nquery: " << select_query_;
4088 
4089  if (!is_temporary) {
4090  leafs_connector_->checkpoint(*session, td->tableId);
4091  }
4092 }
void validate_non_foreign_table_write(const TableDescriptor *table_descriptor)
Definition: FsiUtils.h:22
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:330
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
bool is_timestamp() const
Definition: sqltypes.h:895
#define LOG(tag)
Definition: Logger.h:216
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6207
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
bool is_time() const
Definition: sqltypes.h:516
bool g_enable_string_functions
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:468
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
QueryState & getQueryState()
Definition: QueryState.h:181
bool is_integer() const
Definition: sqltypes.h:512
specifies the content in-memory of a row in the column metadata table
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: ParserNode.cpp:102
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
#define UNLIKELY(x)
Definition: likely.h:25
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
bool table_is_temporary(const TableDescriptor *const td)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
std::string get_type_name() const
Definition: sqltypes.h:443
virtual size_t getOuterFragmentCount(QueryStateProxy, std::string &sql_query_string)=0
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1159
ThreadId thread_id()
Definition: Logger.cpp:820
virtual std::vector< AggregatedResult > query(QueryStateProxy, std::string &sql_query_string, std::vector< size_t > outer_frag_indices, bool allow_interrupt)=0
#define CHECK(condition)
Definition: Logger.h:222
bool is_geometry() const
Definition: sqltypes.h:522
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
std::map< int, import_export::RenderGroupAnalyzer > RenderGroupAnalyzerMap
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:510
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:865
bool is_decimal() const
Definition: sqltypes.h:513
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:88
bool is_date() const
Definition: sqltypes.h:883
bool is_array() const
Definition: sqltypes.h:518
const std::string getQuerySubmittedTime() const
Definition: QueryState.cpp:105
#define VLOG(n)
Definition: Logger.h:316
Type timer_start()
Definition: measure.h:42
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1159 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt(), and populateData().

QueryConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1156 of file ParserNode.h.

Referenced by Parser::CreateTableAsSelectStmt::execute(), and populateData().

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected
std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

The documentation for this class was generated from the following files: