OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Importer_NS::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Loader:
+ Collaboration diagram for Importer_NS::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog ()
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual int32_t getTableEpoch ()
 
virtual void setTableEpoch (const int32_t new_epoch)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 
void dropColumns (const std::vector< int > &columns)
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 511 of file Importer.h.

Member Typedef Documentation

using Importer_NS::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 559 of file Importer.h.

Constructor & Destructor Documentation

Importer_NS::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t 
)
inline

Definition at line 513 of file Importer.h.

References init().

514  : catalog_(c)
515  , table_desc_(t)
516  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true)) {
517  init();
518  }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:568
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1675
const TableDescriptor * table_desc_
Definition: Importer.h:567

+ Here is the call graph for this function:

virtual Importer_NS::Loader::~Loader ( )
inline virtual

Definition at line 520 of file Importer.h.

520 {}

Member Function Documentation

void Importer_NS::Loader::checkpoint ( )
virtual

Definition at line 4059 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpoint(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4059  {
4060  if (getTableDesc()->persistenceLevel ==
4061  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4062  getCatalog().checkpoint(getTableDesc()->tableId);
4063  }
4064 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:522
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:3357
const TableDescriptor * getTableDesc() const
Definition: Importer.h:523

+ Here is the call graph for this function:

void Importer_NS::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2481 of file Importer.cpp.

References catalog_, CHECK(), CHECK_GT, CHECK_LE, CHECK_LT, column_descs_, decimal_to_int_type(), Importer_NS::anonymous_namespace{Importer.cpp}::double_value_at(), Importer_NS::anonymous_namespace{Importer.cpp}::float_value_at(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), getReplicating(), Importer_NS::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, TableDescriptor::nShards, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, table_desc_, and run_benchmark_import::type.

Referenced by loadImpl().

2485  {
2486  all_shard_row_counts.resize(shard_count);
2487  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2488  all_shard_import_buffers.emplace_back();
2489  for (const auto& typed_import_buffer : import_buffers) {
2490  all_shard_import_buffers.back().emplace_back(
2491  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2492  typed_import_buffer->getStringDictionary()));
2493  }
2494  }
2496  int col_idx{0};
2497  const ColumnDescriptor* shard_col_desc{nullptr};
2498  for (const auto col_desc : column_descs_) {
2499  ++col_idx;
2500  if (col_idx == table_desc_->shardedColumnId) {
2501  shard_col_desc = col_desc;
2502  break;
2503  }
2504  }
2505  CHECK(shard_col_desc);
2506  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2507  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2508  const auto& shard_col_ti = shard_col_desc->columnType;
2509  CHECK(shard_col_ti.is_integer() ||
2510  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2511  shard_col_ti.is_time());
2512  if (shard_col_ti.is_string()) {
2513  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2514  CHECK(payloads_ptr);
2515  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2516  }
2517 
2518  // for each replicated (alter added) columns, number of rows in a shard is
2519  // inferred from that of the sharding column, not simply evenly distributed.
2520  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2521  // Here the loop count is overloaded. For normal imports, we loop thru all
2522  // input values (rows), so the loop count is the number of input rows.
2523  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2524  // all shards, so the loop count is the number of shards.
2525  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2526  for (size_t i = 0; i < loop_count; ++i) {
2527  const size_t shard =
2528  getReplicating()
2529  ? i
2530  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2531  auto& shard_output_buffers = all_shard_import_buffers[shard];
2532 
2533  // when replicate a column, populate 'rows' to all shards only once
2534  // and its value is fetch from the first and the single row
2535  const auto row_index = getReplicating() ? 0 : i;
2536 
2537  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2538  const auto& input_buffer = import_buffers[col_idx];
2539  const auto& col_ti = input_buffer->getTypeInfo();
2540  const auto type =
2541  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2542 
2543  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2544  // count. and, bypass non-replicated column.
2545  if (getReplicating()) {
2546  if (input_buffer->get_replicate_count() > 0) {
2547  shard_output_buffers[col_idx]->set_replicate_count(
2548  shard_tds[shard]->fragmenter->getNumRows());
2549  } else {
2550  continue;
2551  }
2552  }
2553 
2554  switch (type) {
2555  case kBOOLEAN:
2556  shard_output_buffers[col_idx]->addBoolean(
2557  int_value_at(*input_buffer, row_index));
2558  break;
2559  case kTINYINT:
2560  shard_output_buffers[col_idx]->addTinyint(
2561  int_value_at(*input_buffer, row_index));
2562  break;
2563  case kSMALLINT:
2564  shard_output_buffers[col_idx]->addSmallint(
2565  int_value_at(*input_buffer, row_index));
2566  break;
2567  case kINT:
2568  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2569  break;
2570  case kBIGINT:
2571  shard_output_buffers[col_idx]->addBigint(
2572  int_value_at(*input_buffer, row_index));
2573  break;
2574  case kFLOAT:
2575  shard_output_buffers[col_idx]->addFloat(
2576  float_value_at(*input_buffer, row_index));
2577  break;
2578  case kDOUBLE:
2579  shard_output_buffers[col_idx]->addDouble(
2580  double_value_at(*input_buffer, row_index));
2581  break;
2582  case kTEXT:
2583  case kVARCHAR:
2584  case kCHAR: {
2585  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2586  shard_output_buffers[col_idx]->addString(
2587  (*input_buffer->getStringBuffer())[row_index]);
2588  break;
2589  }
2590  case kTIME:
2591  case kTIMESTAMP:
2592  case kDATE:
2593  shard_output_buffers[col_idx]->addBigint(
2594  int_value_at(*input_buffer, row_index));
2595  break;
2596  case kARRAY:
2597  if (IS_STRING(col_ti.get_subtype())) {
2598  CHECK(input_buffer->getStringArrayBuffer());
2599  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2600  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2601  shard_output_buffers[col_idx]->addStringArray(input_arr);
2602  } else {
2603  shard_output_buffers[col_idx]->addArray(
2604  (*input_buffer->getArrayBuffer())[row_index]);
2605  }
2606  break;
2607  case kPOINT:
2608  case kLINESTRING:
2609  case kPOLYGON:
2610  case kMULTIPOLYGON: {
2611  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2612  shard_output_buffers[col_idx]->addGeoString(
2613  (*input_buffer->getGeoStringBuffer())[row_index]);
2614  break;
2615  }
2616  default:
2617  CHECK(false);
2618  }
2619  }
2620  ++all_shard_row_counts[shard];
2621  // when replicating a column, row count of a shard == replicate count of the column on
2622  // the shard
2623  if (getReplicating()) {
2624  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2625  }
2626  }
2627 }
Definition: sqltypes.h:50
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2434
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3287
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566
#define CHECK_GT(x, y)
Definition: Logger.h:209
CHECK(cgen_state)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:568
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:299
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:53
Definition: sqltypes.h:54
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2465
#define CHECK_LE(x, y)
Definition: Logger.h:208
const TableDescriptor * table_desc_
Definition: Importer.h:567
bool getReplicating() const
Definition: Importer.h:548
Definition: sqltypes.h:42
#define IS_STRING(T)
Definition: sqltypes.h:162
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2472
Definition: sqltypes.h:46
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 2760 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

2760  {
2761  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2762  if (table_desc_->nShards) {
2763  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2764  }
2765  for (auto table_desc : table_descs) {
2766  table_desc->fragmenter->dropColumns(columnIds);
2767  }
2768 }
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3287
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566
const TableDescriptor * table_desc_
Definition: Importer.h:567

+ Here is the call graph for this function:

const std::list<const ColumnDescriptor*>& Importer_NS::Loader::get_column_descs ( ) const
inline

Definition at line 524 of file Importer.h.

References column_descs_.

Referenced by Importer_NS::setup_column_loaders().

524  {
525  return column_descs_;
526  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:568

+ Here is the caller graph for this function:

std::vector< DataBlockPtr > Importer_NS::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2657 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK(), CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by loadToShard().

2658  {
2659  std::vector<DataBlockPtr> result(import_buffers.size());
2660  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2661  encoded_data_block_ptrs_futures;
2662  // make all async calls to string dictionary here and then continue execution
2663  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2664  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2665  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2666  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2667  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2668 
2669  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2670  buf_idx,
2671  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2672  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2673  return import_buffers[buf_idx]->getStringDictBuffer();
2674  })));
2675  }
2676  }
2677 
2678  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2679  DataBlockPtr p;
2680  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2681  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2682  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2683  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2684  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2685  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2686  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2687  p.stringsPtr = string_payload_ptr;
2688  } else {
2689  // This condition means we have column which is ENCODED string. We already made
2690  // Async request to gain the encoded integer values above so we should skip this
2691  // iteration and continue.
2692  continue;
2693  }
2694  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2695  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2696  p.stringsPtr = geo_payload_ptr;
2697  } else {
2698  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2699  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2700  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2701  import_buffers[buf_idx]->addDictEncodedStringArray(
2702  *import_buffers[buf_idx]->getStringArrayBuffer());
2703  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2704  } else {
2705  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2706  }
2707  }
2708  result[buf_idx] = p;
2709  }
2710 
2711  // wait for the async requests we made for string dictionary
2712  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2713  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2714  }
2715  return result;
2716 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:139
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:140
CHECK(cgen_state)
#define IS_STRING(T)
Definition: sqltypes.h:162
int8_t * numbersPtr
Definition: sqltypes.h:138

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& Importer_NS::Loader::getCatalog ( )
inline

Definition at line 522 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpoch(), and setTableEpoch().

522 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566

+ Here is the caller graph for this function:

bool Importer_NS::Loader::getReplicating ( ) const
inline

Definition at line 548 of file Importer.h.

References replicating_.

Referenced by distributeToShards(), and loadToShard().

548 { return replicating_; }

+ Here is the caller graph for this function:

StringDictionary* Importer_NS::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 528 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by Importer_NS::setup_column_loaders().

528  {
529  if ((cd->columnType.get_type() != kARRAY ||
530  !IS_STRING(cd->columnType.get_subtype())) &&
531  (!cd->columnType.is_string() ||
532  cd->columnType.get_compression() != kENCODING_DICT)) {
533  return nullptr;
534  }
535  return dict_map_.at(cd->columnId);
536  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:249
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:570
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:248
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:256
#define IS_STRING(T)
Definition: sqltypes.h:162
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:399

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* Importer_NS::Loader::getTableDesc ( ) const
inline

Definition at line 523 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), getTableEpoch(), and setTableEpoch().

523 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:567

+ Here is the caller graph for this function:

int32_t Importer_NS::Loader::getTableEpoch ( )
virtual

Definition at line 4066 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpoch().

4066  {
4067  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4068  getTableDesc()->tableId);
4069 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:522
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2485
const TableDescriptor * getTableDesc() const
Definition: Importer.h:523

+ Here is the call graph for this function:

void Importer_NS::Loader::init ( )
protected

Definition at line 2770 of file Importer.cpp.

References catalog_, CHECK(), column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

2770  {
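2771  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2772  insert_data_.tableId = table_desc_->tableId;
2773  for (auto cd : column_descs_) {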
2774  insert_data_.columnIds.push_back(cd->columnId);
2775  if (cd->columnType.get_compression() == kENCODING_DICT) {
2776  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2777  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2778  CHECK(dd);
2779  dict_map_[cd->columnId] = dd->stringDict.get();
2780  }
2781  }
2782  insert_data_.numRows = 0;
2783 }
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:569
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:570
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:182
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1444
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:568
const TableDescriptor * table_desc_
Definition: Importer.h:567
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2427 of file Importer.cpp.

References loadImpl().

2428  {
2429  return loadImpl(import_buffers, row_count, true);
2430 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2629

+ Here is the call graph for this function:

bool Importer_NS::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protected virtual

Definition at line 2629 of file Importer.cpp.

References catalog_, distributeToShards(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2632  {
2633  if (table_desc_->nShards) {
2634  std::vector<OneShardBuffers> all_shard_import_buffers;
2635  std::vector<size_t> all_shard_row_counts;
2636  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2637  distributeToShards(all_shard_import_buffers,
2638  all_shard_row_counts,
2639  import_buffers,
2640  row_count,
2641  shard_tables.size());
2642  bool success = true;
2643  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2644  if (!all_shard_row_counts[shard_idx]) {
2645  continue;
2646  }
2647  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2648  all_shard_row_counts[shard_idx],
2649  shard_tables[shard_idx],
2650  checkpoint);
2651  }
2652  return success;
2653  }
2654  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2655 }
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2481
virtual void checkpoint()
Definition: Importer.cpp:4059
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3287
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:566
const TableDescriptor * table_desc_
Definition: Importer.h:567
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2718

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2421 of file Importer.cpp.

References loadImpl().

2423  {
2424  return loadImpl(import_buffers, row_count, false);
2425 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2629

+ Here is the call graph for this function:

bool Importer_NS::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor *  shard_table,
bool  checkpoint 
)
private

Definition at line 2718 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, get_data_block_pointers(), getReplicating(), insert_data_, Fragmenter_Namespace::AbstractFragmenter::insertData(), Fragmenter_Namespace::AbstractFragmenter::insertDataNoCheckpoint(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

Referenced by loadImpl().

2722  {
2723  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2724  // patch insert_data with new column
2725  if (this->getReplicating()) {
2726  for (const auto& import_buff : import_buffers) {
2727  insert_data_.replicate_count = import_buff->get_replicate_count();
2728  }
2729  }
2730 
2731  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2732  ins_data.numRows = row_count;
2733  bool success = true;
2734 
2735  ins_data.data = get_data_block_pointers(import_buffers);
2736 
2737  for (const auto& import_buffer : import_buffers) {
2738  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2739  }
2740 
2741  // release loader_lock so that in InsertOrderFragmenter::insertDat
2742  // we can have multiple threads sort/shuffle InsertData
2743  loader_lock.unlock();
2744 
2745  {
2746  try {
2747  if (checkpoint) {
2748  shard_table->fragmenter->insertData(ins_data);
2749  } else {
2750  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2751  }
2752  } catch (std::exception& e) {
2753  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2754  success = false;
2755  }
2756  }
2757  return success;
2758 }
virtual void checkpoint()
Definition: Importer.cpp:4059
std::mutex loader_mutex_
Definition: Importer.h:581
#define LOG(tag)
Definition: Logger.h:188
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:569
virtual void insertDataNoCheckpoint(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool getReplicating() const
Definition: Importer.h:548
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2657
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Fragmenter_Namespace::AbstractFragmenter * fragmenter
virtual void insertData(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions with locks and che...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 547 of file Importer.h.

References replicating_.

547 { replicating_ = replicating; }
void Importer_NS::Loader::setTableEpoch ( const int32_t  new_epoch)
virtual

Definition at line 4071 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::setTableEpoch().

4071  {
4072  getCatalog().setTableEpoch(
4073  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4074 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:522
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2515
const TableDescriptor * getTableDesc() const
Definition: Importer.h:523

+ Here is the call graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog& Importer_NS::Loader::catalog_
protected

Definition at line 566 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> Importer_NS::Loader::column_descs_
protected

Definition at line 568 of file Importer.h.

Referenced by distributeToShards(), get_column_descs(), and init().

std::map<int, StringDictionary*> Importer_NS::Loader::dict_map_
protected

Definition at line 570 of file Importer.h.

Referenced by getStringDict(), and init().

Fragmenter_Namespace::InsertData Importer_NS::Loader::insert_data_
protected

Definition at line 569 of file Importer.h.

Referenced by init(), and loadToShard().

std::mutex Importer_NS::Loader::loader_mutex_
private

Definition at line 581 of file Importer.h.

Referenced by loadToShard().

bool Importer_NS::Loader::replicating_ = false
private

Definition at line 580 of file Importer.h.

Referenced by getReplicating(), and setReplicating().

const TableDescriptor* Importer_NS::Loader::table_desc_
protected

Definition at line 567 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getTableDesc(), init(), and loadImpl().


The documentation for this class was generated from the following files: