OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Fragmenter_Namespace::InsertDataLoader Class Reference

#include <InsertDataLoader.h>

+ Collaboration diagram for Fragmenter_Namespace::InsertDataLoader:

Classes

class  InsertConnector
 

Public Member Functions

 InsertDataLoader (InsertConnector &connector)
 
void insertData (const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
 
void insertChunks (const Catalog_Namespace::SessionInfo &session_info, const InsertChunks &insert_chunks)
 
size_t getLeafCount () const
 

Private Member Functions

size_t moveToNextLeaf ()
 

Private Attributes

size_t leaf_count_
 
size_t current_leaf_index_
 
InsertConnector & connector_
 
std::shared_mutex current_leaf_index_mutex_
 

Detailed Description

Definition at line 25 of file InsertDataLoader.h.

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertDataLoader::InsertDataLoader ( InsertConnector & connector)
inline

Definition at line 46 of file InsertDataLoader.h.

47  : leaf_count_(connector.leafCount())
48  , current_leaf_index_(0)
49  , connector_(connector) {}

Member Function Documentation

size_t Fragmenter_Namespace::InsertDataLoader::getLeafCount ( ) const
inline

Definition at line 57 of file InsertDataLoader.h.

References leaf_count_.

Referenced by RelAlgExecutor::executeSimpleInsert().

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertDataLoader::insertChunks ( const Catalog_Namespace::SessionInfo & session_info,
const InsertChunks & insert_chunks
)

Definition at line 418 of file InsertDataLoader.cpp.

References threading_serial::async(), cat(), CHECK, Fragmenter_Namespace::compute_row_indices_of_shards(), connector_, Fragmenter_Namespace::copy_data_of_shard(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::InsertConnector::insertChunksToLeaf(), Fragmenter_Namespace::InsertDataLoader::InsertConnector::leafCount(), moveToNextLeaf(), and Fragmenter_Namespace::InsertChunks::table_id.

Referenced by anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers().

419  {
420  const auto& cat = session_info.getCatalog();
421  const auto* td = cat.getMetadataForTable(insert_chunks.table_id);
422 
423  CHECK(td);
424  if (td->nShards == 0) {
425  connector_.insertChunksToLeaf(session_info, moveToNextLeaf(), insert_chunks);
426  } else {
427  // we have a sharded target table, start spreading to physical tables
428  auto row_indices_of_shards =
429  compute_row_indices_of_shards(cat, connector_.leafCount(), insert_chunks);
430 
431  auto insert_shard_data =
432  [this, &session_info, &insert_chunks, &cat, &td, &row_indices_of_shards](
433  size_t shardId) {
434  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
435  auto stard_table_idx = shardId % td->nShards;
436  auto shard_leaf_idx = shardId / td->nShards;
437 
438  const auto& row_indices_of_shard = row_indices_of_shards[shardId];
439 
440  auto [buffers, shard_insert_chunks] = copy_data_of_shard(
441  cat, insert_chunks, stard_table_idx, row_indices_of_shard);
442  connector_.insertChunksToLeaf(
443  session_info, shard_leaf_idx, shard_insert_chunks);
444  };
445 
446  std::vector<std::future<void>> worker_threads;
447  for (size_t shard_id = 0; shard_id < row_indices_of_shards.size(); shard_id++) {
448  if (row_indices_of_shards[shard_id].size() > 0) {
449  worker_threads.push_back(
450  std::async(std::launch::async, insert_shard_data, shard_id));
451  }
452  }
453  for (auto& child : worker_threads) {
454  child.wait();
455  }
456  for (auto& child : worker_threads) {
457  child.get();
458  }
459  }
460 }
std::string cat(Ts &&...args)
std::pair< std::list< std::unique_ptr< foreign_storage::ForeignStorageBuffer > >, InsertChunks > copy_data_of_shard(const Catalog_Namespace::Catalog &cat, const InsertChunks &insert_chunks, int shardTableIndex, const std::vector< size_t > &rowIndices)
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::vector< size_t > > compute_row_indices_of_shards(size_t shard_count, size_t leaf_count, size_t row_count, SRC *src, bool duplicated_key_value)
Catalog & getCatalog() const
Definition: SessionInfo.h:75
#define CHECK(condition)
Definition: Logger.h:291
virtual void insertChunksToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, const Fragmenter_Namespace::InsertChunks &insert_chunks)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertDataLoader::insertData ( const Catalog_Namespace::SessionInfo & session_info,
InsertData & insert_data
)

Definition at line 462 of file InsertDataLoader.cpp.

References threading_serial::async(), cat(), CHECK, Fragmenter_Namespace::computeRowIndicesOfShards(), connector_, Fragmenter_Namespace::copyDataOfShard(), Catalog_Namespace::SessionInfo::getCatalog(), Fragmenter_Namespace::InsertDataLoader::InsertConnector::insertDataToLeaf(), Fragmenter_Namespace::InsertDataLoader::InsertConnector::leafCount(), moveToNextLeaf(), Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::tableId.

Referenced by RelAlgExecutor::executeSimpleInsert(), and Parser::InsertIntoTableAsSelectStmt::populateData().

463  {
464  const auto& cat = session_info.getCatalog();
465  const auto* td = cat.getMetadataForTable(insert_data.tableId);
466 
467  CHECK(td);
468  if (td->nShards == 0) {
469  connector_.insertDataToLeaf(session_info, moveToNextLeaf(), insert_data);
470  } else {
471  // we have a sharded target table, start spreading to physical tables
472  auto rowIndicesOfShards =
473  computeRowIndicesOfShards(cat, connector_.leafCount(), insert_data);
474 
475  auto insertShardData =
476  [this, &session_info, &insert_data, &cat, &td, &rowIndicesOfShards](
477  size_t shardId) {
478  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
479  auto stardTableIdx = shardId % td->nShards;
480  auto shardLeafIdx = shardId / td->nShards;
481 
482  const auto& rowIndicesOfShard = rowIndicesOfShards[shardId];
483  ShardDataOwner shardDataOwner;
484 
485  InsertData shardData = copyDataOfShard(
486  cat, shardDataOwner, insert_data, stardTableIdx, rowIndicesOfShard);
487  CHECK(shardData.numRows > 0);
488  connector_.insertDataToLeaf(session_info, shardLeafIdx, shardData);
489  };
490 
491  std::vector<std::future<void>> worker_threads;
492  for (size_t shardId = 0; shardId < rowIndicesOfShards.size(); shardId++) {
493  if (rowIndicesOfShards[shardId].size() > 0) {
494  worker_threads.push_back(
495  std::async(std::launch::async, insertShardData, shardId));
496  }
497  }
498  for (auto& child : worker_threads) {
499  child.wait();
500  }
501  for (auto& child : worker_threads) {
502  child.get();
503  }
504  }
505 }
std::string cat(Ts &&...args)
std::vector< std::vector< size_t > > computeRowIndicesOfShards(const Catalog_Namespace::Catalog &cat, size_t leafCount, InsertData &insert_data)
virtual void insertDataToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, Fragmenter_Namespace::InsertData &insert_data)=0
future< Result > async(Fn &&fn, Args &&...args)
Catalog & getCatalog() const
Definition: SessionInfo.h:75
#define CHECK(condition)
Definition: Logger.h:291
InsertData copyDataOfShard(const Catalog_Namespace::Catalog &cat, ShardDataOwner &dataOwner, InsertData &insert_data, int shardTableIndex, const std::vector< size_t > &rowIndices)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t Fragmenter_Namespace::InsertDataLoader::moveToNextLeaf ( )
private

Move to the next available leaf index internally. Done under a lock so that concurrent callers do not race on the shared index.

Returns
the current leaf index (prior to moving to the next index)

Definition at line 408 of file InsertDataLoader.cpp.

References current_leaf_index_, current_leaf_index_mutex_, and leaf_count_.

Referenced by insertChunks(), and insertData().

408  {
409  std::unique_lock current_leaf_index_lock(current_leaf_index_mutex_);
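410  size_t starting_leaf_index = current_leaf_index_;
411  current_leaf_index_++;
412  if (current_leaf_index_ >= leaf_count_) {
413  current_leaf_index_ = 0;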
414  }
415  return starting_leaf_index;
416 }
std::unique_lock< T > unique_lock

+ Here is the caller graph for this function:

Member Data Documentation

InsertConnector& Fragmenter_Namespace::InsertDataLoader::connector_
private

Definition at line 70 of file InsertDataLoader.h.

Referenced by insertChunks(), and insertData().

size_t Fragmenter_Namespace::InsertDataLoader::current_leaf_index_
private

Definition at line 69 of file InsertDataLoader.h.

Referenced by moveToNextLeaf().

std::shared_mutex Fragmenter_Namespace::InsertDataLoader::current_leaf_index_mutex_
private

Definition at line 71 of file InsertDataLoader.h.

Referenced by moveToNextLeaf().

size_t Fragmenter_Namespace::InsertDataLoader::leaf_count_
private

Definition at line 68 of file InsertDataLoader.h.

Referenced by getLeafCount(), and moveToNextLeaf().


The documentation for this class was generated from the following files: