OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Classes

struct  DistributedConnector
 
struct  LocalConnector
 

Public Member Functions

 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, bool is_temporary, bool validate_table)
 
void execute (const Catalog_Namespace::SessionInfo &session) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

DistributedConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1007 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1010 of file ParserNode.h.

References column_list_.

1013  : table_name_(*table_name), select_query_(*select_query) {
1014  if (c) {
1015  for (auto e : *c) {
1016  column_list_.emplace_back(e);
1017  }
1018  delete c;
1019  }
1020 
1021  delete table_name;
1022  delete select_query;
1023  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1060

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo & session)
overridevirtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 2768 of file ParserNode.cpp.

References query_state::QueryState::create(), and STDLOG.

2768  {
2769  auto session_copy = session;
2770  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
2771  &session_copy, boost::null_deleter());
2772  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
2773  auto stdlog = STDLOG(query_state);
2774  populateData(query_state->createQueryStateProxy(), false, true);
2775 }
void populateData(QueryStateProxy, bool is_temporary, bool validate_table)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:140
#define STDLOG(...)
Definition: QueryState.h:225

+ Here is the call graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1030 of file ParserNode.h.

References select_query_.

1030 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1028 of file ParserNode.h.

References table_name_.

1028 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
bool  is_temporary,
bool  validate_table 
)

Definition at line 2402 of file ParserNode.cpp.

References CHECK(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, g_cluster, g_enable_experimental_string_functions, ResultSet::GeoTargetValue, SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_elem_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type_name(), Parser::InsertIntoTableAsSelectStmt::LocalConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), query_state::QueryStateProxy::getQueryState(), AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_array(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), TableDescriptor::isView, TableDescriptor::nShards, num_rows, Fragmenter_Namespace::InsertData::numRows, Parser::InsertIntoTableAsSelectStmt::LocalConnector::query(), run_benchmark_import::res, run_benchmark_import::result, AggregatedResult::rs, TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, and AggregatedResult::targets_meta.

2404  {
2405  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
2406  LocalConnector local_connector;
2407 
2408  bool populate_table = false;
2409 
2410  if (leafs_connector_) {
2411  populate_table = true;
2412  } else {
2413  leafs_connector_ = &local_connector;
2414  if (!g_cluster) {
2415  populate_table = true;
2416  }
2417  }
2418 
2419  auto& catalog = session->getCatalog();
2420  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
2421  std::vector<const ColumnDescriptor*> target_column_descriptors;
2422  if (column_list_.empty()) {
2423  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2424  target_column_descriptors = {std::begin(list), std::end(list)};
2425 
2426  } else {
2427  for (auto& c : column_list_) {
2428  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2429  if (cd == nullptr) {
2430  throw std::runtime_error("Column " + *c + " does not exist.");
2431  }
2432  target_column_descriptors.push_back(cd);
2433  }
2434  }
2435 
2436  return target_column_descriptors;
2437  };
2438 
2439  if (validate_table) {
2440  // check access privileges
2441  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
2442 
2443  if (td == nullptr) {
2444  throw std::runtime_error("Table " + table_name_ + " does not exist.");
2445  }
2446  if (td->isView) {
2447  throw std::runtime_error("Insert to views is not supported yet.");
2448  }
2449 
2450  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
2451  AccessPrivileges::INSERT_INTO_TABLE,
2452  table_name_)) {
2453  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
2454  }
2455 
2456  // only validate the select query so we get the target types
2457  // correctly, but do not populate the result set
2458  auto result = local_connector.query(query_state_proxy, select_query_, true);
2459  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
2460 
2461  std::vector<const ColumnDescriptor*> target_column_descriptors =
2462  get_target_column_descriptors(td);
2463  if (catalog.getAllColumnMetadataForTable(td->tableId, false, false, false).size() !=
2464  target_column_descriptors.size()) {
2465  throw std::runtime_error("Insert into a subset of columns is not supported yet.");
2466  }
2467 
2468  if (source_column_descriptors.size() != target_column_descriptors.size()) {
2469  throw std::runtime_error("The number of source and target columns does not match.");
2470  }
2471 
2472  for (int i = 0; i < source_column_descriptors.size(); i++) {
2473  const ColumnDescriptor* source_cd =
2474  &(*std::next(source_column_descriptors.begin(), i));
2475  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
2476 
2477  if ((!source_cd->columnType.is_string() && !target_cd->columnType.is_string()) &&
2478  (source_cd->columnType.get_type() != target_cd->columnType.get_type())) {
2479  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2480  source_cd->columnType.get_type_name() +
2481  "' and target '" + target_cd->columnName + " " +
2482  target_cd->columnType.get_type_name() +
2483  "' column types do not match.");
2484  }
2485  if (source_cd->columnType.is_array()) {
2486  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
2487  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2488  source_cd->columnType.get_type_name() +
2489  "' and target '" + target_cd->columnName + " " +
2490  target_cd->columnType.get_type_name() +
2491  "' array column element types do not match.");
2492  }
2493  }
2494 
2495  if (source_cd->columnType.is_decimal() ||
2496  source_cd->columnType.get_elem_type().is_decimal()) {
2497  SQLTypeInfo sourceType = source_cd->columnType;
2498  SQLTypeInfo targetType = target_cd->columnType;
2499 
2500  if (source_cd->columnType.is_array()) {
2501  sourceType = source_cd->columnType.get_elem_type();
2502  targetType = target_cd->columnType.get_elem_type();
2503  }
2504 
2505  if ((sourceType.get_dimension() != targetType.get_dimension()) ||
2506  (sourceType.get_scale() != targetType.get_scale())) {
2507  throw std::runtime_error(
2508  "Source '" + source_cd->columnName + " " +
2509  source_cd->columnType.get_type_name() + "' and target '" +
2510  target_cd->columnName + " " + target_cd->columnType.get_type_name() +
2511  "' decimal columns dimension and or scale do not match.");
2512  }
2513  }
2514 
2515  if (source_cd->columnType.is_string()) {
2516  if (source_cd->columnType.get_compression() !=
2517  target_cd->columnType.get_compression()) {
2518  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2519  source_cd->columnType.get_type_name() +
2520  "' and target '" + target_cd->columnName + " " +
2521  target_cd->columnType.get_type_name() +
2522  "' columns string encodings do not match.");
2523  }
2524  }
2525 
2526  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
2527  if (source_cd->columnType.get_dimension() !=
2528  target_cd->columnType.get_dimension()) {
2529  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2530  source_cd->columnType.get_type_name() +
2531  "' and target '" + target_cd->columnName + " " +
2532  target_cd->columnType.get_type_name() +
2533  "' timestamp column precisions do not match.");
2534  }
2535  }
2536 
2537  if (!source_cd->columnType.is_string() &&
2538  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
2539  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2540  source_cd->columnType.get_type_name() +
2541  "' and target '" + target_cd->columnName + " " +
2542  target_cd->columnType.get_type_name() +
2543  "' column encoding sizes do not match.");
2544  }
2545  }
2546  }
2547 
2548  if (!populate_table) {
2549  return;
2550  }
2551 
2552  const TableDescriptor* created_td = catalog.getMetadataForTable(table_name_);
2554  AggregatedResult res = leafs_connector_->query(query_state_proxy, select_query_);
2555  auto target_column_descriptors = get_target_column_descriptors(created_td);
2556  auto result_rows = res.rs;
2557  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
2558  const auto num_rows = result_rows->rowCount();
2559 
2560  if (0 == num_rows) {
2561  return;
2562  }
2563 
2564  size_t leaf_count = leafs_connector_->leafCount();
2565 
2566  size_t max_number_of_rows_per_package = std::min(num_rows / leaf_count, 64UL * 1024UL);
2567 
2568  size_t start_row = 0;
2569  size_t num_rows_to_process = std::min(num_rows, max_number_of_rows_per_package);
2570 
2571  // ensure that at least one row is being processed
2572  num_rows_to_process = std::max(num_rows_to_process, 1UL);
2573 
2574  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
2575 
2577 
2578  const int num_worker_threads = std::thread::hardware_concurrency();
2579 
2580  std::vector<size_t> thread_start_idx(num_worker_threads),
2581  thread_end_idx(num_worker_threads);
2582  bool can_go_parallel = !result_rows->isTruncated() && num_rows_to_process > 20000;
2583 
2584  std::atomic<size_t> row_idx{0};
2585 
2586  auto convert_function = [&result_rows,
2587  &value_converters,
2588  &row_idx,
2589  &num_rows_to_process,
2590  &thread_start_idx,
2591  &thread_end_idx](const int thread_id) {
2592  const int num_cols = value_converters.size();
2593  const size_t start = thread_start_idx[thread_id];
2594  const size_t end = thread_end_idx[thread_id];
2595  size_t idx = 0;
2596  for (idx = start; idx < end; ++idx) {
2597  const auto result_row = result_rows->getRowAtNoTranslations(idx);
2598  if (!result_row.empty()) {
2599  size_t target_row = row_idx.fetch_add(1);
2600 
2601  if (target_row >= num_rows_to_process) {
2602  break;
2603  }
2604 
2605  for (unsigned int col = 0; col < num_cols; col++) {
2606  const auto& mapd_variant = result_row[col];
2607  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
2608  }
2609  }
2610  }
2611 
2612  thread_start_idx[thread_id] = idx;
2613  };
2614 
2615  auto single_threaded_convert_function = [&result_rows,
2616  &value_converters,
2617  &row_idx,
2618  &num_rows_to_process,
2619  &thread_start_idx,
2620  &thread_end_idx](const int thread_id) {
2621  const int num_cols = value_converters.size();
2622  const size_t start = thread_start_idx[thread_id];
2623  const size_t end = thread_end_idx[thread_id];
2624  size_t idx = 0;
2625  for (idx = start; idx < end; ++idx) {
2626  size_t target_row = row_idx.fetch_add(1);
2627 
2628  if (target_row >= num_rows_to_process) {
2629  break;
2630  }
2631  const auto result_row = result_rows->getNextRow(false, false);
2632  CHECK(!result_row.empty());
2633  for (unsigned int col = 0; col < num_cols; col++) {
2634  const auto& mapd_variant = result_row[col];
2635  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
2636  }
2637  }
2638 
2639  thread_start_idx[thread_id] = idx;
2640  };
2641 
2642  if (can_go_parallel) {
2643  const size_t entryCount = result_rows->entryCount();
2644  for (size_t i = 0,
2645  start_entry = 0,
2646  stride = (entryCount + num_worker_threads - 1) / num_worker_threads;
2647  i < num_worker_threads && start_entry < entryCount;
2648  ++i, start_entry += stride) {
2649  const auto end_entry = std::min(start_entry + stride, entryCount);
2650  thread_start_idx[i] = start_entry;
2651  thread_end_idx[i] = end_entry;
2652  }
2653 
2654  } else {
2655  thread_start_idx[0] = 0;
2656  thread_end_idx[0] = result_rows->entryCount();
2657  }
2658 
2659  std::shared_ptr<Executor> executor;
2660 
2661  if (g_enable_experimental_string_functions) {
2662  executor = Executor::getExecutor(catalog.getCurrentDB().dbId);
2663  }
2664 
2665  while (start_row < num_rows) {
2666  try {
2667  value_converters.clear();
2668  row_idx = 0;
2669  int colNum = 0;
2670  for (const auto targetDescriptor : target_column_descriptors) {
2671  auto sourceDataMetaInfo = res.targets_meta[colNum++];
2672 
2673  ConverterCreateParameter param{
2674  num_rows_to_process,
2675  catalog,
2676  sourceDataMetaInfo,
2677  targetDescriptor,
2678  targetDescriptor->columnType,
2679  !targetDescriptor->columnType.get_notnull(),
2680  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
2682  ? executor->getStringDictionaryProxy(
2683  sourceDataMetaInfo.get_type_info().get_comp_param(),
2684  result_rows->getRowSetMemOwner(),
2685  true)
2686  : nullptr};
2687  auto converter = factory.create(param);
2688  value_converters.push_back(std::move(converter));
2689  }
2690 
2691  if (can_go_parallel) {
2692  std::vector<std::future<void>> worker_threads;
2693  for (int i = 0; i < num_worker_threads; ++i) {
2694  worker_threads.push_back(std::async(std::launch::async, convert_function, i));
2695  }
2696 
2697  for (auto& child : worker_threads) {
2698  child.wait();
2699  }
2700  for (auto& child : worker_threads) {
2701  child.get();
2702  }
2703 
2704  } else {
2705  single_threaded_convert_function(0);
2706  }
2707 
2708  // finalize the insert data
2709  {
2710  auto finalizer_func =
2711  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
2712  targetValueConverter->finalizeDataBlocksForInsertData();
2713  };
2714  std::vector<std::future<void>> worker_threads;
2715  for (auto& converterPtr : value_converters) {
2716  worker_threads.push_back(
2717  std::async(std::launch::async, finalizer_func, converterPtr.get()));
2718  }
2719 
2720  for (auto& child : worker_threads) {
2721  child.wait();
2722  }
2723  for (auto& child : worker_threads) {
2724  child.get();
2725  }
2726  }
2727 
2728  Fragmenter_Namespace::InsertData insert_data;
2729  insert_data.databaseId = catalog.getCurrentDB().dbId;
2730  CHECK(created_td);
2731  insert_data.tableId = created_td->tableId;
2732  insert_data.numRows = num_rows_to_process;
2733 
2734  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
2735  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
2736  }
2737 
2738  insertDataLoader.insertData(*session, insert_data);
2739  } catch (...) {
2740  try {
2741  if (created_td->nShards) {
2742  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2743  for (const auto ptd : shard_tables) {
2744  leafs_connector_->rollback(*session, ptd->tableId);
2745  }
2746  }
2747  leafs_connector_->rollback(*session, created_td->tableId);
2748  } catch (...) {
2749  // eat it
2750  }
2751  throw;
2752  }
2753  start_row += num_rows_to_process;
2754  num_rows_to_process = std::min(num_rows - start_row, max_number_of_rows_per_package);
2755  }
2756 
2757  if (!is_temporary) {
2758  if (created_td->nShards) {
2759  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2760  for (const auto ptd : shard_tables) {
2761  leafs_connector_->checkpoint(*session, ptd->tableId);
2762  }
2763  }
2764  leafs_connector_->checkpoint(*session, created_td->tableId);
2765  }
2766 }
const int8_t const int64_t * num_rows
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
static std::shared_ptr< Executor > getExecutor(const int db_id, const std::string &debug_dir="", const std::string &debug_file="", const MapDParameters mapd_parameters=MapDParameters(),::QueryRenderer::QueryRenderManager *render_manager=nullptr)
Definition: Execute.cpp:127
const std::vector< TargetMetaInfo > targets_meta
bool g_cluster
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:154
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
std::string get_type_name() const
Definition: sqltypes.h:429
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
bool is_array() const
Definition: sqltypes.h:485
QueryState & getQueryState()
Definition: QueryState.h:172
virtual AggregatedResult query(QueryStateProxy, std::string &sql_query_string)=0
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
specifies the content in-memory of a row in the column metadata table
bool g_enable_experimental_string_functions
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:328
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1060
DistributedConnector * leafs_connector_
Definition: ParserNode.h:1057
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:327
bool is_string() const
Definition: sqltypes.h:477
std::shared_ptr< ResultSet > rs
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:659
SQLTypeInfo columnType
specifies the content in-memory of a row in the table metadata table
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
bool is_decimal() const
Definition: sqltypes.h:480
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:75
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)

+ Here is the call graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1060 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt().

DistributedConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1057 of file ParserNode.h.

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected

Definition at line 1062 of file ParserNode.h.

Referenced by get_select_query().

std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

Definition at line 1061 of file ParserNode.h.

Referenced by get_table().


The documentation for this class was generated from the following files: