OmniSciDB  c07336695a
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Classes

struct  DistributedConnector
 
struct  LocalConnector
 

Public Member Functions

 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string *> *c)
 
void populateData (const Catalog_Namespace::SessionInfo &session, bool is_temporary, bool validate_table)
 
void execute (const Catalog_Namespace::SessionInfo &session) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

DistributedConnector * leafs_connector_ = nullptr
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 992 of file ParserNode.h.

Constructor & Destructor Documentation

◆ InsertIntoTableAsSelectStmt()

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string *> *  c 
)
inline

Definition at line 995 of file ParserNode.h.

References anonymous_namespace{ExecuteTest.cpp}::c(), and session.

998  : table_name_(*table_name), select_query_(*select_query) {
999  if (c) {
1000  for (auto e : *c) {
1001  column_list_.emplace_back(e);
1002  }
1003  delete c;
1004  }
1005 
1006  delete table_name;
1007  delete select_query;
1008  }
void c(const std::string &query_string, const ExecutorDeviceType device_type)
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1054
+ Here is the call graph for this function:

Member Function Documentation

◆ execute()

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo &  session)
inline override virtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 1013 of file ParserNode.h.

1013  {
1014  populateData(session, false, true);
1015  }
void populateData(const Catalog_Namespace::SessionInfo &session, bool is_temporary, bool validate_table)

◆ get_select_query()

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1019 of file ParserNode.h.

1019 { return select_query_; }

◆ get_table()

std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1017 of file ParserNode.h.

1017 { return table_name_; }

◆ populateData()

void Parser::InsertIntoTableAsSelectStmt::populateData ( const Catalog_Namespace::SessionInfo &  session,
bool  is_temporary,
bool  validate_table 
)

Definition at line 2364 of file ParserNode.cpp.

References anonymous_namespace{ExecuteTest.cpp}::c(), CHECK, Catalog_Namespace::SessionInfo::checkDBAccessPrivileges(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, g_cluster, ResultSet::GeoTargetValue, SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_elem_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type_name(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), Catalog_Namespace::SessionInfo::getCatalog(), Parser::InsertIntoTableAsSelectStmt::LocalConnector::getColumnDescriptors(), AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_array(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), TableDescriptor::isView, TableDescriptor::nShards, num_rows, Fragmenter_Namespace::InsertData::numRows, Parser::InsertIntoTableAsSelectStmt::LocalConnector::query(), run-benchmark-import::res, run-benchmark-import::result, AggregatedResult::rs, TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, and AggregatedResult::targets_meta.

2367  {
2368  LocalConnector local_connector;
2369 
2370  bool populate_table = false;
2371 
2372  if (leafs_connector_) {
2373  populate_table = true;
2374  } else {
2375  leafs_connector_ = &local_connector;
2376  if (!g_cluster) {
2377  populate_table = true;
2378  }
2379  }
2380 
2381  auto& catalog = session.getCatalog();
2382  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
2383  std::vector<const ColumnDescriptor*> target_column_descriptors;
2384  if (column_list_.empty()) {
2385  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2386  target_column_descriptors = {std::begin(list), std::end(list)};
2387 
2388  } else {
2389  for (auto& c : column_list_) {
2390  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2391  if (cd == nullptr) {
2392  throw std::runtime_error("Column " + *c + " does not exist.");
2393  }
2394  target_column_descriptors.push_back(cd);
2395  }
2396  }
2397 
2398  return target_column_descriptors;
2399  };
2400 
2401  if (validate_table) {
2402  // check access privileges
2403  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
2404 
2405  if (td == nullptr) {
2406  throw std::runtime_error("Table " + table_name_ + " does not exist.");
2407  }
2408  if (td->isView) {
2409  throw std::runtime_error("Insert to views is not supported yet.");
2410  }
2411 
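2412  if (!session.checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
2413  AccessPrivileges::INSERT_INTO_TABLE,
2414  table_name_)) {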
2415  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
2416  }
2417 
2418  // only validate the select query so we get the target types
2419  // correctly, but do not populate the result set
2420  auto result = local_connector.query(session, select_query_, true);
2421  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
2422 
2423  std::vector<const ColumnDescriptor*> target_column_descriptors =
2424  get_target_column_descriptors(td);
2425  if (catalog.getAllColumnMetadataForTable(td->tableId, false, false, false).size() !=
2426  target_column_descriptors.size()) {
2427  throw std::runtime_error("Insert into a subset of columns is not supported yet.");
2428  }
2429 
2430  if (source_column_descriptors.size() != target_column_descriptors.size()) {
2431  throw std::runtime_error("The number of source and target columns does not match.");
2432  }
2433 
2434  for (int i = 0; i < source_column_descriptors.size(); i++) {
2435  const ColumnDescriptor* source_cd =
2436  &(*std::next(source_column_descriptors.begin(), i));
2437  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
2438 
2439  if ((!source_cd->columnType.is_string() && !target_cd->columnType.is_string()) &&
2440  (source_cd->columnType.get_type() != target_cd->columnType.get_type())) {
2441  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2442  source_cd->columnType.get_type_name() +
2443  "' and target '" + target_cd->columnName + " " +
2444  target_cd->columnType.get_type_name() +
2445  "' column types do not match.");
2446  }
2447  if (source_cd->columnType.is_array()) {
2448  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
2449  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2450  source_cd->columnType.get_type_name() +
2451  "' and target '" + target_cd->columnName + " " +
2452  target_cd->columnType.get_type_name() +
2453  "' array column element types do not match.");
2454  }
2455  }
2456 
2457  if (source_cd->columnType.is_decimal() ||
2458  source_cd->columnType.get_elem_type().is_decimal()) {
2459  SQLTypeInfo sourceType = source_cd->columnType;
2460  SQLTypeInfo targetType = target_cd->columnType;
2461 
2462  if (source_cd->columnType.is_array()) {
2463  sourceType = source_cd->columnType.get_elem_type();
2464  targetType = target_cd->columnType.get_elem_type();
2465  }
2466 
2467  if ((sourceType.get_dimension() != targetType.get_dimension()) ||
2468  (sourceType.get_scale() != targetType.get_scale())) {
2469  throw std::runtime_error(
2470  "Source '" + source_cd->columnName + " " +
2471  source_cd->columnType.get_type_name() + "' and target '" +
2472  target_cd->columnName + " " + target_cd->columnType.get_type_name() +
2473  "' decimal columns dimension and or scale do not match.");
2474  }
2475  }
2476 
2477  if (source_cd->columnType.is_string()) {
2478  if (source_cd->columnType.get_compression() !=
2479  target_cd->columnType.get_compression()) {
2480  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2481  source_cd->columnType.get_type_name() +
2482  "' and target '" + target_cd->columnName + " " +
2483  target_cd->columnType.get_type_name() +
2484  "' columns string encodings do not match.");
2485  }
2486  }
2487 
2488  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
2489  if (source_cd->columnType.get_dimension() !=
2490  target_cd->columnType.get_dimension()) {
2491  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2492  source_cd->columnType.get_type_name() +
2493  "' and target '" + target_cd->columnName + " " +
2494  target_cd->columnType.get_type_name() +
2495  "' timestamp column precisions do not match.");
2496  }
2497  }
2498 
2499  if (!source_cd->columnType.is_string() &&
2500  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
2501  throw std::runtime_error("Source '" + source_cd->columnName + " " +
2502  source_cd->columnType.get_type_name() +
2503  "' and target '" + target_cd->columnName + " " +
2504  target_cd->columnType.get_type_name() +
2505  "' column encoding sizes do not match.");
2506  }
2507  }
2508  }
2509 
2510  if (!populate_table) {
2511  return;
2512  }
2513 
2514  const TableDescriptor* created_td = catalog.getMetadataForTable(table_name_);
2517  auto target_column_descriptors = get_target_column_descriptors(created_td);
2518  auto result_rows = res.rs;
2519  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
2520  const auto num_rows = result_rows->rowCount();
2521 
2522  if (0 == num_rows) {
2523  return;
2524  }
2525 
2526  size_t leaf_count = leafs_connector_->leafCount();
2527 
2528  size_t max_number_of_rows_per_package = std::min(num_rows / leaf_count, 64UL * 1024UL);
2529 
2530  size_t start_row = 0;
2531  size_t num_rows_to_process = std::min(num_rows, max_number_of_rows_per_package);
2532 
2533  // ensure that at least one row is being processed
2534  num_rows_to_process = std::max(num_rows_to_process, 1UL);
2535 
2536  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
2537 
2538  TargetValueConverterFactory factory;
2539 
2540  const int num_worker_threads = std::thread::hardware_concurrency();
2541 
2542  std::vector<size_t> thread_start_idx(num_worker_threads),
2543  thread_end_idx(num_worker_threads);
2544  bool can_go_parallel = !result_rows->isTruncated() && num_rows_to_process > 20000;
2545 
2546  std::atomic<size_t> row_idx{0};
2547 
2548  auto convert_function = [&result_rows,
2549  &value_converters,
2550  &row_idx,
2551  &num_rows_to_process,
2552  &thread_start_idx,
2553  &thread_end_idx](const int thread_id) {
2554  const int num_cols = value_converters.size();
2555  const size_t start = thread_start_idx[thread_id];
2556  const size_t end = thread_end_idx[thread_id];
2557  size_t idx = 0;
2558  for (idx = start; idx < end; ++idx) {
2559  const auto result_row = result_rows->getRowAtNoTranslations(idx);
2560  if (!result_row.empty()) {
2561  size_t target_row = row_idx.fetch_add(1);
2562 
2563  if (target_row >= num_rows_to_process) {
2564  break;
2565  }
2566 
2567  for (unsigned int col = 0; col < num_cols; col++) {
2568  const auto& mapd_variant = result_row[col];
2569  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
2570  }
2571  }
2572  }
2573 
2574  thread_start_idx[thread_id] = idx;
2575  };
2576 
2577  if (can_go_parallel) {
2578  const size_t entryCount = result_rows->entryCount();
2579  for (size_t i = 0,
2580  start_entry = 0,
2581  stride = (entryCount + num_worker_threads - 1) / num_worker_threads;
2582  i < num_worker_threads && start_entry < entryCount;
2583  ++i, start_entry += stride) {
2584  const auto end_entry = std::min(start_entry + stride, entryCount);
2585  thread_start_idx[i] = start_entry;
2586  thread_end_idx[i] = end_entry;
2587  }
2588 
2589  } else {
2590  thread_start_idx[0] = 0;
2591  thread_end_idx[0] = result_rows->entryCount();
2592  }
2593 
2594  while (start_row < num_rows) {
2595  try {
2596  value_converters.clear();
2597  row_idx = 0;
2598  int colNum = 0;
2599  for (const auto targetDescriptor : target_column_descriptors) {
2600  auto sourceDataMetaInfo = res.targets_meta[colNum++];
2601 
2602  ConverterCreateParameter param{
2603  num_rows_to_process,
2604  catalog,
2605  sourceDataMetaInfo,
2606  targetDescriptor,
2607  targetDescriptor->columnType,
2608  !targetDescriptor->columnType.get_notnull(),
2609  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy()};
2610  auto converter = factory.create(param);
2611  value_converters.push_back(std::move(converter));
2612  }
2613 
2614  if (can_go_parallel) {
2615  std::vector<std::future<void>> worker_threads;
2616  for (int i = 0; i < num_worker_threads; ++i) {
2617  worker_threads.push_back(std::async(std::launch::async, convert_function, i));
2618  }
2619 
2620  for (auto& child : worker_threads) {
2621  child.wait();
2622  }
2623  for (auto& child : worker_threads) {
2624  child.get();
2625  }
2626 
2627  } else {
2628  convert_function(0);
2629  }
2630 
2631  // finalize the insert data
2632  {
2633  auto finalizer_func =
2634  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
2635  targetValueConverter->finalizeDataBlocksForInsertData();
2636  };
2637  std::vector<std::future<void>> worker_threads;
2638  for (auto& converterPtr : value_converters) {
2639  worker_threads.push_back(
2640  std::async(std::launch::async, finalizer_func, converterPtr.get()));
2641  }
2642 
2643  for (auto& child : worker_threads) {
2644  child.wait();
2645  }
2646  for (auto& child : worker_threads) {
2647  child.get();
2648  }
2649  }
2650 
2651  Fragmenter_Namespace::InsertData insert_data;
2652  insert_data.databaseId = catalog.getCurrentDB().dbId;
2653  CHECK(created_td);
2654  insert_data.tableId = created_td->tableId;
2655  insert_data.numRows = num_rows_to_process;
2656 
2657  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
2658  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
2659  }
2660 
2661  insertDataLoader.insertData(session, insert_data);
2662  } catch (...) {
2663  try {
2664  if (created_td->nShards) {
2665  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2666  for (const auto ptd : shard_tables) {
2667  leafs_connector_->rollback(session, ptd->tableId);
2668  }
2669  }
2670  leafs_connector_->rollback(session, created_td->tableId);
2671  } catch (...) {
2672  // eat it
2673  }
2674  throw;
2675  }
2676  start_row += num_rows_to_process;
2677  num_rows_to_process = std::min(num_rows - start_row, max_number_of_rows_per_package);
2678  }
2679 
2680  if (!is_temporary) {
2681  if (created_td->nShards) {
2682  const auto shard_tables = catalog.getPhysicalTablesDescriptors(created_td);
2683  for (const auto ptd : shard_tables) {
2684  leafs_connector_->checkpoint(session, ptd->tableId);
2685  }
2686  }
2687  leafs_connector_->checkpoint(session, created_td->tableId);
2688  }
2689 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:329
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:321
const int8_t const int64_t * num_rows
const std::vector< TargetMetaInfo > targets_meta
void c(const std::string &query_string, const ExecutorDeviceType device_type)
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:154
HOST DEVICE int get_scale() const
Definition: sqltypes.h:324
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:319
virtual void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:327
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
virtual AggregatedResult query(const Catalog_Namespace::SessionInfo &parent_session_info, std::string &sql_query_string)=0
std::string get_type_name() const
Definition: sqltypes.h:422
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
bool is_array() const
Definition: sqltypes.h:454
bool is_decimal() const
Definition: sqltypes.h:449
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:320
Catalog & getCatalog() const
Definition: SessionInfo.h:90
specifies the content in-memory of a row in the column metadata table
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:628
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1579
bool checkDBAccessPrivileges(const DBObjectType &permissionType, const AccessPrivileges &privs, const std::string &objectName="") const
Definition: SessionInfo.cpp:24
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1054
DistributedConnector * leafs_connector_
Definition: ParserNode.h:1051
std::shared_ptr< ResultSet > rs
#define CHECK(condition)
Definition: Logger.h:187
bool g_cluster
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
SQLTypeInfo columnType
specifies the content in-memory of a row in the table metadata table
virtual void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId)=0
std::string columnName
bool is_string() const
Definition: sqltypes.h:446
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
+ Here is the call graph for this function:

Member Data Documentation

◆ column_list_

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1054 of file ParserNode.h.

◆ leafs_connector_

DistributedConnector* Parser::InsertIntoTableAsSelectStmt::leafs_connector_ = nullptr

Definition at line 1051 of file ParserNode.h.

◆ select_query_

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected

Definition at line 1056 of file ParserNode.h.

◆ table_name_

std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

Definition at line 1055 of file ParserNode.h.


The documentation for this class was generated from the following files: