OmniSciDB  0bd2ec9cf4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Classes

struct  DistributedConnector
 
struct  LocalConnector
 

Public Member Functions

 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, bool is_temporary, bool validate_table)
 
void execute (const Catalog_Namespace::SessionInfo &session) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

DistributedConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1007 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1010 of file ParserNode.h.

References column_list_.

1013  : table_name_(*table_name), select_query_(*select_query) {
1014  if (c) {
1015  for (auto e : *c) {
1016  column_list_.emplace_back(e);
1017  }
1018  delete c;
1019  }
1020 
1021  delete table_name;
1022  delete select_query;
1023  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1060

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo & session)
override virtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 2767 of file ParserNode.cpp.

References query_state::QueryState::create(), and STDLOG.

2767  {
2768  auto session_copy = session;
2769  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
2770  &session_copy, boost::null_deleter());
2771  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
2772  auto stdlog = STDLOG(query_state);
2773  populateData(query_state->createQueryStateProxy(), false, true);
2774 }
void populateData(QueryStateProxy, bool is_temporary, bool validate_table)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:140
#define STDLOG(...)
Definition: QueryState.h:225

+ Here is the call graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1030 of file ParserNode.h.

References select_query_.

1030 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1028 of file ParserNode.h.

References table_name_.

1028 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
bool  is_temporary,
bool  validate_table 
)

Definition at line 2401 of file ParserNode.cpp.

References CHECK(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, g_cluster, g_enable_experimental_string_functions, ResultSet::GeoTargetValue, SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_elem_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type_name(), Parser::InsertIntoTableAsSelectStmt::LocalConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), query_state::QueryStateProxy::getQueryState(), AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_array(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_geometry(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), TableDescriptor::isView, TableDescriptor::nShards, num_rows, Fragmenter_Namespace::InsertData::numRows, Parser::InsertIntoTableAsSelectStmt::LocalConnector::query(), run_benchmark_import::res, run_benchmark_import::result, AggregatedResult::rs, TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, and AggregatedResult::targets_meta.

2403  {
2404  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
2405  LocalConnector local_connector;
2406 
2407  bool populate_table = false;
2408 
2409  if (leafs_connector_) {
2410  populate_table = true;
2411  } else {
2412  leafs_connector_ = &local_connector;
2413  if (!g_cluster) {
2414  populate_table = true;
2415  }
2416  }
2417 
2418  auto& catalog = session->getCatalog();
2419  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
2420  std::vector<const ColumnDescriptor*> target_column_descriptors;
2421  if (column_list_.empty()) {
2422  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2423  target_column_descriptors = {std::begin(list), std::end(list)};
2424 
2425  } else {
2426  for (auto& c : column_list_) {
2427  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2428  if (cd == nullptr) {
2429  throw std::runtime_error("Column " + *c + " does not exist.");
2430  }
2431  target_column_descriptors.push_back(cd);
2432  }
2433  }
2434 
2435  return target_column_descriptors;
2436  };
2437 
2438  if (validate_table) {
2439  // check access privileges
2440  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
2441 
2442  if (td == nullptr) {
2443  throw std::runtime_error("Table " + table_name_ + " does not exist.");
2444  }
2445  if (td->isView) {
2446  throw std::runtime_error("Insert to views is not supported yet.");
2447  }
2448 
2449  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
2451  table_name_)) {
2452  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
2453  }
2454 
2455  // only validate the select query so we get the target types
2456  // correctly, but do not populate the result set
2457  auto result = local_connector.query(query_state_proxy, select_query_, true);
2458  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
2459 
2460  std::vector<const ColumnDescriptor*> target_column_descriptors =
2461  get_target_column_descriptors(td);
2462  if (catalog.getAllColumnMetadataForTable(td->tableId, false, false, false).size() !=
2463  target_column_descriptors.size()) {
2464  throw std::runtime_error("Insert into a subset of columns is not supported yet.");
2465  }
2466 
2467  if (source_column_descriptors.size() != target_column_descriptors.size()) {
2468  throw std::runtime_error("The number of source and target columns does not match.");
2469  }
2470 
2471  for (int i = 0; i < source_column_descriptors.size(); i++) {
2472  const ColumnDescriptor* source_cd =
2473  &(*std::next(source_column_descriptors.begin(), i));
2474  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
2475 
2476  if ((!source_cd->columnType.is_string() && !target_cd->columnType.is_string()) &&
2477  (source_cd->columnType.get_type() != target_cd->columnType.get_type())) {
2478  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2479  source_cd->columnType.get_type_name() +
2480  "' and target '" + target_cd->columnName + " " +
2481  target_cd->columnType.get_type_name() +
2482  "' column types do not match.");
2483  }
2484  if (source_cd->columnType.is_array()) {
2485  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
2486  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2487  source_cd->columnType.get_type_name() +
2488  "' and target '" + target_cd->columnName + " " +
2489  target_cd->columnType.get_type_name() +
2490  "' array column element types do not match.");
2491  }
2492  }
2493 
2494  if (source_cd->columnType.is_decimal() ||
2495  source_cd->columnType.get_elem_type().is_decimal()) {
2496  SQLTypeInfo sourceType = source_cd->columnType;
2497  SQLTypeInfo targetType = target_cd->columnType;
2498 
2499  if (source_cd->columnType.is_array()) {
2500  sourceType = source_cd->columnType.get_elem_type();
2501  targetType = target_cd->columnType.get_elem_type();
2502  }
2503 
2504  if ((sourceType.get_dimension() != targetType.get_dimension()) ||
2505  (sourceType.get_scale() != targetType.get_scale())) {
2506  throw std::runtime_error(
2507  "Source '" + source_cd->columnName + " " +
2508  source_cd->columnType.get_type_name() + "' and target '" +
2509  target_cd->columnName + " " + target_cd->columnType.get_type_name() +
2510  "' decimal columns dimension and or scale do not match.");
2511  }
2512  }
2513 
2514  if (source_cd->columnType.is_string()) {
2515  if (source_cd->columnType.get_compression() !=
2516  target_cd->columnType.get_compression()) {
2517  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2518  source_cd->columnType.get_type_name() +
2519  "' and target '" + target_cd->columnName + " " +
2520  target_cd->columnType.get_type_name() +
2521  "' columns string encodings do not match.");
2522  }
2523  }
2524 
2525  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
2526  if (source_cd->columnType.get_dimension() !=
2527  target_cd->columnType.get_dimension()) {
2528  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2529  source_cd->columnType.get_type_name() +
2530  "' and target '" + target_cd->columnName + " " +
2531  target_cd->columnType.get_type_name() +
2532  "' timestamp column precisions do not match.");
2533  }
2534  }
2535 
2536  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
2537  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
2538  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2539  source_cd->columnType.get_type_name() +
2540  "' and target '" + target_cd->columnName + " " +
2541  target_cd->columnType.get_type_name() +
2542  "' column encoding sizes do not match.");
2543  }
2544  }
2545  }
2546 
2547  if (!populate_table) {
2548  return;
2549  }
2550 
2551  const TableDescriptor* created_td = catalog.getMetadataForTable(table_name_);
2553  AggregatedResult res = leafs_connector_->query(query_state_proxy, select_query_);
2554  auto target_column_descriptors = get_target_column_descriptors(created_td);
2555  auto result_rows = res.rs;
2556  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
2557  const auto num_rows = result_rows->rowCount();
2558 
2559  if (0 == num_rows) {
2560  return;
2561  }
2562 
2563  size_t leaf_count = leafs_connector_->leafCount();
2564 
2565  size_t max_number_of_rows_per_package = std::min(num_rows / leaf_count, 64UL * 1024UL);
2566 
2567  size_t start_row = 0;
2568  size_t num_rows_to_process = std::min(num_rows, max_number_of_rows_per_package);
2569 
2570  // ensure that at least one row is being processed
2571  num_rows_to_process = std::max(num_rows_to_process, 1UL);
2572 
2573  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
2574 
2576 
2577  const int num_worker_threads = std::thread::hardware_concurrency();
2578 
2579  std::vector<size_t> thread_start_idx(num_worker_threads),
2580  thread_end_idx(num_worker_threads);
2581  bool can_go_parallel = !result_rows->isTruncated() && num_rows_to_process > 20000;
2582 
2583  std::atomic<size_t> row_idx{0};
2584 
2585  auto convert_function = [&result_rows,
2586  &value_converters,
2587  &row_idx,
2588  &num_rows_to_process,
2589  &thread_start_idx,
2590  &thread_end_idx](const int thread_id) {
2591  const int num_cols = value_converters.size();
2592  const size_t start = thread_start_idx[thread_id];
2593  const size_t end = thread_end_idx[thread_id];
2594  size_t idx = 0;
2595  for (idx = start; idx < end; ++idx) {
2596  const auto result_row = result_rows->getRowAtNoTranslations(idx);
2597  if (!result_row.empty()) {
2598  size_t target_row = row_idx.fetch_add(1);
2599 
2600  if (target_row >= num_rows_to_process) {
2601  break;
2602  }
2603 
2604  for (unsigned int col = 0; col < num_cols; col++) {
2605  const auto& mapd_variant = result_row[col];
2606  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
2607  }
2608  }
2609  }
2610 
2611  thread_start_idx[thread_id] = idx;
2612  };
2613 
2614  auto single_threaded_convert_function = [&result_rows,
2615  &value_converters,
2616  &row_idx,
2617  &num_rows_to_process,
2618  &thread_start_idx,
2619  &thread_end_idx](const int thread_id) {
2620  const int num_cols = value_converters.size();
2621  const size_t start = thread_start_idx[thread_id];
2622  const size_t end = thread_end_idx[thread_id];
2623  size_t idx = 0;
2624  for (idx = start; idx < end; ++idx) {
2625  size_t target_row = row_idx.fetch_add(1);
2626 
2627  if (target_row >= num_rows_to_process) {
2628  break;
2629  }
2630  const auto result_row = result_rows->getNextRow(false, false);
2631  CHECK(!result_row.empty());
2632  for (unsigned int col = 0; col < num_cols; col++) {
2633  const auto& mapd_variant = result_row[col];
2634  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
2635  }
2636  }
2637 
2638  thread_start_idx[thread_id] = idx;
2639  };
2640 
2641  if (can_go_parallel) {
2642  const size_t entryCount = result_rows->entryCount();
2643  for (size_t i = 0,
2644  start_entry = 0,
2645  stride = (entryCount + num_worker_threads - 1) / num_worker_threads;
2646  i < num_worker_threads && start_entry < entryCount;
2647  ++i, start_entry += stride) {
2648  const auto end_entry = std::min(start_entry + stride, entryCount);
2649  thread_start_idx[i] = start_entry;
2650  thread_end_idx[i] = end_entry;
2651  }
2652 
2653  } else {
2654  thread_start_idx[0] = 0;
2655  thread_end_idx[0] = result_rows->entryCount();
2656  }
2657 
2658  std::shared_ptr<Executor> executor;
2659 
2661  executor = Executor::getExecutor(catalog.getCurrentDB().dbId);
2662  }
2663 
2664  while (start_row < num_rows) {
2665  try {
2666  value_converters.clear();
2667  row_idx = 0;
2668  int colNum = 0;
2669  for (const auto targetDescriptor : target_column_descriptors) {
2670  auto sourceDataMetaInfo = res.targets_meta[colNum++];
2671 
2673  num_rows_to_process,
2674  catalog,
2675  sourceDataMetaInfo,
2676  targetDescriptor,
2677  targetDescriptor->columnType,
2678  !targetDescriptor->columnType.get_notnull(),
2679  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
2681  ? executor->getStringDictionaryProxy(
2682  sourceDataMetaInfo.get_type_info().get_comp_param(),
2683  result_rows->getRowSetMemOwner(),
2684  true)
2685  : nullptr};
2686  auto converter = factory.create(param);
2687  value_converters.push_back(std::move(converter));
2688  }
2689 
2690  if (can_go_parallel) {
2691  std::vector<std::future<void>> worker_threads;
2692  for (int i = 0; i < num_worker_threads; ++i) {
2693  worker_threads.push_back(std::async(std::launch::async, convert_function, i));
2694  }
2695 
2696  for (auto& child : worker_threads) {
2697  child.wait();
2698  }
2699  for (auto& child : worker_threads) {
2700  child.get();
2701  }
2702 
2703  } else {
2704  single_threaded_convert_function(0);
2705  }
2706 
2707  // finalize the insert data
2708  {
2709  auto finalizer_func =
2710  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
2711  targetValueConverter->finalizeDataBlocksForInsertData();
2712  };
2713  std::vector<std::future<void>> worker_threads;
2714  for (auto& converterPtr : value_converters) {
2715  worker_threads.push_back(
2716  std::async(std::launch::async, finalizer_func, converterPtr.get()));
2717  }
2718 
2719  for (auto& child : worker_threads) {
2720  child.wait();
2721  }
2722  for (auto& child : worker_threads) {
2723  child.get();
2724  }
2725  }
2726 
2728  insert_data.databaseId = catalog.getCurrentDB().dbId;
2729  CHECK(created_td);
2730  insert_data.tableId = created_td->tableId;
2731  insert_data.numRows = num_rows_to_process;
2732 
2733  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
2734  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
2735  }
2736 
2737  insertDataLoader.insertData(*session, insert_data);
2738  } catch (...) {
2739  try {
2740  if (created_td->nShards) {
2741  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2742  for (const auto ptd : shard_tables) {
2743  leafs_connector_->rollback(*session, ptd->tableId);
2744  }
2745  }
2746  leafs_connector_->rollback(*session, created_td->tableId);
2747  } catch (...) {
2748  // eat it
2749  }
2750  throw;
2751  }
2752  start_row += num_rows_to_process;
2753  num_rows_to_process = std::min(num_rows - start_row, max_number_of_rows_per_package);
2754  }
2755 
2756  if (!is_temporary) {
2757  if (created_td->nShards) {
2758  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2759  for (const auto ptd : shard_tables) {
2760  leafs_connector_->checkpoint(*session, ptd->tableId);
2761  }
2762  }
2763  leafs_connector_->checkpoint(*session, created_td->tableId);
2764  }
2765 }
const int8_t const int64_t * num_rows
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
const std::vector< TargetMetaInfo > targets_meta
bool g_cluster
static std::shared_ptr< Executor > getExecutor(const int db_id, const std::string &debug_dir="", const std::string &debug_file="", const MapDParameters mapd_parameters=MapDParameters())
Definition: Execute.cpp:127
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:154
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
std::string get_type_name() const
Definition: sqltypes.h:429
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
bool is_array() const
Definition: sqltypes.h:485
QueryState & getQueryState()
Definition: QueryState.h:172
virtual AggregatedResult query(QueryStateProxy, std::string &sql_query_string)=0
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
specifies the content in-memory of a row in the column metadata table
bool is_geometry() const
Definition: sqltypes.h:489
bool g_enable_experimental_string_functions
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:328
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1060
DistributedConnector * leafs_connector_
Definition: ParserNode.h:1057
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:327
bool is_string() const
Definition: sqltypes.h:477
std::shared_ptr< ResultSet > rs
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:659
SQLTypeInfo columnType
specifies the content in-memory of a row in the table metadata table
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
bool is_decimal() const
Definition: sqltypes.h:480
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:77
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)

+ Here is the call graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1060 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt().

DistributedConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1057 of file ParserNode.h.

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected

Definition at line 1062 of file ParserNode.h.

Referenced by get_select_query().

std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

Definition at line 1061 of file ParserNode.h.

Referenced by get_table().


The documentation for this class was generated from the following files: