OmniSciDB  ba1bac9284
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Public Member Functions

 InsertIntoTableAsSelectStmt (const rapidjson::Value &payload)
 
 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
 
void execute (const Catalog_Namespace::SessionInfo &session) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

DistributedConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1082 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const rapidjson::Value &  payload)

Definition at line 2791 of file ParserNode.cpp.

References CHECK, and json_str().

2792  {
2793  CHECK(payload.HasMember("name"));
2794  table_name_ = json_str(payload["name"]);
2795 
2796  CHECK(payload.HasMember("query"));
2797  select_query_ = json_str(payload["query"]);
2798 
2799  boost::replace_all(select_query_, "\n", " ");
2800  select_query_ = "(" + select_query_ + ")";
2801 
2802  if (payload.HasMember("columns")) {
2803  CHECK(payload["columns"].IsArray());
2804  for (auto& column : payload["columns"].GetArray()) {
2805  std::string s = json_str(column);
2806  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
2807  }
2808  }
2809 }
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1114
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1086 of file ParserNode.h.

References column_list_.

1089  : table_name_(*table_name), select_query_(*select_query) {
1090  if (c) {
1091  for (auto e : *c) {
1092  column_list_.emplace_back(e);
1093  }
1094  delete c;
1095  }
1096 
1097  delete table_name;
1098  delete select_query;
1099  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1114

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo &  session)
override virtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 3310 of file ParserNode.cpp.

References query_state::QueryState::create(), legacylockmgr::ExecutorOuterLock, legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), pg_shim(), run_benchmark_import::result, gpu_enabled::sort(), STDLOG, test_readcsv::table, and run_benchmark_import::tables.

Referenced by omnisci.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runSQL().

3310  {
3311  auto session_copy = session;
3312  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
3313  &session_copy, boost::null_deleter());
3314  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
3315  auto stdlog = STDLOG(query_state);
3316  auto& catalog = session_ptr->getCatalog();
3317 
3318  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3321 
3322  if (catalog.getMetadataForTable(table_name_) == nullptr) {
3323  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
3324  }
3325 
3327  std::vector<std::string> tables;
3328 
3329  // get the table info
3330  auto calcite_mgr = catalog.getCalciteMgr();
3331 
3332  // TODO MAT this should actually get the global or the session parameter for
3333  // view optimization
3334  const auto result = calcite_mgr->process(query_state->createQueryStateProxy(),
3336  {},
3337  true,
3338  false,
3339  false,
3340  true);
3341 
3342  for (auto& tab : result.resolved_accessed_objects.tables_selected_from) {
3343  tables.emplace_back(tab[0]);
3344  }
3345  tables.emplace_back(table_name_);
3346 
3347  // force sort into tableid order in case of name change to guarantee fixed order of
3348  // mutex access
3349  std::sort(tables.begin(),
3350  tables.end(),
3351  [&catalog](const std::string& a, const std::string& b) {
3352  return catalog.getMetadataForTable(a, false)->tableId <
3353  catalog.getMetadataForTable(b, false)->tableId;
3354  });
3355 
3356  tables.erase(unique(tables.begin(), tables.end()), tables.end());
3357  for (const auto& table : tables) {
3358  locks.emplace_back(
3361  catalog, table)));
3362  if (table == table_name_) {
3363  // Acquire an insert data lock for updates/deletes, consistent w/ insert. The
3364  // table data lock will be acquired in the fragmenter during checkpoint.
3365  locks.emplace_back(
3368  catalog.getDatabaseId(), (*locks.back())())));
3369  } else {
3370  locks.emplace_back(
3373  catalog.getDatabaseId(), (*locks.back())())));
3374  }
3375  }
3376 
3377  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
3378  try {
3379  populateData(query_state->createQueryStateProxy(), td, true, false);
3380  } catch (...) {
3381  throw;
3382  }
3383 }
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
Definition: LockMgr.h:270
void populateData(QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:141
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
static std::shared_ptr< MutexType > getMutex(const LockType lockType, const KeyType &key)
Definition: LegacyLockMgr.h:51
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:229

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1109 of file ParserNode.h.

References select_query_.

1109 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1107 of file ParserNode.h.

References table_name_.

1107 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
const TableDescriptor *  td,
bool  validate_table,
bool  for_CTAS = false 
)

Definition at line 2811 of file ParserNode.cpp.

References CHECK, anonymous_namespace{Importer.cpp}::checkInterrupt(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, logger::ERROR, import_export::fill_missing_columns(), g_cluster, g_enable_experimental_string_functions, g_enable_non_kernel_time_query_interrupt, ResultSet::GeoTargetValue, SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), Parser::LocalConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), query_state::QueryStateProxy::getQueryState(), query_state::QueryState::getQuerySubmittedTime(), i, AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfo::is_array(), SQLTypeInfo::is_date(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_geometry(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), SQLTypeInfo::is_timestamp(), TableDescriptor::isView, LOG, Fragmenter_Namespace::InsertData::numRows, Parser::LocalConnector::query(), run_benchmark_import::res, run_benchmark_import::result, run_benchmark_import::start_time, table_is_temporary(), TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, logger::thread_id(), timer_start(), timer_stop(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, foreign_storage::validate_non_foreign_table_write(), and VLOG.

2814  {
2815  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
2816  auto& catalog = session->getCatalog();
2818 
2819  LocalConnector local_connector;
2820  bool populate_table = false;
2821 
2822  if (leafs_connector_) {
2823  populate_table = true;
2824  } else {
2825  leafs_connector_ = &local_connector;
2826  if (!g_cluster) {
2827  populate_table = true;
2828  }
2829  }
2830 
2831  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
2832  std::vector<const ColumnDescriptor*> target_column_descriptors;
2833  if (column_list_.empty()) {
2834  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2835  target_column_descriptors = {std::begin(list), std::end(list)};
2836  } else {
2837  for (auto& c : column_list_) {
2838  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2839  if (cd == nullptr) {
2840  throw std::runtime_error("Column " + *c + " does not exist.");
2841  }
2842  target_column_descriptors.push_back(cd);
2843  }
2844  }
2845 
2846  return target_column_descriptors;
2847  };
2848 
2849  bool is_temporary = table_is_temporary(td);
2850 
2851  if (validate_table) {
2852  // check access privileges
2853  if (!td) {
2854  throw std::runtime_error("Table " + table_name_ + " does not exist.");
2855  }
2856  if (td->isView) {
2857  throw std::runtime_error("Insert to views is not supported yet.");
2858  }
2859 
2860  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
2862  table_name_)) {
2863  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
2864  }
2865 
2866  // only validate the select query so we get the target types
2867  // correctly, but do not populate the result set
2868  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
2869  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
2870 
2871  std::vector<const ColumnDescriptor*> target_column_descriptors =
2872  get_target_column_descriptors(td);
2873 
2874  if (source_column_descriptors.size() != target_column_descriptors.size()) {
2875  throw std::runtime_error("The number of source and target columns does not match.");
2876  }
2877 
2878  for (int i = 0; i < source_column_descriptors.size(); i++) {
2879  const ColumnDescriptor* source_cd =
2880  &(*std::next(source_column_descriptors.begin(), i));
2881  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
2882 
2883  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
2884  auto type_cannot_be_cast = [](const auto& col_type) {
2885  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
2886  col_type.is_boolean());
2887  };
2888 
2889  if (type_cannot_be_cast(source_cd->columnType) ||
2890  type_cannot_be_cast(target_cd->columnType)) {
2891  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2892  source_cd->columnType.get_type_name() +
2893  "' and target '" + target_cd->columnName + " " +
2894  target_cd->columnType.get_type_name() +
2895  "' column types do not match.");
2896  }
2897  }
2898  if (source_cd->columnType.is_array()) {
2899  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
2900  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2901  source_cd->columnType.get_type_name() +
2902  "' and target '" + target_cd->columnName + " " +
2903  target_cd->columnType.get_type_name() +
2904  "' array column element types do not match.");
2905  }
2906  }
2907 
2908  if (source_cd->columnType.is_decimal() ||
2909  source_cd->columnType.get_elem_type().is_decimal()) {
2910  SQLTypeInfo sourceType = source_cd->columnType;
2911  SQLTypeInfo targetType = target_cd->columnType;
2912 
2913  if (source_cd->columnType.is_array()) {
2914  sourceType = source_cd->columnType.get_elem_type();
2915  targetType = target_cd->columnType.get_elem_type();
2916  }
2917 
2918  if (sourceType.get_scale() != targetType.get_scale()) {
2919  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2920  source_cd->columnType.get_type_name() +
2921  "' and target '" + target_cd->columnName + " " +
2922  target_cd->columnType.get_type_name() +
2923  "' decimal columns scales do not match.");
2924  }
2925  }
2926 
2927  if (source_cd->columnType.is_string()) {
2928  if (!target_cd->columnType.is_string()) {
2929  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2930  source_cd->columnType.get_type_name() +
2931  "' and target '" + target_cd->columnName + " " +
2932  target_cd->columnType.get_type_name() +
2933  "' column types do not match.");
2934  }
2935  if (source_cd->columnType.get_compression() !=
2936  target_cd->columnType.get_compression()) {
2937  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2938  source_cd->columnType.get_type_name() +
2939  "' and target '" + target_cd->columnName + " " +
2940  target_cd->columnType.get_type_name() +
2941  "' columns string encodings do not match.");
2942  }
2943  }
2944 
2945  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
2946  if (source_cd->columnType.get_dimension() !=
2947  target_cd->columnType.get_dimension()) {
2948  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2949  source_cd->columnType.get_type_name() +
2950  "' and target '" + target_cd->columnName + " " +
2951  target_cd->columnType.get_type_name() +
2952  "' timestamp column precisions do not match.");
2953  }
2954  }
2955 
2956  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
2957  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
2958  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
2959  !source_cd->columnType.is_timestamp() &&
2960  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
2961  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2962  source_cd->columnType.get_type_name() +
2963  "' and target '" + target_cd->columnName + " " +
2964  target_cd->columnType.get_type_name() +
2965  "' column encoding sizes do not match.");
2966  }
2967  }
2968  }
2969 
2970  if (!populate_table) {
2971  return;
2972  }
2973 
2974  int64_t total_row_count = 0;
2975  int64_t total_source_query_time_ms = 0;
2976  int64_t total_target_value_translate_time_ms = 0;
2977  int64_t total_data_load_time_ms = 0;
2978 
2980  auto target_column_descriptors = get_target_column_descriptors(td);
2981  auto outer_frag_count =
2983 
2984  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
2985  auto query_session = session ? session->get_session_id() : "";
2987  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
2988  try {
2989  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
2990  std::vector<size_t> allowed_outer_fragment_indices;
2991 
2992  if (outer_frag_count) {
2993  allowed_outer_fragment_indices.push_back(outer_frag_idx);
2994  }
2995 
2996  const auto query_clock_begin = timer_start();
2997  std::vector<AggregatedResult> query_results =
2998  leafs_connector_->query(query_state_proxy,
2999  select_query_,
3000  allowed_outer_fragment_indices,
3002  total_source_query_time_ms += timer_stop(query_clock_begin);
3003 
3004  auto start_time = query_state_proxy.getQueryState().getQuerySubmittedTime();
3005  auto query_str = "INSERT_DATA for " + work_type_str;
3007  // In the clean-up phase of the query execution for collecting aggregated result
3008  // of SELECT query, we remove its query session info, so we need to enroll the
3009  // session info again
3010  executor->enrollQuerySession(query_session,
3011  query_str,
3012  start_time,
3014  QuerySessionStatus::QueryStatus::RUNNING);
3015  }
3016 
3017  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
3018  // this data population is non-kernel operation, so we manually cleanup
3019  // the query session info in the cleanup phase
3021  executor->clearQuerySessionStatus(query_session, start_time, false);
3022  }
3023  };
3024 
3025  for (auto& res : query_results) {
3026  if (UNLIKELY(checkInterrupt(query_session, executor))) {
3027  throw std::runtime_error(
3028  "Query execution has been interrupted while performing " + work_type_str);
3029  }
3030  auto result_rows = res.rs;
3031  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
3032  const auto num_rows = result_rows->rowCount();
3033 
3034  if (0 == num_rows) {
3035  continue;
3036  }
3037 
3038  total_row_count += num_rows;
3039 
3040  size_t leaf_count = leafs_connector_->leafCount();
3041 
3042  size_t max_number_of_rows_per_package =
3043  std::min(num_rows / leaf_count, size_t(64 * 1024));
3044 
3045  size_t start_row = 0;
3046  size_t num_rows_to_process = std::min(num_rows, max_number_of_rows_per_package);
3047 
3048  // ensure that at least one row is being processed
3049  num_rows_to_process = std::max(num_rows_to_process, size_t(1));
3050 
3051  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
3052 
3054 
3055  const int num_worker_threads = std::thread::hardware_concurrency();
3056 
3057  std::vector<size_t> thread_start_idx(num_worker_threads),
3058  thread_end_idx(num_worker_threads);
3059  bool can_go_parallel = !result_rows->isTruncated() && num_rows_to_process > 20000;
3060 
3061  std::atomic<size_t> row_idx{0};
3062 
3063  auto do_work = [&result_rows, &num_rows_to_process, &value_converters, &row_idx](
3064  const size_t idx,
3065  const size_t end,
3066  const size_t num_cols,
3067  const size_t thread_id,
3068  bool& stop_convert) {
3069  const auto result_row = result_rows->getRowAtNoTranslations(idx);
3070  if (!result_row.empty()) {
3071  size_t target_row = row_idx.fetch_add(1);
3072  if (target_row >= num_rows_to_process) {
3073  stop_convert = true;
3074  return;
3075  }
3076  for (unsigned int col = 0; col < num_cols; col++) {
3077  const auto& mapd_variant = result_row[col];
3078  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3079  }
3080  }
3081  };
3082 
3083  auto convert_function = [&thread_start_idx,
3084  &thread_end_idx,
3085  &value_converters,
3086  &executor,
3087  &query_session,
3088  &work_type_str,
3089  &do_work](const int thread_id) {
3090  const int num_cols = value_converters.size();
3091  const size_t start = thread_start_idx[thread_id];
3092  const size_t end = thread_end_idx[thread_id];
3093  size_t idx = 0;
3094  bool stop_convert = false;
3096  size_t local_idx = 0;
3097  for (idx = start; idx < end; ++idx, ++local_idx) {
3098  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3099  checkInterrupt(query_session, executor))) {
3100  throw std::runtime_error(
3101  "Query execution has been interrupted while performing " +
3102  work_type_str);
3103  }
3104  do_work(idx, end, num_cols, thread_id, stop_convert);
3105  if (stop_convert) {
3106  break;
3107  }
3108  }
3109  } else {
3110  for (idx = start; idx < end; ++idx) {
3111  do_work(idx, end, num_cols, thread_id, stop_convert);
3112  if (stop_convert) {
3113  break;
3114  }
3115  }
3116  }
3117  thread_start_idx[thread_id] = idx;
3118  };
3119 
3120  auto single_threaded_value_converter =
3121  [&row_idx, &num_rows_to_process, &value_converters, &result_rows](
3122  const size_t idx,
3123  const size_t end,
3124  const size_t num_cols,
3125  bool& stop_convert) {
3126  size_t target_row = row_idx.fetch_add(1);
3127  if (target_row >= num_rows_to_process) {
3128  stop_convert = true;
3129  return;
3130  }
3131  const auto result_row = result_rows->getNextRow(false, false);
3132  CHECK(!result_row.empty());
3133  for (unsigned int col = 0; col < num_cols; col++) {
3134  const auto& mapd_variant = result_row[col];
3135  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3136  }
3137  };
3138 
3139  auto single_threaded_convert_function =
3140  [&value_converters,
3141  &thread_start_idx,
3142  &thread_end_idx,
3143  &executor,
3144  &query_session,
3145  &work_type_str,
3146  &single_threaded_value_converter](const int thread_id) {
3147  const int num_cols = value_converters.size();
3148  const size_t start = thread_start_idx[thread_id];
3149  const size_t end = thread_end_idx[thread_id];
3150  size_t idx = 0;
3151  bool stop_convert = false;
3153  size_t local_idx = 0;
3154  for (idx = start; idx < end; ++idx, ++local_idx) {
3155  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3156  checkInterrupt(query_session, executor))) {
3157  throw std::runtime_error(
3158  "Query execution has been interrupted while performing " +
3159  work_type_str);
3160  }
3161  single_threaded_value_converter(idx, end, num_cols, stop_convert);
3162  if (stop_convert) {
3163  break;
3164  }
3165  }
3166  } else {
3167  for (idx = start; idx < end; ++idx) {
3168  single_threaded_value_converter(idx, end, num_cols, stop_convert);
3169  if (stop_convert) {
3170  break;
3171  }
3172  }
3173  }
3174  thread_start_idx[thread_id] = idx;
3175  };
3176 
3177  if (can_go_parallel) {
3178  const size_t entryCount = result_rows->entryCount();
3179  for (size_t i = 0,
3180  start_entry = 0,
3181  stride = (entryCount + num_worker_threads - 1) / num_worker_threads;
3182  i < num_worker_threads && start_entry < entryCount;
3183  ++i, start_entry += stride) {
3184  const auto end_entry = std::min(start_entry + stride, entryCount);
3185  thread_start_idx[i] = start_entry;
3186  thread_end_idx[i] = end_entry;
3187  }
3188  } else {
3189  thread_start_idx[0] = 0;
3190  thread_end_idx[0] = result_rows->entryCount();
3191  }
3192 
3193  while (start_row < num_rows) {
3194  value_converters.clear();
3195  row_idx = 0;
3196  int colNum = 0;
3197  for (const auto targetDescriptor : target_column_descriptors) {
3198  auto sourceDataMetaInfo = res.targets_meta[colNum++];
3199 
3201  num_rows_to_process,
3202  catalog,
3203  sourceDataMetaInfo,
3204  targetDescriptor,
3205  targetDescriptor->columnType,
3206  !targetDescriptor->columnType.get_notnull(),
3207  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
3209  ? executor->getStringDictionaryProxy(
3210  sourceDataMetaInfo.get_type_info().get_comp_param(),
3211  result_rows->getRowSetMemOwner(),
3212  true)
3213  : nullptr};
3214  auto converter = factory.create(param);
3215  value_converters.push_back(std::move(converter));
3216  }
3217 
3218  const auto translate_clock_begin = timer_start();
3219  if (can_go_parallel) {
3220  std::vector<std::future<void>> worker_threads;
3221  for (int i = 0; i < num_worker_threads; ++i) {
3222  worker_threads.push_back(
3223  std::async(std::launch::async, convert_function, i));
3224  }
3225 
3226  for (auto& child : worker_threads) {
3227  child.wait();
3228  }
3229  for (auto& child : worker_threads) {
3230  child.get();
3231  }
3232 
3233  } else {
3234  single_threaded_convert_function(0);
3235  }
3236 
3237  // finalize the insert data
3238  auto finalizer_func =
3239  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
3240  targetValueConverter->finalizeDataBlocksForInsertData();
3241  };
3242 
3243  std::vector<std::future<void>> worker_threads;
3244  for (auto& converterPtr : value_converters) {
3245  worker_threads.push_back(
3246  std::async(std::launch::async, finalizer_func, converterPtr.get()));
3247  }
3248 
3249  for (auto& child : worker_threads) {
3250  child.wait();
3251  }
3252  for (auto& child : worker_threads) {
3253  child.get();
3254  }
3255 
3257  insert_data.databaseId = catalog.getCurrentDB().dbId;
3258  CHECK(td);
3259  insert_data.tableId = td->tableId;
3260  insert_data.numRows = num_rows_to_process;
3261 
3262  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
3264  checkInterrupt(query_session, executor))) {
3265  throw std::runtime_error(
3266  "Query execution has been interrupted while performing " +
3267  work_type_str);
3268  }
3269  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
3270  }
3271  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
3272 
3273  const auto data_load_clock_begin = timer_start();
3274  auto data_memory_holder =
3275  import_export::fill_missing_columns(&catalog, insert_data);
3276  insertDataLoader.insertData(*session, insert_data);
3277  total_data_load_time_ms += timer_stop(data_load_clock_begin);
3278 
3279  start_row += num_rows_to_process;
3280  num_rows_to_process =
3281  std::min(num_rows - start_row, max_number_of_rows_per_package);
3282  }
3283  }
3284  }
3285  } catch (...) {
3286  try {
3287  leafs_connector_->rollback(*session, td->tableId);
3288  } catch (std::exception& e) {
3289  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
3290  << td->tableId << ", Error: " << e.what();
3291  }
3292  throw;
3293  }
3294 
3295  int64_t total_time_ms = total_source_query_time_ms +
3296  total_target_value_translate_time_ms + total_data_load_time_ms;
3297 
3298  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
3299  << "ms (outer_frag_count=" << outer_frag_count
3300  << ", query_time=" << total_source_query_time_ms
3301  << "ms, translation_time=" << total_target_value_translate_time_ms
3302  << "ms, data_load_time=" << total_data_load_time_ms
3303  << "ms)\nquery: " << select_query_;
3304 
3305  if (!is_temporary) {
3306  leafs_connector_->checkpoint(*session, td->tableId);
3307  }
3308 }
void validate_non_foreign_table_write(const TableDescriptor *table_descriptor)
Definition: FsiUtils.h:22
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:315
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
bool is_timestamp() const
Definition: sqltypes.h:743
#define LOG(tag)
Definition: Logger.h:200
HOST DEVICE int get_scale() const
Definition: sqltypes.h:319
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:163
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5183
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
bool is_time() const
Definition: sqltypes.h:495
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:166
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
QueryState & getQueryState()
Definition: QueryState.h:175
bool is_integer() const
Definition: sqltypes.h:491
virtual size_t getOuterFragmentCount(QueryStateProxy, std::string &sql_query_string)=0
specifies the content in-memory of a row in the column metadata table
#define UNLIKELY(x)
Definition: likely.h:25
virtual std::vector< AggregatedResult > query(QueryStateProxy, std::string &sql_query_string, std::vector< size_t > outer_frag_indices, bool allow_interrupt)=0
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
bool table_is_temporary(const TableDescriptor *const td)
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
bool g_enable_experimental_string_functions
bool checkInterrupt(const QuerySessionId &query_session, Executor *executor)
Definition: ParserNode.cpp:94
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:316
std::string get_type_name() const
Definition: sqltypes.h:417
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1114
ThreadId thread_id()
Definition: Logger.cpp:732
DistributedConnector * leafs_connector_
Definition: ParserNode.h:1111
#define CHECK(condition)
Definition: Logger.h:206
bool is_geometry() const
Definition: sqltypes.h:501
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:489
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:713
bool is_decimal() const
Definition: sqltypes.h:492
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:79
bool is_date() const
Definition: sqltypes.h:731
bool is_array() const
Definition: sqltypes.h:497
const std::string getQuerySubmittedTime() const
Definition: QueryState.cpp:96
#define VLOG(n)
Definition: Logger.h:300
Type timer_start()
Definition: measure.h:42
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:366

+ Here is the call graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1114 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt().

DistributedConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1111 of file ParserNode.h.

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected

Definition at line 1116 of file ParserNode.h.

Referenced by get_select_query().

std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

Definition at line 1115 of file ParserNode.h.

Referenced by get_table().


The documentation for this class was generated from the following files: