OmniSciDB  eb3a3d0a03
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Public Member Functions

 InsertIntoTableAsSelectStmt (const rapidjson::Value &payload)
 
 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
 
void execute (const Catalog_Namespace::SessionInfo &session) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

DistributedConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1110 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const rapidjson::Value &  payload)

Definition at line 2898 of file ParserNode.cpp.

References CHECK, and json_str().

2899  {
2900  CHECK(payload.HasMember("name"));
2901  table_name_ = json_str(payload["name"]);
2902 
2903  CHECK(payload.HasMember("query"));
2904  select_query_ = json_str(payload["query"]);
2905 
2906  boost::replace_all(select_query_, "\n", " ");
2907  select_query_ = "(" + select_query_ + ")";
2908 
2909  if (payload.HasMember("columns")) {
2910  CHECK(payload["columns"].IsArray());
2911  for (auto& column : payload["columns"].GetArray()) {
2912  std::string s = json_str(column);
2913  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
2914  }
2915  }
2916 }
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1142
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1114 of file ParserNode.h.

References column_list_.

1117  : table_name_(*table_name), select_query_(*select_query) {
1118  if (c) {
1119  for (auto e : *c) {
1120  column_list_.emplace_back(e);
1121  }
1122  delete c;
1123  }
1124 
1125  delete table_name;
1126  delete select_query;
1127  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1142

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo &  session)
override virtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 3413 of file ParserNode.cpp.

References anonymous_namespace{Utm.h}::a, query_state::QueryState::create(), legacylockmgr::ExecutorOuterLock, legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), pg_shim(), run_benchmark_import::result, gpu_enabled::sort(), STDLOG, and run_benchmark_import::tables.

Referenced by omnisci.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runSQL().

3413  {
3414  auto session_copy = session;
3415  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
3416  &session_copy, boost::null_deleter());
3417  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
3418  auto stdlog = STDLOG(query_state);
3419  auto& catalog = session_ptr->getCatalog();
3420 
3421  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3424 
3425  if (catalog.getMetadataForTable(table_name_) == nullptr) {
3426  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
3427  }
3428 
3430  std::vector<std::string> tables;
3431 
3432  // get the table info
3433  auto calcite_mgr = catalog.getCalciteMgr();
3434 
3435  // TODO MAT this should actually get the global or the session parameter for
3436  // view optimization
3437  const auto result = calcite_mgr->process(query_state->createQueryStateProxy(),
3439  {},
3440  true,
3441  false,
3442  false,
3443  true);
3444 
3445  for (auto& tab : result.resolved_accessed_objects.tables_selected_from) {
3446  tables.emplace_back(tab[0]);
3447  }
3448  tables.emplace_back(table_name_);
3449 
3450  // force sort into tableid order in case of name change to guarantee fixed order of
3451  // mutex access
3452  std::sort(tables.begin(),
3453  tables.end(),
3454  [&catalog](const std::string& a, const std::string& b) {
3455  return catalog.getMetadataForTable(a, false)->tableId <
3456  catalog.getMetadataForTable(b, false)->tableId;
3457  });
3458 
3459  tables.erase(unique(tables.begin(), tables.end()), tables.end());
3460  for (const auto& table : tables) {
3461  locks.emplace_back(
3464  catalog, table)));
3465  if (table == table_name_) {
3466  // Aquire an insert data lock for updates/deletes, consistent w/ insert. The
3467  // table data lock will be aquired in the fragmenter during checkpoint.
3468  locks.emplace_back(
3471  catalog.getDatabaseId(), (*locks.back())())));
3472  } else {
3473  locks.emplace_back(
3476  catalog.getDatabaseId(), (*locks.back())())));
3477  }
3478  }
3479 
3480  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
3481  try {
3482  populateData(query_state->createQueryStateProxy(), td, true, false);
3483  } catch (...) {
3484  throw;
3485  }
3486 }
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
Definition: LockMgr.h:270
void populateData(QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:145
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
constexpr double a
Definition: Utm.h:32
static std::shared_ptr< MutexType > getMutex(const LockType lockType, const KeyType &key)
Definition: LegacyLockMgr.h:51
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:235

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1137 of file ParserNode.h.

References select_query_.

1137 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1135 of file ParserNode.h.

References table_name_.

1135 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
const TableDescriptor *  td,
bool  validate_table,
bool  for_CTAS = false 
)

Definition at line 2918 of file ParserNode.cpp.

References threading_serial::async(), CHECK, anonymous_namespace{Importer.cpp}::check_session_interrupted(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, logger::ERROR, import_export::fill_missing_columns(), g_cluster, g_enable_experimental_string_functions, g_enable_non_kernel_time_query_interrupt, ResultSet::GeoTargetValue, SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), Parser::LocalConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), query_state::QueryStateProxy::getQueryState(), query_state::QueryState::getQuerySubmittedTime(), i, AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfo::is_array(), SQLTypeInfo::is_date(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_geometry(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), SQLTypeInfo::is_timestamp(), TableDescriptor::isView, LOG, Fragmenter_Namespace::InsertData::numRows, Parser::LocalConnector::query(), run_benchmark_import::res, run_benchmark_import::result, run_benchmark_import::start_time, table_is_temporary(), TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, logger::thread_id(), timer_start(), timer_stop(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, foreign_storage::validate_non_foreign_table_write(), and VLOG.

Referenced by Parser::CreateTableAsSelectStmt::execute().

2921  {
2922  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
2923  auto& catalog = session->getCatalog();
2925 
2926  LocalConnector local_connector;
2927  bool populate_table = false;
2928 
2929  if (leafs_connector_) {
2930  populate_table = true;
2931  } else {
2932  leafs_connector_ = &local_connector;
2933  if (!g_cluster) {
2934  populate_table = true;
2935  }
2936  }
2937 
2938  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
2939  std::vector<const ColumnDescriptor*> target_column_descriptors;
2940  if (column_list_.empty()) {
2941  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2942  target_column_descriptors = {std::begin(list), std::end(list)};
2943  } else {
2944  for (auto& c : column_list_) {
2945  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2946  if (cd == nullptr) {
2947  throw std::runtime_error("Column " + *c + " does not exist.");
2948  }
2949  target_column_descriptors.push_back(cd);
2950  }
2951  }
2952 
2953  return target_column_descriptors;
2954  };
2955 
2956  bool is_temporary = table_is_temporary(td);
2957 
2958  if (validate_table) {
2959  // check access privileges
2960  if (!td) {
2961  throw std::runtime_error("Table " + table_name_ + " does not exist.");
2962  }
2963  if (td->isView) {
2964  throw std::runtime_error("Insert to views is not supported yet.");
2965  }
2966 
2967  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
2969  table_name_)) {
2970  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
2971  }
2972 
2973  // only validate the select query so we get the target types
2974  // correctly, but do not populate the result set
2975  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
2976  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
2977 
2978  std::vector<const ColumnDescriptor*> target_column_descriptors =
2979  get_target_column_descriptors(td);
2980 
2981  if (source_column_descriptors.size() != target_column_descriptors.size()) {
2982  throw std::runtime_error("The number of source and target columns does not match.");
2983  }
2984 
2985  for (int i = 0; i < source_column_descriptors.size(); i++) {
2986  const ColumnDescriptor* source_cd =
2987  &(*std::next(source_column_descriptors.begin(), i));
2988  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
2989 
2990  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
2991  auto type_cannot_be_cast = [](const auto& col_type) {
2992  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
2993  col_type.is_boolean());
2994  };
2995 
2996  if (type_cannot_be_cast(source_cd->columnType) ||
2997  type_cannot_be_cast(target_cd->columnType)) {
2998  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2999  source_cd->columnType.get_type_name() +
3000  "' and target '" + target_cd->columnName + " " +
3001  target_cd->columnType.get_type_name() +
3002  "' column types do not match.");
3003  }
3004  }
3005  if (source_cd->columnType.is_array()) {
3006  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
3007  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3008  source_cd->columnType.get_type_name() +
3009  "' and target '" + target_cd->columnName + " " +
3010  target_cd->columnType.get_type_name() +
3011  "' array column element types do not match.");
3012  }
3013  }
3014 
3015  if (source_cd->columnType.is_decimal() ||
3016  source_cd->columnType.get_elem_type().is_decimal()) {
3017  SQLTypeInfo sourceType = source_cd->columnType;
3018  SQLTypeInfo targetType = target_cd->columnType;
3019 
3020  if (source_cd->columnType.is_array()) {
3021  sourceType = source_cd->columnType.get_elem_type();
3022  targetType = target_cd->columnType.get_elem_type();
3023  }
3024 
3025  if (sourceType.get_scale() != targetType.get_scale()) {
3026  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3027  source_cd->columnType.get_type_name() +
3028  "' and target '" + target_cd->columnName + " " +
3029  target_cd->columnType.get_type_name() +
3030  "' decimal columns scales do not match.");
3031  }
3032  }
3033 
3034  if (source_cd->columnType.is_string()) {
3035  if (!target_cd->columnType.is_string()) {
3036  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3037  source_cd->columnType.get_type_name() +
3038  "' and target '" + target_cd->columnName + " " +
3039  target_cd->columnType.get_type_name() +
3040  "' column types do not match.");
3041  }
3042  if (source_cd->columnType.get_compression() !=
3043  target_cd->columnType.get_compression()) {
3044  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3045  source_cd->columnType.get_type_name() +
3046  "' and target '" + target_cd->columnName + " " +
3047  target_cd->columnType.get_type_name() +
3048  "' columns string encodings do not match.");
3049  }
3050  }
3051 
3052  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
3053  if (source_cd->columnType.get_dimension() !=
3054  target_cd->columnType.get_dimension()) {
3055  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3056  source_cd->columnType.get_type_name() +
3057  "' and target '" + target_cd->columnName + " " +
3058  target_cd->columnType.get_type_name() +
3059  "' timestamp column precisions do not match.");
3060  }
3061  }
3062 
3063  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
3064  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
3065  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
3066  !source_cd->columnType.is_timestamp() &&
3067  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
3068  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3069  source_cd->columnType.get_type_name() +
3070  "' and target '" + target_cd->columnName + " " +
3071  target_cd->columnType.get_type_name() +
3072  "' column encoding sizes do not match.");
3073  }
3074  }
3075  }
3076 
3077  if (!populate_table) {
3078  return;
3079  }
3080 
3081  int64_t total_row_count = 0;
3082  int64_t total_source_query_time_ms = 0;
3083  int64_t total_target_value_translate_time_ms = 0;
3084  int64_t total_data_load_time_ms = 0;
3085 
3087  auto target_column_descriptors = get_target_column_descriptors(td);
3088  auto outer_frag_count =
3090 
3091  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
3092  auto query_session = session ? session->get_session_id() : "";
3094  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
3095  try {
3096  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
3097  std::vector<size_t> allowed_outer_fragment_indices;
3098 
3099  if (outer_frag_count) {
3100  allowed_outer_fragment_indices.push_back(outer_frag_idx);
3101  }
3102 
3103  const auto query_clock_begin = timer_start();
3104  std::vector<AggregatedResult> query_results =
3105  leafs_connector_->query(query_state_proxy,
3106  select_query_,
3107  allowed_outer_fragment_indices,
3109  total_source_query_time_ms += timer_stop(query_clock_begin);
3110 
3111  auto start_time = query_state_proxy.getQueryState().getQuerySubmittedTime();
3112  auto query_str = "INSERT_DATA for " + work_type_str;
3114  // In the clean-up phase of the query execution for collecting aggregated result
3115  // of SELECT query, we remove its query session info, so we need to enroll the
3116  // session info again
3117  executor->enrollQuerySession(query_session,
3118  query_str,
3119  start_time,
3121  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3122  }
3123 
3124  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
3125  // this data population is non-kernel operation, so we manually cleanup
3126  // the query session info in the cleanup phase
3128  executor->clearQuerySessionStatus(query_session, start_time);
3129  }
3130  };
3131 
3132  for (auto& res : query_results) {
3133  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3134  throw std::runtime_error(
3135  "Query execution has been interrupted while performing " + work_type_str);
3136  }
3137  auto& result_rows = res.rs;
3138  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
3139  const auto num_rows = result_rows->rowCount();
3140 
3141  if (0 == num_rows) {
3142  continue;
3143  }
3144 
3145  total_row_count += num_rows;
3146 
3147  size_t leaf_count = leafs_connector_->leafCount();
3148 
3149  // ensure that at least 1 row is processed per block up to a maximum of 65536 rows
3150  const size_t rows_per_block =
3151  std::max(std::min(num_rows / leaf_count, size_t(64 * 1024)), size_t(1));
3152 
3153  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
3154 
3156 
3157  const int num_worker_threads = std::thread::hardware_concurrency();
3158 
3159  std::vector<size_t> thread_start_idx(num_worker_threads),
3160  thread_end_idx(num_worker_threads);
3161  bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
3162 
3163  std::atomic<size_t> crt_row_idx{0};
3164 
3165  auto do_work = [&result_rows, &value_converters, &crt_row_idx](
3166  const size_t idx,
3167  const size_t block_end,
3168  const size_t num_cols,
3169  const size_t thread_id,
3170  bool& stop_convert) {
3171  const auto result_row = result_rows->getRowAtNoTranslations(idx);
3172  if (!result_row.empty()) {
3173  size_t target_row = crt_row_idx.fetch_add(1);
3174  if (target_row >= block_end) {
3175  stop_convert = true;
3176  return;
3177  }
3178  for (unsigned int col = 0; col < num_cols; col++) {
3179  const auto& mapd_variant = result_row[col];
3180  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3181  }
3182  }
3183  };
3184 
3185  auto convert_function = [&thread_start_idx,
3186  &thread_end_idx,
3187  &value_converters,
3188  &executor,
3189  &query_session,
3190  &work_type_str,
3191  &do_work](const int thread_id, const size_t block_end) {
3192  const int num_cols = value_converters.size();
3193  const size_t start = thread_start_idx[thread_id];
3194  const size_t end = thread_end_idx[thread_id];
3195  size_t idx = 0;
3196  bool stop_convert = false;
3198  size_t local_idx = 0;
3199  for (idx = start; idx < end; ++idx, ++local_idx) {
3200  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3201  check_session_interrupted(query_session, executor))) {
3202  throw std::runtime_error(
3203  "Query execution has been interrupted while performing " +
3204  work_type_str);
3205  }
3206  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3207  if (stop_convert) {
3208  break;
3209  }
3210  }
3211  } else {
3212  for (idx = start; idx < end; ++idx) {
3213  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3214  if (stop_convert) {
3215  break;
3216  }
3217  }
3218  }
3219  thread_start_idx[thread_id] = idx;
3220  };
3221 
3222  auto single_threaded_value_converter =
3223  [&crt_row_idx, &value_converters, &result_rows](const size_t idx,
3224  const size_t block_end,
3225  const size_t num_cols,
3226  bool& stop_convert) {
3227  size_t target_row = crt_row_idx.fetch_add(1);
3228  if (target_row >= block_end) {
3229  stop_convert = true;
3230  return;
3231  }
3232  const auto result_row = result_rows->getNextRow(false, false);
3233  CHECK(!result_row.empty());
3234  for (unsigned int col = 0; col < num_cols; col++) {
3235  const auto& mapd_variant = result_row[col];
3236  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3237  }
3238  };
3239 
3240  auto single_threaded_convert_function = [&value_converters,
3241  &thread_start_idx,
3242  &thread_end_idx,
3243  &executor,
3244  &query_session,
3245  &work_type_str,
3246  &single_threaded_value_converter](
3247  const int thread_id,
3248  const size_t block_end) {
3249  const int num_cols = value_converters.size();
3250  const size_t start = thread_start_idx[thread_id];
3251  const size_t end = thread_end_idx[thread_id];
3252  size_t idx = 0;
3253  bool stop_convert = false;
3255  size_t local_idx = 0;
3256  for (idx = start; idx < end; ++idx, ++local_idx) {
3257  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3258  check_session_interrupted(query_session, executor))) {
3259  throw std::runtime_error(
3260  "Query execution has been interrupted while performing " +
3261  work_type_str);
3262  }
3263  single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
3264  if (stop_convert) {
3265  break;
3266  }
3267  }
3268  } else {
3269  for (idx = start; idx < end; ++idx) {
3270  single_threaded_value_converter(idx, end, num_cols, stop_convert);
3271  if (stop_convert) {
3272  break;
3273  }
3274  }
3275  }
3276  thread_start_idx[thread_id] = idx;
3277  };
3278 
3279  if (can_go_parallel) {
3280  const size_t entry_count = result_rows->entryCount();
3281  for (size_t
3282  i = 0,
3283  start_entry = 0,
3284  stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
3285  i < num_worker_threads && start_entry < entry_count;
3286  ++i, start_entry += stride) {
3287  const auto end_entry = std::min(start_entry + stride, entry_count);
3288  thread_start_idx[i] = start_entry;
3289  thread_end_idx[i] = end_entry;
3290  }
3291  } else {
3292  thread_start_idx[0] = 0;
3293  thread_end_idx[0] = result_rows->entryCount();
3294  }
3295 
3296  for (size_t block_start = 0; block_start < num_rows;
3297  block_start += rows_per_block) {
3298  const auto num_rows_this_itr = block_start + rows_per_block < num_rows
3299  ? rows_per_block
3300  : num_rows - block_start;
3301  crt_row_idx = 0; // reset block tracker
3302  value_converters.clear();
3303  int colNum = 0;
3304  for (const auto targetDescriptor : target_column_descriptors) {
3305  auto sourceDataMetaInfo = res.targets_meta[colNum++];
3306 
3308  num_rows_this_itr,
3309  catalog,
3310  sourceDataMetaInfo,
3311  targetDescriptor,
3312  targetDescriptor->columnType,
3313  !targetDescriptor->columnType.get_notnull(),
3314  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
3316  ? executor->getStringDictionaryProxy(
3317  sourceDataMetaInfo.get_type_info().get_comp_param(),
3318  result_rows->getRowSetMemOwner(),
3319  true)
3320  : nullptr};
3321  auto converter = factory.create(param);
3322  value_converters.push_back(std::move(converter));
3323  }
3324 
3325  const auto translate_clock_begin = timer_start();
3326  if (can_go_parallel) {
3327  std::vector<std::future<void>> worker_threads;
3328  for (int i = 0; i < num_worker_threads; ++i) {
3329  worker_threads.push_back(
3330  std::async(std::launch::async, convert_function, i, num_rows_this_itr));
3331  }
3332 
3333  for (auto& child : worker_threads) {
3334  child.wait();
3335  }
3336  for (auto& child : worker_threads) {
3337  child.get();
3338  }
3339 
3340  } else {
3341  single_threaded_convert_function(0, num_rows_this_itr);
3342  }
3343 
3344  // finalize the insert data
3345  auto finalizer_func =
3346  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
3347  targetValueConverter->finalizeDataBlocksForInsertData();
3348  };
3349 
3350  std::vector<std::future<void>> worker_threads;
3351  for (auto& converterPtr : value_converters) {
3352  worker_threads.push_back(
3353  std::async(std::launch::async, finalizer_func, converterPtr.get()));
3354  }
3355 
3356  for (auto& child : worker_threads) {
3357  child.wait();
3358  }
3359  for (auto& child : worker_threads) {
3360  child.get();
3361  }
3362 
3364  insert_data.databaseId = catalog.getCurrentDB().dbId;
3365  CHECK(td);
3366  insert_data.tableId = td->tableId;
3367  insert_data.numRows = num_rows_this_itr;
3368 
3369  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
3371  check_session_interrupted(query_session, executor))) {
3372  throw std::runtime_error(
3373  "Query execution has been interrupted while performing " +
3374  work_type_str);
3375  }
3376  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
3377  }
3378  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
3379 
3380  const auto data_load_clock_begin = timer_start();
3381  auto data_memory_holder =
3382  import_export::fill_missing_columns(&catalog, insert_data);
3383  insertDataLoader.insertData(*session, insert_data);
3384  total_data_load_time_ms += timer_stop(data_load_clock_begin);
3385  }
3386  }
3387  }
3388  } catch (...) {
3389  try {
3390  leafs_connector_->rollback(*session, td->tableId);
3391  } catch (std::exception& e) {
3392  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
3393  << td->tableId << ", Error: " << e.what();
3394  }
3395  throw;
3396  }
3397 
3398  int64_t total_time_ms = total_source_query_time_ms +
3399  total_target_value_translate_time_ms + total_data_load_time_ms;
3400 
3401  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
3402  << "ms (outer_frag_count=" << outer_frag_count
3403  << ", query_time=" << total_source_query_time_ms
3404  << "ms, translation_time=" << total_target_value_translate_time_ms
3405  << "ms, data_load_time=" << total_data_load_time_ms
3406  << "ms)\nquery: " << select_query_;
3407 
3408  if (!is_temporary) {
3409  leafs_connector_->checkpoint(*session, td->tableId);
3410  }
3411 }
void validate_non_foreign_table_write(const TableDescriptor *table_descriptor)
Definition: FsiUtils.h:22
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:330
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
bool is_timestamp() const
Definition: sqltypes.h:762
#define LOG(tag)
Definition: Logger.h:203
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:163
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:5414
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
bool is_time() const
Definition: sqltypes.h:510
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:171
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
future< Result > async(Fn &&fn, Args &&...args)
QueryState & getQueryState()
Definition: QueryState.h:181
bool is_integer() const
Definition: sqltypes.h:506
virtual size_t getOuterFragmentCount(QueryStateProxy, std::string &sql_query_string)=0
specifies the content in-memory of a row in the column metadata table
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: ParserNode.cpp:95
#define UNLIKELY(x)
Definition: likely.h:25
virtual std::vector< AggregatedResult > query(QueryStateProxy, std::string &sql_query_string, std::vector< size_t > outer_frag_indices, bool allow_interrupt)=0
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
bool table_is_temporary(const TableDescriptor *const td)
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
bool g_enable_experimental_string_functions
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
std::string get_type_name() const
Definition: sqltypes.h:432
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1142
ThreadId thread_id()
Definition: Logger.cpp:791
DistributedConnector * leafs_connector_
Definition: ParserNode.h:1139
#define CHECK(condition)
Definition: Logger.h:209
bool is_geometry() const
Definition: sqltypes.h:516
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:504
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:732
bool is_decimal() const
Definition: sqltypes.h:507
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:81
bool is_date() const
Definition: sqltypes.h:750
bool is_array() const
Definition: sqltypes.h:512
const std::string getQuerySubmittedTime() const
Definition: QueryState.cpp:98
#define VLOG(n)
Definition: Logger.h:303
Type timer_start()
Definition: measure.h:42
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:384

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1142 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt().

DistributedConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1139 of file ParserNode.h.

Referenced by Parser::CreateTableAsSelectStmt::execute().

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected

Definition at line 1144 of file ParserNode.h.

Referenced by Parser::CreateTableAsSelectStmt::execute(), and get_select_query().

std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

Definition at line 1143 of file ParserNode.h.

Referenced by Parser::CreateTableAsSelectStmt::execute(), and get_table().


The documentation for this class was generated from the following files: