OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 
void dropColumns (const std::vector< int > &columns)
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init (const bool use_catalog_locks)
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 514 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 518 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 577 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
LoadCallbackType  load_callback = nullptr,
bool  use_catalog_locks = true 
)
inline

Definition at line 523 of file Importer.h.

References init().

527  : catalog_(c)
528  , table_desc_(t)
529  , column_descs_(
530  use_catalog_locks
531  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
532  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
533  , load_callback_(load_callback) {
534  init(use_catalog_locks);
535  }
std::list< const ColumnDescriptor * > getAllColumnMetadataForTableUnlocked(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Definition: Catalog.cpp:1726
const TableDescriptor * table_desc_
Definition: Importer.h:585
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2870
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1716
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
LoadCallbackType load_callback_
Definition: Importer.h:587

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inlinevirtual

Definition at line 537 of file Importer.h.

537 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4198 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4198  {
4199  if (getTableDesc()->persistenceLevel ==
4200  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4201  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4202  }
4203 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
void checkpointWithAutoRollback(const int logical_table_id)
Definition: Catalog.cpp:3931
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2577 of file Importer.cpp.

References catalog_, CHECK, CHECK_GT, CHECK_LE, CHECK_LT, column_descs_, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), getReplicating(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, TableDescriptor::nShards, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, table_desc_, and run_benchmark_import::type.

Referenced by loadImpl().

2581  {
2582  all_shard_row_counts.resize(shard_count);
2583  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2584  all_shard_import_buffers.emplace_back();
2585  for (const auto& typed_import_buffer : import_buffers) {
2586  all_shard_import_buffers.back().emplace_back(
2587  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2588  typed_import_buffer->getStringDictionary()));
2589  }
2590  }
2591  CHECK_GT(table_desc_->shardedColumnId, 0);
2592  int col_idx{0};
2593  const ColumnDescriptor* shard_col_desc{nullptr};
2594  for (const auto col_desc : column_descs_) {
2595  ++col_idx;
2596  if (col_idx == table_desc_->shardedColumnId) {
2597  shard_col_desc = col_desc;
2598  break;
2599  }
2600  }
2601  CHECK(shard_col_desc);
2602  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2603  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2604  const auto& shard_col_ti = shard_col_desc->columnType;
2605  CHECK(shard_col_ti.is_integer() ||
2606  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2607  shard_col_ti.is_time());
2608  if (shard_col_ti.is_string()) {
2609  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2610  CHECK(payloads_ptr);
2611  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2612  }
2613 
2614  // for each replicated (alter added) columns, number of rows in a shard is
2615  // inferred from that of the sharding column, not simply evenly distributed.
2616  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2617  // Here the loop count is overloaded. For normal imports, we loop thru all
2618  // input values (rows), so the loop count is the number of input rows.
2619  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2620  // all shards, so the loop count is the number of shards.
2621  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2622  for (size_t i = 0; i < loop_count; ++i) {
2623  const size_t shard =
2624  getReplicating()
2625  ? i
2626  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2627  auto& shard_output_buffers = all_shard_import_buffers[shard];
2628 
2629  // when replicate a column, populate 'rows' to all shards only once
2630  // and its value is fetch from the first and the single row
2631  const auto row_index = getReplicating() ? 0 : i;
2632 
2633  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2634  const auto& input_buffer = import_buffers[col_idx];
2635  const auto& col_ti = input_buffer->getTypeInfo();
2636  const auto type =
2637  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2638 
2639  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2640  // count. and, bypass non-replicated column.
2641  if (getReplicating()) {
2642  if (input_buffer->get_replicate_count() > 0) {
2643  shard_output_buffers[col_idx]->set_replicate_count(
2644  shard_tds[shard]->fragmenter->getNumRows());
2645  } else {
2646  continue;
2647  }
2648  }
2649 
2650  switch (type) {
2651  case kBOOLEAN:
2652  shard_output_buffers[col_idx]->addBoolean(
2653  int_value_at(*input_buffer, row_index));
2654  break;
2655  case kTINYINT:
2656  shard_output_buffers[col_idx]->addTinyint(
2657  int_value_at(*input_buffer, row_index));
2658  break;
2659  case kSMALLINT:
2660  shard_output_buffers[col_idx]->addSmallint(
2661  int_value_at(*input_buffer, row_index));
2662  break;
2663  case kINT:
2664  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2665  break;
2666  case kBIGINT:
2667  shard_output_buffers[col_idx]->addBigint(
2668  int_value_at(*input_buffer, row_index));
2669  break;
2670  case kFLOAT:
2671  shard_output_buffers[col_idx]->addFloat(
2672  float_value_at(*input_buffer, row_index));
2673  break;
2674  case kDOUBLE:
2675  shard_output_buffers[col_idx]->addDouble(
2676  double_value_at(*input_buffer, row_index));
2677  break;
2678  case kTEXT:
2679  case kVARCHAR:
2680  case kCHAR: {
2681  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2682  shard_output_buffers[col_idx]->addString(
2683  (*input_buffer->getStringBuffer())[row_index]);
2684  break;
2685  }
2686  case kTIME:
2687  case kTIMESTAMP:
2688  case kDATE:
2689  shard_output_buffers[col_idx]->addBigint(
2690  int_value_at(*input_buffer, row_index));
2691  break;
2692  case kARRAY:
2693  if (IS_STRING(col_ti.get_subtype())) {
2694  CHECK(input_buffer->getStringArrayBuffer());
2695  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2696  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2697  shard_output_buffers[col_idx]->addStringArray(input_arr);
2698  } else {
2699  shard_output_buffers[col_idx]->addArray(
2700  (*input_buffer->getArrayBuffer())[row_index]);
2701  }
2702  break;
2703  case kPOINT:
2704  case kLINESTRING:
2705  case kPOLYGON:
2706  case kMULTIPOLYGON: {
2707  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2708  shard_output_buffers[col_idx]->addGeoString(
2709  (*input_buffer->getGeoStringBuffer())[row_index]);
2710  break;
2711  }
2712  default:
2713  CHECK(false);
2714  }
2715  }
2716  ++all_shard_row_counts[shard];
2717  // when replicating a column, row count of a shard == replicate count of the column on
2718  // the shard
2719  if (getReplicating()) {
2720  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2721  }
2722  }
2723 }
Definition: sqltypes.h:48
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3786
const TableDescriptor * table_desc_
Definition: Importer.h:585
#define CHECK_GT(x, y)
Definition: Logger.h:209
specifies the content in-memory of a row in the column metadata table
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2568
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:303
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:51
Definition: sqltypes.h:52
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2561
Definition: sqltypes.h:40
#define IS_STRING(T)
Definition: sqltypes.h:241
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2530
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:197
Definition: sqltypes.h:44
bool getReplicating() const
Definition: Importer.h:566
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 2860 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

2860  {
2861  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2862  if (table_desc_->nShards) {
2863  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2864  }
2865  for (auto table_desc : table_descs) {
2866  table_desc->fragmenter->dropColumns(columnIds);
2867  }
2868 }
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3786
const TableDescriptor * table_desc_
Definition: Importer.h:585
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

+ Here is the call graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 541 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

541  {
542  return column_descs_;
543  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586

+ Here is the caller graph for this function:

std::vector< DataBlockPtr > import_export::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2757 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by loadImpl(), and loadToShard().

2758  {
2759  std::vector<DataBlockPtr> result(import_buffers.size());
2760  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2761  encoded_data_block_ptrs_futures;
2762  // make all async calls to string dictionary here and then continue execution
2763  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2764  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2765  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2766  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2767  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2768 
2769  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2770  buf_idx,
2771  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2772  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2773  return import_buffers[buf_idx]->getStringDictBuffer();
2774  })));
2775  }
2776  }
2777 
2778  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2779  DataBlockPtr p;
2780  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2781  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2782  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2783  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2784  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2785  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2786  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2787  p.stringsPtr = string_payload_ptr;
2788  } else {
2789  // This condition means we have column which is ENCODED string. We already made
2790  // Async request to gain the encoded integer values above so we should skip this
2791  // iteration and continue.
2792  continue;
2793  }
2794  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2795  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2796  p.stringsPtr = geo_payload_ptr;
2797  } else {
2798  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2799  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2800  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2801  import_buffers[buf_idx]->addDictEncodedStringArray(
2802  *import_buffers[buf_idx]->getStringArrayBuffer());
2803  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2804  } else {
2805  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2806  }
2807  }
2808  result[buf_idx] = p;
2809  }
2810 
2811  // wait for the async requests we made for string dictionary
2812  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2813  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2814  }
2815  return result;
2816 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:218
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:219
#define IS_STRING(T)
Definition: sqltypes.h:241
#define CHECK(condition)
Definition: Logger.h:197
int8_t * numbersPtr
Definition: sqltypes.h:217

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 539 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

539 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

+ Here is the caller graph for this function:

bool import_export::Loader::getReplicating ( ) const
inline

Definition at line 566 of file Importer.h.

References replicating_.

Referenced by distributeToShards(), and loadToShard().

566 { return replicating_; }

+ Here is the caller graph for this function:

StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 545 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

545  {
546  if ((cd->columnType.get_type() != kARRAY ||
547  !IS_STRING(cd->columnType.get_subtype())) &&
548  (!cd->columnType.is_string() ||
549  cd->columnType.get_compression() != kENCODING_DICT)) {
550  return nullptr;
551  }
552  return dict_map_.at(cd->columnId);
553  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:312
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
#define IS_STRING(T)
Definition: sqltypes.h:241
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:478
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 540 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

540 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:585

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4205 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4205  {
4206  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4207  getTableDesc()->tableId);
4208 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2945

+ Here is the call graph for this function:

void import_export::Loader::init ( const bool  use_catalog_locks)
protected

Definition at line 2870 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForDictUnlocked(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

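2870  {
2871  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2872  insert_data_.tableId = table_desc_->tableId;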
2873  for (auto cd : column_descs_) {
2874  insert_data_.columnIds.push_back(cd->columnId);
2875  if (cd->columnType.get_compression() == kENCODING_DICT) {
2876  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2877  const auto dd = use_catalog_locks
2878  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2879  : catalog_.getMetadataForDictUnlocked(
2880  cd->columnType.get_comp_param(), true);
2881  CHECK(dd);
2882  dict_map_[cd->columnId] = dd->stringDict.get();
2883  }
2884  }
2885  insert_data_.numRows = 0;
2886 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1439
const DictDescriptor * getMetadataForDictUnlocked(int dict_ref, bool loadDict) const
Definition: Catalog.cpp:1453
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2523 of file Importer.cpp.

References loadImpl().

2524  {
2525  return loadImpl(import_buffers, row_count, true);
2526 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2725

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protectedvirtual

Definition at line 2725 of file Importer.cpp.

References catalog_, distributeToShards(), get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2728  {
2729  if (load_callback_) {
2730  auto data_blocks = get_data_block_pointers(import_buffers);
2731  return load_callback_(import_buffers, data_blocks, row_count);
2732  }
2733  if (table_desc_->nShards) {
2734  std::vector<OneShardBuffers> all_shard_import_buffers;
2735  std::vector<size_t> all_shard_row_counts;
2736  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2737  distributeToShards(all_shard_import_buffers,
2738  all_shard_row_counts,
2739  import_buffers,
2740  row_count,
2741  shard_tables.size());
2742  bool success = true;
2743  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2744  if (!all_shard_row_counts[shard_idx]) {
2745  continue;
2746  }
2747  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2748  all_shard_row_counts[shard_idx],
2749  shard_tables[shard_idx],
2750  checkpoint);
2751  }
2752  return success;
2753  }
2754  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2755 }
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3786
const TableDescriptor * table_desc_
Definition: Importer.h:585
virtual void checkpoint()
Definition: Importer.cpp:4198
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2818
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2577
LoadCallbackType load_callback_
Definition: Importer.h:587
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2757

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2517 of file Importer.cpp.

References loadImpl().

2519  {
2520  return loadImpl(import_buffers, row_count, false);
2521 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2725

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint 
)
private

Definition at line 2818 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, get_data_block_pointers(), getReplicating(), insert_data_, loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

Referenced by loadImpl().

2822  {
2823  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2824  // patch insert_data with new column
2825  if (this->getReplicating()) {
2826  for (const auto& import_buff : import_buffers) {
2827  insert_data_.replicate_count = import_buff->get_replicate_count();
2828  }
2829  }
2830
2831  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2832  ins_data.numRows = row_count;
2833  bool success = true;
2834 
2835  ins_data.data = get_data_block_pointers(import_buffers);
2836 
2837  for (const auto& import_buffer : import_buffers) {
2838  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2839  }
2840 
2841  // release loader_lock so that in InsertOrderFragmenter::insertDat
2842  // we can have multiple threads sort/shuffle InsertData
2843  loader_lock.unlock();
2844 
2845  {
2846  try {
2847  if (checkpoint) {
2848  shard_table->fragmenter->insertData(ins_data);
2849  } else {
2850  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2851  }
2852  } catch (std::exception& e) {
2853  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2854  success = false;
2855  }
2856  }
2857  return success;
2858 }
std::mutex loader_mutex_
Definition: Importer.h:600
#define LOG(tag)
Definition: Logger.h:188
virtual void checkpoint()
Definition: Importer.cpp:4198
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
bool getReplicating() const
Definition: Importer.h:566
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2757

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 565 of file Importer.h.

References replicating_.

565 { replicating_ = replicating; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4210 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4211  {
4212  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4213 }
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs)
Definition: Catalog.cpp:2973
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539

+ Here is the call graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 584 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 586 of file Importer.h.

Referenced by distributeToShards(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 589 of file Importer.h.

Referenced by getStringDict(), and init().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 588 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 587 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 600 of file Importer.h.

Referenced by loadToShard().

bool import_export::Loader::replicating_ = false
private

Definition at line 599 of file Importer.h.

Referenced by getReplicating(), and setReplicating().

const TableDescriptor* import_export::Loader::table_desc_
protected

Definition at line 585 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getTableDesc(), init(), and loadImpl().


The documentation for this class was generated from the following files: