OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Importer_NS::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Loader:
+ Collaboration diagram for Importer_NS::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog ()
 
const TableDescriptor * getTableDesc () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual int32_t getTableEpoch ()
 
virtual void setTableEpoch (const int32_t new_epoch)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const ColumnDescriptor * > column_descs_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 537 of file Importer.h.

Member Typedef Documentation

using Importer_NS::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 584 of file Importer.h.

Constructor & Destructor Documentation

Importer_NS::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t 
)
inline

Definition at line 539 of file Importer.h.

References init().

540  : catalog_(c)
541  , table_desc_(t)
542  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true)) {
543  init();
544  }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:593
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1581
const TableDescriptor * table_desc_
Definition: Importer.h:592

+ Here is the call graph for this function:

virtual Importer_NS::Loader::~Loader ( )
inline virtual

Definition at line 546 of file Importer.h.

546 {}

Member Function Documentation

void Importer_NS::Loader::checkpoint ( )
virtual

Definition at line 4000 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpoint(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4000  {
4001  if (getTableDesc()->persistenceLevel ==
4002  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4003  getCatalog().checkpoint(getTableDesc()->tableId);
4004  }
4005 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:548
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:2929
const TableDescriptor * getTableDesc() const
Definition: Importer.h:549

+ Here is the call graph for this function:

void Importer_NS::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2433 of file Importer.cpp.

References catalog_, CHECK(), CHECK_GT, CHECK_LE, CHECK_LT, column_descs_, decimal_to_int_type(), Importer_NS::anonymous_namespace{Importer.cpp}::double_value_at(), Importer_NS::anonymous_namespace{Importer.cpp}::float_value_at(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), getReplicating(), Importer_NS::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, TableDescriptor::nShards, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, table_desc_, and run_benchmark_import::type.

Referenced by loadImpl().

2437  {
2438  all_shard_row_counts.resize(shard_count);
2439  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2440  all_shard_import_buffers.emplace_back();
2441  for (const auto& typed_import_buffer : import_buffers) {
2442  all_shard_import_buffers.back().emplace_back(
2443  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2444  typed_import_buffer->getStringDictionary()));
2445  }
2446  }
2448  int col_idx{0};
2449  const ColumnDescriptor* shard_col_desc{nullptr};
2450  for (const auto col_desc : column_descs_) {
2451  ++col_idx;
2452  if (col_idx == table_desc_->shardedColumnId) {
2453  shard_col_desc = col_desc;
2454  break;
2455  }
2456  }
2457  CHECK(shard_col_desc);
2458  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2459  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2460  const auto& shard_col_ti = shard_col_desc->columnType;
2461  CHECK(shard_col_ti.is_integer() ||
2462  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2463  shard_col_ti.is_time());
2464  if (shard_col_ti.is_string()) {
2465  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2466  CHECK(payloads_ptr);
2467  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2468  }
2469 
2470  // for each replicated (alter added) columns, number of rows in a shard is
2471  // inferred from that of the sharding column, not simply evenly distributed.
2472  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2473  // Here the loop count is overloaded. For normal imports, we loop thru all
2474  // input values (rows), so the loop count is the number of input rows.
2475  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2476  // all shards, so the loop count is the number of shards.
2477  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2478  for (size_t i = 0; i < loop_count; ++i) {
2479  const size_t shard =
2480  getReplicating()
2481  ? i
2482  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2483  auto& shard_output_buffers = all_shard_import_buffers[shard];
2484 
2485  // when replicate a column, populate 'rows' to all shards only once
2486  // and its value is fetch from the first and the single row
2487  const auto row_index = getReplicating() ? 0 : i;
2488 
2489  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2490  const auto& input_buffer = import_buffers[col_idx];
2491  const auto& col_ti = input_buffer->getTypeInfo();
2492  const auto type =
2493  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2494 
2495  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2496  // count. and, bypass non-replicated column.
2497  if (getReplicating()) {
2498  if (input_buffer->get_replicate_count() > 0) {
2499  shard_output_buffers[col_idx]->set_replicate_count(
2500  shard_tds[shard]->fragmenter->getNumRows());
2501  } else {
2502  continue;
2503  }
2504  }
2505 
2506  switch (type) {
2507  case kBOOLEAN:
2508  shard_output_buffers[col_idx]->addBoolean(
2509  int_value_at(*input_buffer, row_index));
2510  break;
2511  case kTINYINT:
2512  shard_output_buffers[col_idx]->addTinyint(
2513  int_value_at(*input_buffer, row_index));
2514  break;
2515  case kSMALLINT:
2516  shard_output_buffers[col_idx]->addSmallint(
2517  int_value_at(*input_buffer, row_index));
2518  break;
2519  case kINT:
2520  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2521  break;
2522  case kBIGINT:
2523  shard_output_buffers[col_idx]->addBigint(
2524  int_value_at(*input_buffer, row_index));
2525  break;
2526  case kFLOAT:
2527  shard_output_buffers[col_idx]->addFloat(
2528  float_value_at(*input_buffer, row_index));
2529  break;
2530  case kDOUBLE:
2531  shard_output_buffers[col_idx]->addDouble(
2532  double_value_at(*input_buffer, row_index));
2533  break;
2534  case kTEXT:
2535  case kVARCHAR:
2536  case kCHAR: {
2537  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2538  shard_output_buffers[col_idx]->addString(
2539  (*input_buffer->getStringBuffer())[row_index]);
2540  break;
2541  }
2542  case kTIME:
2543  case kTIMESTAMP:
2544  case kDATE:
2545  shard_output_buffers[col_idx]->addBigint(
2546  int_value_at(*input_buffer, row_index));
2547  break;
2548  case kARRAY:
2549  if (IS_STRING(col_ti.get_subtype())) {
2550  CHECK(input_buffer->getStringArrayBuffer());
2551  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2552  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2553  shard_output_buffers[col_idx]->addStringArray(input_arr);
2554  } else {
2555  shard_output_buffers[col_idx]->addArray(
2556  (*input_buffer->getArrayBuffer())[row_index]);
2557  }
2558  break;
2559  case kPOINT:
2560  case kLINESTRING:
2561  case kPOLYGON:
2562  case kMULTIPOLYGON: {
2563  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2564  shard_output_buffers[col_idx]->addGeoString(
2565  (*input_buffer->getGeoStringBuffer())[row_index]);
2566  break;
2567  }
2568  default:
2569  CHECK(false);
2570  }
2571  }
2572  ++all_shard_row_counts[shard];
2573  // when replicating a column, row count of a shard == replicate count of the column on
2574  // the shard
2575  if (getReplicating()) {
2576  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2577  }
2578  }
2579 }
Definition: sqltypes.h:52
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2386
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2897
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
#define CHECK_GT(x, y)
Definition: Logger.h:202
CHECK(cgen_state)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:593
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:268
#define CHECK_LT(x, y)
Definition: Logger.h:200
Definition: sqltypes.h:55
Definition: sqltypes.h:56
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2417
#define CHECK_LE(x, y)
Definition: Logger.h:201
const TableDescriptor * table_desc_
Definition: Importer.h:592
bool getReplicating() const
Definition: Importer.h:574
Definition: sqltypes.h:44
#define IS_STRING(T)
Definition: sqltypes.h:166
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2424
Definition: sqltypes.h:48
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& Importer_NS::Loader::get_column_descs ( ) const
inline

Definition at line 550 of file Importer.h.

References column_descs_.

550  {
551  return column_descs_;
552  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:593
std::vector< DataBlockPtr > Importer_NS::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2609 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK(), CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by loadToShard().

2610  {
2611  std::vector<DataBlockPtr> result(import_buffers.size());
2612  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2613  encoded_data_block_ptrs_futures;
2614  // make all async calls to string dictionary here and then continue execution
2615  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2616  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2617  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2618  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2619  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2620 
2621  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2622  buf_idx,
2623  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2624  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2625  return import_buffers[buf_idx]->getStringDictBuffer();
2626  })));
2627  }
2628  }
2629 
2630  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2631  DataBlockPtr p;
2632  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2633  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2634  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2635  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2636  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2637  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2638  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2639  p.stringsPtr = string_payload_ptr;
2640  } else {
2641  // This condition means we have column which is ENCODED string. We already made
2642  // Async request to gain the encoded integer values above so we should skip this
2643  // iteration and continue.
2644  continue;
2645  }
2646  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2647  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2648  p.stringsPtr = geo_payload_ptr;
2649  } else {
2650  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2651  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2652  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2653  import_buffers[buf_idx]->addDictEncodedStringArray(
2654  *import_buffers[buf_idx]->getStringArrayBuffer());
2655  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2656  } else {
2657  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2658  }
2659  }
2660  result[buf_idx] = p;
2661  }
2662 
2663  // wait for the async requests we made for string dictionary
2664  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2665  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2666  }
2667  return result;
2668 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:141
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:142
CHECK(cgen_state)
#define IS_STRING(T)
Definition: sqltypes.h:166
int8_t * numbersPtr
Definition: sqltypes.h:140

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& Importer_NS::Loader::getCatalog ( )
inline

Definition at line 548 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpoch(), and setTableEpoch().

548 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591

+ Here is the caller graph for this function:

bool Importer_NS::Loader::getReplicating ( ) const
inline

Definition at line 574 of file Importer.h.

References replicating_.

Referenced by distributeToShards(), and loadToShard().

574 { return replicating_; }

+ Here is the caller graph for this function:

StringDictionary* Importer_NS::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 554 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), IS_STRING, SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), kARRAY, and kENCODING_DICT.

554  {
555  if ((cd->columnType.get_type() != kARRAY ||
556  !IS_STRING(cd->columnType.get_subtype())) &&
557  (!cd->columnType.is_string() ||
558  cd->columnType.get_compression() != kENCODING_DICT)) {
559  return nullptr;
560  }
561  return dict_map_.at(cd->columnId);
562  }
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:595
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
#define IS_STRING(T)
Definition: sqltypes.h:166
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:327
bool is_string() const
Definition: sqltypes.h:477
SQLTypeInfo columnType

+ Here is the call graph for this function:

const TableDescriptor* Importer_NS::Loader::getTableDesc ( ) const
inline

Definition at line 549 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), getTableEpoch(), and setTableEpoch().

549 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:592

+ Here is the caller graph for this function:

int32_t Importer_NS::Loader::getTableEpoch ( )
virtual

Definition at line 4007 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpoch().

4007  {
4008  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4009  getTableDesc()->tableId);
4010 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:548
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2125
const TableDescriptor * getTableDesc() const
Definition: Importer.h:549

+ Here is the call graph for this function:

void Importer_NS::Loader::init ( )
protected

Definition at line 2714 of file Importer.cpp.

References catalog_, CHECK(), column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

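2714  {
2715  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2716  insert_data_.tableId = table_desc_->tableId;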
2717  for (auto cd : column_descs_) {
2718  insert_data_.columnIds.push_back(cd->columnId);
2719  if (cd->columnType.get_compression() == kENCODING_DICT) {
2720  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2721  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2722  CHECK(dd);
2723  dict_map_[cd->columnId] = dd->stringDict.get();
2724  }
2725  }
2726  insert_data_.numRows = 0;
2727 }
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:594
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:595
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1350
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:593
const TableDescriptor * table_desc_
Definition: Importer.h:592
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2379 of file Importer.cpp.

References loadImpl().

2380  {
2381  return loadImpl(import_buffers, row_count, true);
2382 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2581

+ Here is the call graph for this function:

bool Importer_NS::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protected virtual

Definition at line 2581 of file Importer.cpp.

References catalog_, distributeToShards(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2584  {
2585  if (table_desc_->nShards) {
2586  std::vector<OneShardBuffers> all_shard_import_buffers;
2587  std::vector<size_t> all_shard_row_counts;
2588  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2589  distributeToShards(all_shard_import_buffers,
2590  all_shard_row_counts,
2591  import_buffers,
2592  row_count,
2593  shard_tables.size());
2594  bool success = true;
2595  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2596  if (!all_shard_row_counts[shard_idx]) {
2597  continue;
2598  }
2599  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2600  all_shard_row_counts[shard_idx],
2601  shard_tables[shard_idx],
2602  checkpoint);
2603  }
2604  return success;
2605  }
2606  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2607 }
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2433
virtual void checkpoint()
Definition: Importer.cpp:4000
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2897
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
const TableDescriptor * table_desc_
Definition: Importer.h:592
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2670

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Importer_NS::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2373 of file Importer.cpp.

References loadImpl().

2375  {
2376  return loadImpl(import_buffers, row_count, false);
2377 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2581

+ Here is the call graph for this function:

bool Importer_NS::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint 
)
private

Definition at line 2670 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, get_data_block_pointers(), getReplicating(), insert_data_, Fragmenter_Namespace::AbstractFragmenter::insertData(), Fragmenter_Namespace::AbstractFragmenter::insertDataNoCheckpoint(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

Referenced by loadImpl().

2674  {
2675  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2676  // patch insert_data with new column
2677  if (this->getReplicating()) {
2678  for (const auto& import_buff : import_buffers) {
2679  insert_data_.replicate_count = import_buff->get_replicate_count();
2680  insert_data_.columnDescriptors[import_buff->getColumnDesc()->columnId] =
2681  import_buff->getColumnDesc();
2682  }
2683  }
2684 
2685  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2686  ins_data.numRows = row_count;
2687  bool success = true;
2688 
2689  ins_data.data = get_data_block_pointers(import_buffers);
2690 
2691  for (const auto& import_buffer : import_buffers) {
2692  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2693  }
2694 
2695  // release loader_lock so that in InsertOrderFragmenter::insertDat
2696  // we can have multiple threads sort/shuffle InsertData
2697  loader_lock.unlock();
2698 
2699  {
2700  try {
2701  if (checkpoint) {
2702  shard_table->fragmenter->insertData(ins_data);
2703  } else {
2704  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2705  }
2706  } catch (std::exception& e) {
2707  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2708  success = false;
2709  }
2710  }
2711  return success;
2712 }
virtual void checkpoint()
Definition: Importer.cpp:4000
std::mutex loader_mutex_
Definition: Importer.h:606
#define LOG(tag)
Definition: Logger.h:185
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:594
std::map< int, const ColumnDescriptor * > columnDescriptors
Definition: Fragmenter.h:69
virtual void insertDataNoCheckpoint(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool getReplicating() const
Definition: Importer.h:574
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2609
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Fragmenter_Namespace::AbstractFragmenter * fragmenter
virtual void insertData(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions with locks and che...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Importer_NS::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 573 of file Importer.h.

References replicating_.

573 { replicating_ = replicating; }
void Importer_NS::Loader::setTableEpoch ( const int32_t  new_epoch)
virtual

Definition at line 4012 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::setTableEpoch().

4012  {
4013  getCatalog().setTableEpoch(
4014  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4015 }
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:548
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2155
const TableDescriptor * getTableDesc() const
Definition: Importer.h:549

+ Here is the call graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog& Importer_NS::Loader::catalog_
protected

Definition at line 591 of file Importer.h.

Referenced by distributeToShards(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> Importer_NS::Loader::column_descs_
protected

Definition at line 593 of file Importer.h.

Referenced by distributeToShards(), get_column_descs(), and init().

std::map<int, StringDictionary*> Importer_NS::Loader::dict_map_
protected

Definition at line 595 of file Importer.h.

Referenced by getStringDict(), and init().

Fragmenter_Namespace::InsertData Importer_NS::Loader::insert_data_
protected

Definition at line 594 of file Importer.h.

Referenced by init(), and loadToShard().

std::mutex Importer_NS::Loader::loader_mutex_
private

Definition at line 606 of file Importer.h.

Referenced by loadToShard().

bool Importer_NS::Loader::replicating_ = false
private

Definition at line 605 of file Importer.h.

Referenced by getReplicating(), and setReplicating().

const TableDescriptor* Importer_NS::Loader::table_desc_
protected

Definition at line 592 of file Importer.h.

Referenced by distributeToShards(), getTableDesc(), init(), and loadImpl().


The documentation for this class was generated from the following files: