OmniSciDB  dfae7c3b14
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 
void dropColumns (const std::vector< int > &columns)
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer > >
 

Protected Member Functions

void init (const bool use_catalog_locks)
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const ColumnDescriptor * > column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer > > &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 514 of file Importer.h.

Member Typedef Documentation

◆ LoadCallbackType

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer> >&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 518 of file Importer.h.

◆ OneShardBuffers

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer> >
protected

Definition at line 577 of file Importer.h.

Constructor & Destructor Documentation

◆ Loader()

import_export::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
LoadCallbackType  load_callback = nullptr,
bool  use_catalog_locks = true 
)
inline

Definition at line 523 of file Importer.h.

References logger::init().

527  : catalog_(c)
528  , table_desc_(t)
529  , column_descs_(
530  use_catalog_locks
531  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
532  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
533  , load_callback_(load_callback) {
534  init(use_catalog_locks);
535  }
const TableDescriptor * table_desc_
Definition: Importer.h:585
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2841
std::list< const ColumnDescriptor * > getAllColumnMetadataForTableUnlocked(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Definition: Catalog.cpp:1725
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1715
LoadCallbackType load_callback_
Definition: Importer.h:587
+ Here is the call graph for this function:

◆ ~Loader()

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 537 of file Importer.h.

537 {}

Member Function Documentation

◆ checkpoint()

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4160 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, and import_export::Importer::getCatalog().

4160  {
4161  if (getTableDesc()->persistenceLevel ==
4162  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4163  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4164  }
4165 }
void checkpointWithAutoRollback(const int logical_table_id)
Definition: Catalog.cpp:3731
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539
+ Here is the call graph for this function:

◆ distributeToShards()

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2548 of file Importer.cpp.

References CHECK, CHECK_GT, CHECK_LE, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, SHARD_FOR_KEY, and run_benchmark_import::type.

2552  {
2553  all_shard_row_counts.resize(shard_count);
2554  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2555  all_shard_import_buffers.emplace_back();
2556  for (const auto& typed_import_buffer : import_buffers) {
2557  all_shard_import_buffers.back().emplace_back(
2558  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2559  typed_import_buffer->getStringDictionary()));
2560  }
2561  }
2563  int col_idx{0};
2564  const ColumnDescriptor* shard_col_desc{nullptr};
2565  for (const auto col_desc : column_descs_) {
2566  ++col_idx;
2567  if (col_idx == table_desc_->shardedColumnId) {
2568  shard_col_desc = col_desc;
2569  break;
2570  }
2571  }
2572  CHECK(shard_col_desc);
2573  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2574  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2575  const auto& shard_col_ti = shard_col_desc->columnType;
2576  CHECK(shard_col_ti.is_integer() ||
2577  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2578  shard_col_ti.is_time());
2579  if (shard_col_ti.is_string()) {
2580  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2581  CHECK(payloads_ptr);
2582  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2583  }
2584 
2585  // for each replicated (alter added) columns, number of rows in a shard is
2586  // inferred from that of the sharding column, not simply evenly distributed.
2587  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2588  // Here the loop count is overloaded. For normal imports, we loop thru all
2589  // input values (rows), so the loop count is the number of input rows.
2590  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2591  // all shards, so the loop count is the number of shards.
2592  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2593  for (size_t i = 0; i < loop_count; ++i) {
2594  const size_t shard =
2595  getReplicating()
2596  ? i
2597  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2598  auto& shard_output_buffers = all_shard_import_buffers[shard];
2599 
2600  // when replicate a column, populate 'rows' to all shards only once
2601  // and its value is fetch from the first and the single row
2602  const auto row_index = getReplicating() ? 0 : i;
2603 
2604  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2605  const auto& input_buffer = import_buffers[col_idx];
2606  const auto& col_ti = input_buffer->getTypeInfo();
2607  const auto type =
2608  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2609 
2610  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2611  // count. and, bypass non-replicated column.
2612  if (getReplicating()) {
2613  if (input_buffer->get_replicate_count() > 0) {
2614  shard_output_buffers[col_idx]->set_replicate_count(
2615  shard_tds[shard]->fragmenter->getNumRows());
2616  } else {
2617  continue;
2618  }
2619  }
2620 
2621  switch (type) {
2622  case kBOOLEAN:
2623  shard_output_buffers[col_idx]->addBoolean(
2624  int_value_at(*input_buffer, row_index));
2625  break;
2626  case kTINYINT:
2627  shard_output_buffers[col_idx]->addTinyint(
2628  int_value_at(*input_buffer, row_index));
2629  break;
2630  case kSMALLINT:
2631  shard_output_buffers[col_idx]->addSmallint(
2632  int_value_at(*input_buffer, row_index));
2633  break;
2634  case kINT:
2635  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2636  break;
2637  case kBIGINT:
2638  shard_output_buffers[col_idx]->addBigint(
2639  int_value_at(*input_buffer, row_index));
2640  break;
2641  case kFLOAT:
2642  shard_output_buffers[col_idx]->addFloat(
2643  float_value_at(*input_buffer, row_index));
2644  break;
2645  case kDOUBLE:
2646  shard_output_buffers[col_idx]->addDouble(
2647  double_value_at(*input_buffer, row_index));
2648  break;
2649  case kTEXT:
2650  case kVARCHAR:
2651  case kCHAR: {
2652  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2653  shard_output_buffers[col_idx]->addString(
2654  (*input_buffer->getStringBuffer())[row_index]);
2655  break;
2656  }
2657  case kTIME:
2658  case kTIMESTAMP:
2659  case kDATE:
2660  shard_output_buffers[col_idx]->addBigint(
2661  int_value_at(*input_buffer, row_index));
2662  break;
2663  case kARRAY:
2664  if (IS_STRING(col_ti.get_subtype())) {
2665  CHECK(input_buffer->getStringArrayBuffer());
2666  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2667  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2668  shard_output_buffers[col_idx]->addStringArray(input_arr);
2669  } else {
2670  shard_output_buffers[col_idx]->addArray(
2671  (*input_buffer->getArrayBuffer())[row_index]);
2672  }
2673  break;
2674  case kPOINT:
2675  case kLINESTRING:
2676  case kPOLYGON:
2677  case kMULTIPOLYGON: {
2678  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2679  shard_output_buffers[col_idx]->addGeoString(
2680  (*input_buffer->getGeoStringBuffer())[row_index]);
2681  break;
2682  }
2683  default:
2684  CHECK(false);
2685  }
2686  }
2687  ++all_shard_row_counts[shard];
2688  // when replicating a column, row count of a shard == replicate count of the column on
2689  // the shard
2690  if (getReplicating()) {
2691  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2692  }
2693  }
2694 }
Definition: sqltypes.h:51
const TableDescriptor * table_desc_
Definition: Importer.h:585
bool getReplicating() const
Definition: Importer.h:566
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3619
#define CHECK_GT(x, y)
Definition: Logger.h:209
specifies the content in-memory of a row in the column metadata table
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2539
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:302
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:54
Definition: sqltypes.h:55
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2532
Definition: sqltypes.h:43
#define IS_STRING(T)
Definition: sqltypes.h:173
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2501
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:197
Definition: sqltypes.h:47
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
+ Here is the call graph for this function:

◆ dropColumns()

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 2831 of file Importer.cpp.

2831  {
2832  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2833  if (table_desc_->nShards) {
2834  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2835  }
2836  for (auto table_desc : table_descs) {
2837  table_desc->fragmenter->dropColumns(columnIds);
2838  }
2839 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3619
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

◆ get_column_descs()

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 541 of file Importer.h.

Referenced by import_export::setup_column_loaders().

541  {
542  return column_descs_;
543  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
+ Here is the caller graph for this function:

◆ get_data_block_pointers()

std::vector< DataBlockPtr > import_export::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2728 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

2729  {
2730  std::vector<DataBlockPtr> result(import_buffers.size());
2731  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2732  encoded_data_block_ptrs_futures;
2733  // make all async calls to string dictionary here and then continue execution
2734  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2735  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2736  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2737  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2738  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2739 
2740  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2741  buf_idx,
2742  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2743  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2744  return import_buffers[buf_idx]->getStringDictBuffer();
2745  })));
2746  }
2747  }
2748 
2749  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2750  DataBlockPtr p;
2751  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2752  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2753  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2754  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2755  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2756  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2757  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2758  p.stringsPtr = string_payload_ptr;
2759  } else {
2760  // This condition means we have column which is ENCODED string. We already made
2761  // Async request to gain the encoded integer values above so we should skip this
2762  // iteration and continue.
2763  continue;
2764  }
2765  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2766  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2767  p.stringsPtr = geo_payload_ptr;
2768  } else {
2769  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2770  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2771  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2772  import_buffers[buf_idx]->addDictEncodedStringArray(
2773  *import_buffers[buf_idx]->getStringArrayBuffer());
2774  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2775  } else {
2776  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2777  }
2778  }
2779  result[buf_idx] = p;
2780  }
2781 
2782  // wait for the async requests we made for string dictionary
2783  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2784  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2785  }
2786  return result;
2787 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
#define IS_STRING(T)
Definition: sqltypes.h:173
#define CHECK(condition)
Definition: Logger.h:197
int8_t * numbersPtr
Definition: sqltypes.h:149

◆ getCatalog()

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 539 of file Importer.h.

539 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

◆ getReplicating()

bool import_export::Loader::getReplicating ( ) const
inline

Definition at line 566 of file Importer.h.

References logger::init().

566 { return replicating_; }
+ Here is the call graph for this function:

◆ getStringDict()

StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 545 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

545  {
546  if ((cd->columnType.get_type() != kARRAY ||
547  !IS_STRING(cd->columnType.get_subtype())) &&
548  (!cd->columnType.is_string() ||
549  cd->columnType.get_compression() != kENCODING_DICT)) {
550  return nullptr;
551  }
552  return dict_map_.at(cd->columnId);
553  }
bool is_string() const
Definition: sqltypes.h:417
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:260
#define IS_STRING(T)
Definition: sqltypes.h:173
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
SQLTypeInfo columnType
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getTableDesc()

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 540 of file Importer.h.

540 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:585

◆ getTableEpochs()

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4167 of file Importer.cpp.

References import_export::Importer::getCatalog(), and Catalog_Namespace::Catalog::getTableEpochs().

4167  {
4168  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4169  getTableDesc()->tableId);
4170 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2783
+ Here is the call graph for this function:

◆ init()

void import_export::Loader::init ( const bool  use_catalog_locks)
protected

Definition at line 2841 of file Importer.cpp.

References CHECK, and kENCODING_DICT.

2841  {
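2842  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2843  insert_data_.tableId = table_desc_->tableId;
2844  for (auto cd : column_descs_) {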
2845  insert_data_.columnIds.push_back(cd->columnId);
2846  if (cd->columnType.get_compression() == kENCODING_DICT) {
2847  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2848  const auto dd = use_catalog_locks
2849  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2850  : catalog_.getMetadataForDictUnlocked(
2851  cd->columnType.get_comp_param(), true);
2852  CHECK(dd);
2853  dict_map_[cd->columnId] = dd->stringDict.get();
2854  }
2855  }
2856  insert_data_.numRows = 0;
2857 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:208
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1451
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589
const DictDescriptor * getMetadataForDictUnlocked(int dict_ref, bool loadDict) const
Definition: Catalog.cpp:1457

◆ load()

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2494 of file Importer.cpp.

2495  {
2496  return loadImpl(import_buffers, row_count, true);
2497 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2696

◆ loadImpl()

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protected virtual

Definition at line 2696 of file Importer.cpp.

2699  {
2700  if (load_callback_) {
2701  auto data_blocks = get_data_block_pointers(import_buffers);
2702  return load_callback_(import_buffers, data_blocks, row_count);
2703  }
2704  if (table_desc_->nShards) {
2705  std::vector<OneShardBuffers> all_shard_import_buffers;
2706  std::vector<size_t> all_shard_row_counts;
2707  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2708  distributeToShards(all_shard_import_buffers,
2709  all_shard_row_counts,
2710  import_buffers,
2711  row_count,
2712  shard_tables.size());
2713  bool success = true;
2714  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2715  if (!all_shard_row_counts[shard_idx]) {
2716  continue;
2717  }
2718  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2719  all_shard_row_counts[shard_idx],
2720  shard_tables[shard_idx],
2721  checkpoint);
2722  }
2723  return success;
2724  }
2725  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2726 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
virtual void checkpoint()
Definition: Importer.cpp:4160
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3619
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2789
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2548
LoadCallbackType load_callback_
Definition: Importer.h:587
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2728

◆ loadNoCheckpoint()

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2488 of file Importer.cpp.

2490  {
2491  return loadImpl(import_buffers, row_count, false);
2492 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2696

◆ loadToShard()

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint 
)
private

Definition at line 2789 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, LOG, and Fragmenter_Namespace::InsertData::numRows.

2793  {
2794  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2795  // patch insert_data with new column
2796  if (this->getReplicating()) {
2797  for (const auto& import_buff : import_buffers) {
2798  insert_data_.replicate_count = import_buff->get_replicate_count();
2799  }
2800  }
2801 
2802  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2803  ins_data.numRows = row_count;
2804  bool success = true;
2805 
2806  ins_data.data = get_data_block_pointers(import_buffers);
2807 
2808  for (const auto& import_buffer : import_buffers) {
2809  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2810  }
2811 
2812  // release loader_lock so that in InsertOrderFragmenter::insertDat
2813  // we can have multiple threads sort/shuffle InsertData
2814  loader_lock.unlock();
2815 
2816  {
2817  try {
2818  if (checkpoint) {
2819  shard_table->fragmenter->insertData(ins_data);
2820  } else {
2821  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2822  }
2823  } catch (std::exception& e) {
2824  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2825  success = false;
2826  }
2827  }
2828  return success;
2829 }
std::mutex loader_mutex_
Definition: Importer.h:600
#define LOG(tag)
Definition: Logger.h:188
virtual void checkpoint()
Definition: Importer.cpp:4160
bool getReplicating() const
Definition: Importer.h:566
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2728

◆ setReplicating()

void import_export::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 565 of file Importer.h.

565 { replicating_ = replicating; }

◆ setTableEpochs()

void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4172 of file Importer.cpp.

References import_export::Importer::getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4173  {
4174  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4175 }
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs)
Definition: Catalog.cpp:2811
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:539
+ Here is the call graph for this function:

Member Data Documentation

◆ catalog_

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 584 of file Importer.h.

◆ column_descs_

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 586 of file Importer.h.

◆ dict_map_

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 589 of file Importer.h.

◆ insert_data_

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 588 of file Importer.h.

◆ load_callback_

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 587 of file Importer.h.

◆ loader_mutex_

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 600 of file Importer.h.

◆ replicating_

bool import_export::Loader::replicating_ = false
private

Definition at line 599 of file Importer.h.

◆ table_desc_

const TableDescriptor* import_export::Loader::table_desc_
protected

Definition at line 585 of file Importer.h.


The documentation for this class was generated from the following files: