OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
 
virtual ~Loader ()
 
Catalog_Namespace::CataloggetCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual void checkpoint ()
 
virtual std::vector
< Catalog_Namespace::TableEpochInfo > 
getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setAddingColumns (const bool adding_columns)
 
bool isAddingColumns () const
 
void dropColumns (const std::vector< int > &columns)
 
std::string getErrorMessage ()
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init (const bool use_catalog_locks)
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsNewColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsExistingColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void fillShardRow (const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
 

Private Attributes

bool adding_columns_ = false
 
std::mutex loader_mutex_
 
std::string error_msg_
 

Detailed Description

Definition at line 509 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 513 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 576 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
LoadCallbackType  load_callback = nullptr,
bool  use_catalog_locks = true 
)
inline

Definition at line 518 of file Importer.h.

References init().

522  : catalog_(c)
523  , table_desc_(t)
524  , column_descs_(
525  use_catalog_locks
526  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
527  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
528  , load_callback_(load_callback) {
529  init(use_catalog_locks);
530  }
std::list< const ColumnDescriptor * > getAllColumnMetadataForTableUnlocked(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Definition: Catalog.cpp:1781
const TableDescriptor * table_desc_
Definition: Importer.h:585
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2957
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1771
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
LoadCallbackType load_callback_
Definition: Importer.h:587

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 532 of file Importer.h.

532 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4335 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4335  {
4336  if (getTableDesc()->persistenceLevel ==
4337  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4338  // tables
4339  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4340  }
4341 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:535
void checkpointWithAutoRollback(const int logical_table_id) const
Definition: Catalog.cpp:4210
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:534

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo * session_info
)
protected

Definition at line 2764 of file Importer.cpp.

References CHECK_GT, distributeToShardsExistingColumns(), distributeToShardsNewColumns(), isAddingColumns(), TableDescriptor::shardedColumnId, and table_desc_.

Referenced by loadImpl().

2769  {
2770  all_shard_row_counts.resize(shard_count);
2771  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2772  all_shard_import_buffers.emplace_back();
2773  for (const auto& typed_import_buffer : import_buffers) {
2774  all_shard_import_buffers.back().emplace_back(
2775  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2776  typed_import_buffer->getStringDictionary()));
2777  }
2778  }
2780  if (isAddingColumns()) {
2781  distributeToShardsNewColumns(all_shard_import_buffers,
2782  all_shard_row_counts,
2783  import_buffers,
2784  row_count,
2785  shard_count,
2786  session_info);
2787  } else {
2788  distributeToShardsExistingColumns(all_shard_import_buffers,
2789  all_shard_row_counts,
2790  import_buffers,
2791  row_count,
2792  shard_count,
2793  session_info);
2794  }
2795 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
#define CHECK_GT(x, y)
Definition: Logger.h:215
bool isAddingColumns() const
Definition: Importer.h:563
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2705
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2743

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsExistingColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo * session_info
)
private

Definition at line 2705 of file Importer.cpp.

References CHECK, CHECK_LE, column_descs_, fillShardRow(), i, import_export::anonymous_namespace{Importer.cpp}::int_value_at(), kENCODING_DICT, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, and table_desc_.

Referenced by distributeToShards().

2711  {
2712  int col_idx{0};
2713  const ColumnDescriptor* shard_col_desc{nullptr};
2714  for (const auto col_desc : column_descs_) {
2715  ++col_idx;
2716  if (col_idx == table_desc_->shardedColumnId) {
2717  shard_col_desc = col_desc;
2718  break;
2719  }
2720  }
2721  CHECK(shard_col_desc);
2722  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2723  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2724  const auto& shard_col_ti = shard_col_desc->columnType;
2725  CHECK(shard_col_ti.is_integer() ||
2726  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2727  shard_col_ti.is_time());
2728  if (shard_col_ti.is_string()) {
2729  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2730  CHECK(payloads_ptr);
2731  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2732  }
2733 
2734  for (size_t i = 0; i < row_count; ++i) {
2735  const size_t shard =
2736  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2737  auto& shard_output_buffers = all_shard_import_buffers[shard];
2738  fillShardRow(i, shard_output_buffers, import_buffers);
2739  ++all_shard_row_counts[shard];
2740  }
2741 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2633
#define CHECK_LE(x, y)
Definition: Logger.h:214
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2586
#define CHECK(condition)
Definition: Logger.h:203
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsNewColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo * session_info
)
private

Definition at line 2743 of file Importer.cpp.

References catalog_, CHECK, fillShardRow(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), and table_desc_.

Referenced by distributeToShards().

2749  {
2750  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2751  CHECK(shard_tds.size() == shard_count);
2752 
2753  for (size_t shard = 0; shard < shard_count; ++shard) {
2754  auto& shard_output_buffers = all_shard_import_buffers[shard];
2755  if (row_count != 0) {
2756  fillShardRow(0, shard_output_buffers, import_buffers);
2757  }
2758  // when replicating a column, row count of a shard == replicate count of the column
2759  // on the shard
2760  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2761  }
2762 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2633
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4063
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 2947 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

2947  {
2948  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2949  if (table_desc_->nShards) {
2950  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2951  }
2952  for (auto table_desc : table_descs) {
2953  table_desc->fragmenter->dropColumns(columnIds);
2954  }
2955 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4063
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

+ Here is the call graph for this function:

void import_export::Loader::fillShardRow ( const size_t  row_index,
OneShardBuffers & shard_output_buffers,
const OneShardBuffers & import_buffers
)
private

Definition at line 2633 of file Importer.cpp.

References CHECK, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and run_benchmark_import::type.

Referenced by distributeToShardsExistingColumns(), and distributeToShardsNewColumns().

2635  {
2636  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2637  const auto& input_buffer = import_buffers[col_idx];
2638  const auto& col_ti = input_buffer->getTypeInfo();
2639  const auto type =
2640  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2641 
2642  switch (type) {
2643  case kBOOLEAN:
2644  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2645  break;
2646  case kTINYINT:
2647  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2648  break;
2649  case kSMALLINT:
2650  shard_output_buffers[col_idx]->addSmallint(
2651  int_value_at(*input_buffer, row_index));
2652  break;
2653  case kINT:
2654  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2655  break;
2656  case kBIGINT:
2657  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2658  break;
2659  case kFLOAT:
2660  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2661  break;
2662  case kDOUBLE:
2663  shard_output_buffers[col_idx]->addDouble(
2664  double_value_at(*input_buffer, row_index));
2665  break;
2666  case kTEXT:
2667  case kVARCHAR:
2668  case kCHAR: {
2669  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2670  shard_output_buffers[col_idx]->addString(
2671  (*input_buffer->getStringBuffer())[row_index]);
2672  break;
2673  }
2674  case kTIME:
2675  case kTIMESTAMP:
2676  case kDATE:
2677  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2678  break;
2679  case kARRAY:
2680  if (IS_STRING(col_ti.get_subtype())) {
2681  CHECK(input_buffer->getStringArrayBuffer());
2682  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2683  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2684  shard_output_buffers[col_idx]->addStringArray(input_arr);
2685  } else {
2686  shard_output_buffers[col_idx]->addArray(
2687  (*input_buffer->getArrayBuffer())[row_index]);
2688  }
2689  break;
2690  case kPOINT:
2691  case kLINESTRING:
2692  case kPOLYGON:
2693  case kMULTIPOLYGON: {
2694  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2695  shard_output_buffers[col_idx]->addGeoString(
2696  (*input_buffer->getGeoStringBuffer())[row_index]);
2697  break;
2698  }
2699  default:
2700  CHECK(false);
2701  }
2702  }
2703 }
Definition: sqltypes.h:48
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2624
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:419
#define CHECK_LT(x, y)
Definition: Logger.h:213
Definition: sqltypes.h:51
Definition: sqltypes.h:52
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2617
Definition: sqltypes.h:40
#define IS_STRING(T)
Definition: sqltypes.h:244
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2586
#define CHECK(condition)
Definition: Logger.h:203
Definition: sqltypes.h:44

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 536 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

536  {
537  return column_descs_;
538  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 534 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

534 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584

+ Here is the caller graph for this function:

std::string import_export::Loader::getErrorMessage ( )
inline

Definition at line 565 of file Importer.h.

References error_msg_.

565 { return error_msg_; };
std::string error_msg_
Definition: Importer.h:617
StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 540 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

540  {
541  if ((cd->columnType.get_type() != kARRAY ||
542  !IS_STRING(cd->columnType.get_subtype())) &&
543  (!cd->columnType.is_string() ||
544  cd->columnType.get_compression() != kENCODING_DICT)) {
545  return nullptr;
546  }
547  return dict_map_.at(cd->columnId);
548  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:315
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
#define IS_STRING(T)
Definition: sqltypes.h:244
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:489
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 535 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

535 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:585

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4343 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4343  {
4344  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4345  getTableDesc()->tableId);
4346 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:535
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:534
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3041

+ Here is the call graph for this function:

void import_export::Loader::init ( const bool  use_catalog_locks)
protected

Definition at line 2957 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForDictUnlocked(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

2957  {
2960  for (auto cd : column_descs_) {
2961  insert_data_.columnIds.push_back(cd->columnId);
2962  if (cd->columnType.get_compression() == kENCODING_DICT) {
2963  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2964  const auto dd = use_catalog_locks
2965  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2966  : catalog_.getMetadataForDictUnlocked(
2967  cd->columnType.get_comp_param(), true);
2968  CHECK(dd);
2969  dict_map_[cd->columnId] = dd->stringDict.get();
2970  }
2971  }
2972  insert_data_.numRows = 0;
2973 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
const DictDescriptor * getMetadataForDictUnlocked(int dict_ref, bool loadDict) const
Definition: Catalog.cpp:1508
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:586
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
#define CHECK(condition)
Definition: Logger.h:203
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:589

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::isAddingColumns ( ) const
inline

Definition at line 563 of file Importer.h.

References adding_columns_.

Referenced by distributeToShards(), and loadToShard().

563 { return adding_columns_; }

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo * session_info
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2578 of file Importer.cpp.

References loadImpl().

2580  {
2581  return loadImpl(import_buffers, row_count, true, session_info);
2582 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2797

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint,
const Catalog_Namespace::SessionInfo * session_info
)
protected virtual

Definition at line 2797 of file Importer.cpp.

References catalog_, distributeToShards(), import_export::TypedImportBuffer::get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2801  {
2802  if (load_callback_) {
2803  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2804  return load_callback_(import_buffers, data_blocks, row_count);
2805  }
2806  if (table_desc_->nShards) {
2807  std::vector<OneShardBuffers> all_shard_import_buffers;
2808  std::vector<size_t> all_shard_row_counts;
2809  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2810  distributeToShards(all_shard_import_buffers,
2811  all_shard_row_counts,
2812  import_buffers,
2813  row_count,
2814  shard_tables.size(),
2815  session_info);
2816  bool success = true;
2817  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2818  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2819  all_shard_row_counts[shard_idx],
2820  shard_tables[shard_idx],
2821  checkpoint,
2822  session_info);
2823  }
2824  return success;
2825  }
2826  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
2827 }
const TableDescriptor * table_desc_
Definition: Importer.h:585
virtual void checkpoint()
Definition: Importer.cpp:4335
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2764
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2829
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4063
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2890
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:584
LoadCallbackType load_callback_
Definition: Importer.h:587

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo * session_info
)
virtual

Definition at line 2571 of file Importer.cpp.

References loadImpl().

2574  {
2575  return loadImpl(import_buffers, row_count, false, session_info);
2576 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2797

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint,
const Catalog_Namespace::SessionInfo * session_info
)
private

Definition at line 2890 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, logger::ERROR, error_msg_, TableDescriptor::fragmenter, import_export::TypedImportBuffer::get_data_block_pointers(), insert_data_, Fragmenter_Namespace::InsertData::is_default, isAddingColumns(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and TableDescriptor::tableName.

Referenced by loadImpl().

2895  {
2896  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2897  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2898  ins_data.numRows = row_count;
2899  bool success = false;
2900  try {
2901  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
2902  } catch (std::exception& e) {
2903  std::ostringstream oss;
2904  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
2905  << e.what();
2906 
2907  LOG(ERROR) << oss.str();
2908  error_msg_ = oss.str();
2909  return success;
2910  }
2911  if (isAddingColumns()) {
2912  // when Adding columns we omit any columns except the ones being added
2913  ins_data.columnIds.clear();
2914  ins_data.is_default.clear();
2915  for (auto& buffer : import_buffers) {
2916  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
2917  ins_data.is_default.push_back(true);
2918  }
2919  } else {
2920  ins_data.is_default.resize(ins_data.columnIds.size(), false);
2921  }
2922  // release loader_lock so that in InsertOrderFragmenter::insertData
2923  // we can have multiple threads sort/shuffle InsertData
2924  loader_lock.unlock();
2925  success = true;
2926  {
2927  try {
2928  if (checkpoint) {
2929  shard_table->fragmenter->insertData(ins_data);
2930  } else {
2931  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2932  }
2933  } catch (std::exception& e) {
2934  std::ostringstream oss;
2935  oss << "Fragmenter Insert Exception when processing Table "
2936  << shard_table->tableName << " issue was " << e.what();
2937 
2938  LOG(ERROR) << oss.str();
2939  loader_lock.lock();
2940  error_msg_ = oss.str();
2941  success = false;
2942  }
2943  }
2944  return success;
2945 }
std::mutex loader_mutex_
Definition: Importer.h:616
std::string tableName
#define LOG(tag)
Definition: Logger.h:194
virtual void checkpoint()
Definition: Importer.cpp:4335
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:588
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2829
std::string error_msg_
Definition: Importer.h:617
bool isAddingColumns() const
Definition: Importer.h:563
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setAddingColumns ( const bool  adding_columns)
inline

Definition at line 562 of file Importer.h.

References adding_columns_.

562 { adding_columns_ = adding_columns; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4348 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4349  {
4350  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4351 }
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:534
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3069

+ Here is the call graph for this function:

Member Data Documentation

bool import_export::Loader::adding_columns_ = false
private

Definition at line 615 of file Importer.h.

Referenced by isAddingColumns(), and setAddingColumns().

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 584 of file Importer.h.

Referenced by distributeToShardsNewColumns(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 586 of file Importer.h.

Referenced by distributeToShardsExistingColumns(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 589 of file Importer.h.

Referenced by getStringDict(), and init().

std::string import_export::Loader::error_msg_
private

Definition at line 617 of file Importer.h.

Referenced by getErrorMessage(), and loadToShard().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 588 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 587 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 616 of file Importer.h.

Referenced by loadToShard().

const TableDescriptor* import_export::Loader::table_desc_
protected

The documentation for this class was generated from the following files: