OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual void checkpoint ()
 
virtual std::vector
< Catalog_Namespace::TableEpochInfo > 
getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setAddingColumns (const bool adding_columns)
 
bool isAddingColumns () const
 
void dropColumns (const std::vector< int > &columns)
 
std::string getErrorMessage ()
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsNewColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsExistingColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void fillShardRow (const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
 

Private Attributes

bool adding_columns_ = false
 
std::mutex loader_mutex_
 
std::string error_msg_
 

Detailed Description

Definition at line 542 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 546 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 604 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
LoadCallbackType  load_callback = nullptr 
)
inline

Definition at line 550 of file Importer.h.

References init().

553  : catalog_(c)
554  , table_desc_(t)
555  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
556  , load_callback_(load_callback) {
557  init();
558  }
const TableDescriptor * table_desc_
Definition: Importer.h:613
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:614
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1794
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612
LoadCallbackType load_callback_
Definition: Importer.h:615

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 560 of file Importer.h.

560 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4556 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4556  {
4557  if (getTableDesc()->persistenceLevel ==
4558  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4559  // tables
4560  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4561  }
4562 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:563
void checkpointWithAutoRollback(const int logical_table_id) const
Definition: Catalog.cpp:4249
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:562

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected

Definition at line 2968 of file Importer.cpp.

References CHECK_GT, distributeToShardsExistingColumns(), distributeToShardsNewColumns(), isAddingColumns(), TableDescriptor::shardedColumnId, and table_desc_.

Referenced by loadImpl().

2973  {
2974  all_shard_row_counts.resize(shard_count);
2975  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2976  all_shard_import_buffers.emplace_back();
2977  for (const auto& typed_import_buffer : import_buffers) {
2978  all_shard_import_buffers.back().emplace_back(
2979  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2980  typed_import_buffer->getStringDictionary()));
2981  }
2982  }
2983  CHECK_GT(table_desc_->shardedColumnId, 0);
2984  if (isAddingColumns()) {
2985  distributeToShardsNewColumns(all_shard_import_buffers,
2986  all_shard_row_counts,
2987  import_buffers,
2988  row_count,
2989  shard_count,
2990  session_info);
2991  } else {
2992  distributeToShardsExistingColumns(all_shard_import_buffers,
2993  all_shard_row_counts,
2994  import_buffers,
2995  row_count,
2996  shard_count,
2997  session_info);
2998  }
2999 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
#define CHECK_GT(x, y)
Definition: Logger.h:221
bool isAddingColumns() const
Definition: Importer.h:591
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2909
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2947

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsExistingColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2909 of file Importer.cpp.

References CHECK, CHECK_LE, column_descs_, fillShardRow(), i, import_export::anonymous_namespace{Importer.cpp}::int_value_at(), kENCODING_DICT, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, and table_desc_.

Referenced by distributeToShards().

2915  {
2916  int col_idx{0};
2917  const ColumnDescriptor* shard_col_desc{nullptr};
2918  for (const auto col_desc : column_descs_) {
2919  ++col_idx;
2920  if (col_idx == table_desc_->shardedColumnId) {
2921  shard_col_desc = col_desc;
2922  break;
2923  }
2924  }
2925  CHECK(shard_col_desc);
2926  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2927  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2928  const auto& shard_col_ti = shard_col_desc->columnType;
2929  CHECK(shard_col_ti.is_integer() ||
2930  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2931  shard_col_ti.is_time());
2932  if (shard_col_ti.is_string()) {
2933  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2934  CHECK(payloads_ptr);
2935  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2936  }
2937 
2938  for (size_t i = 0; i < row_count; ++i) {
2939  const size_t shard =
2940  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2941  auto& shard_output_buffers = all_shard_import_buffers[shard];
2942  fillShardRow(i, shard_output_buffers, import_buffers);
2943  ++all_shard_row_counts[shard];
2944  }
2945 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2837
#define CHECK_LE(x, y)
Definition: Logger.h:220
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:614
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2790
#define CHECK(condition)
Definition: Logger.h:209
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsNewColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2947 of file Importer.cpp.

References catalog_, CHECK, fillShardRow(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), and table_desc_.

Referenced by distributeToShards().

2953  {
2954  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2955  CHECK(shard_tds.size() == shard_count);
2956 
2957  for (size_t shard = 0; shard < shard_count; ++shard) {
2958  auto& shard_output_buffers = all_shard_import_buffers[shard];
2959  if (row_count != 0) {
2960  fillShardRow(0, shard_output_buffers, import_buffers);
2961  }
2962  // when replicating a column, row count of a shard == replicate count of the column
2963  // on the shard
2964  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2965  }
2966 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2837
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4104
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 3151 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

3151  {
3152  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3153  if (table_desc_->nShards) {
3154  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3155  }
3156  for (auto table_desc : table_descs) {
3157  table_desc->fragmenter->dropColumns(columnIds);
3158  }
3159 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4104
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612

+ Here is the call graph for this function:

void import_export::Loader::fillShardRow ( const size_t  row_index,
OneShardBuffers &  shard_output_buffers,
const OneShardBuffers &  import_buffers 
)
private

Definition at line 2837 of file Importer.cpp.

References CHECK, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and run_benchmark_import::type.

Referenced by distributeToShardsExistingColumns(), and distributeToShardsNewColumns().

2839  {
2840  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2841  const auto& input_buffer = import_buffers[col_idx];
2842  const auto& col_ti = input_buffer->getTypeInfo();
2843  const auto type =
2844  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2845 
2846  switch (type) {
2847  case kBOOLEAN:
2848  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2849  break;
2850  case kTINYINT:
2851  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2852  break;
2853  case kSMALLINT:
2854  shard_output_buffers[col_idx]->addSmallint(
2855  int_value_at(*input_buffer, row_index));
2856  break;
2857  case kINT:
2858  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2859  break;
2860  case kBIGINT:
2861  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2862  break;
2863  case kFLOAT:
2864  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2865  break;
2866  case kDOUBLE:
2867  shard_output_buffers[col_idx]->addDouble(
2868  double_value_at(*input_buffer, row_index));
2869  break;
2870  case kTEXT:
2871  case kVARCHAR:
2872  case kCHAR: {
2873  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2874  shard_output_buffers[col_idx]->addString(
2875  (*input_buffer->getStringBuffer())[row_index]);
2876  break;
2877  }
2878  case kTIME:
2879  case kTIMESTAMP:
2880  case kDATE:
2881  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2882  break;
2883  case kARRAY:
2884  if (IS_STRING(col_ti.get_subtype())) {
2885  CHECK(input_buffer->getStringArrayBuffer());
2886  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2887  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2888  shard_output_buffers[col_idx]->addStringArray(input_arr);
2889  } else {
2890  shard_output_buffers[col_idx]->addArray(
2891  (*input_buffer->getArrayBuffer())[row_index]);
2892  }
2893  break;
2894  case kPOINT:
2895  case kLINESTRING:
2896  case kPOLYGON:
2897  case kMULTIPOLYGON: {
2898  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2899  shard_output_buffers[col_idx]->addGeoString(
2900  (*input_buffer->getGeoStringBuffer())[row_index]);
2901  break;
2902  }
2903  default:
2904  CHECK(false);
2905  }
2906  }
2907 }
Definition: sqltypes.h:49
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2828
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:455
#define CHECK_LT(x, y)
Definition: Logger.h:219
Definition: sqltypes.h:52
Definition: sqltypes.h:53
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2821
Definition: sqltypes.h:41
#define IS_STRING(T)
Definition: sqltypes.h:250
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2790
#define CHECK(condition)
Definition: Logger.h:209
Definition: sqltypes.h:45

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 564 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

564  {
565  return column_descs_;
566  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:614

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 562 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

562 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612

+ Here is the caller graph for this function:

std::string import_export::Loader::getErrorMessage ( )
inline

Definition at line 593 of file Importer.h.

References error_msg_.

593 { return error_msg_; };
std::string error_msg_
Definition: Importer.h:645
StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor *  cd) const
inline

Definition at line 568 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

568  {
569  if ((cd->columnType.get_type() != kARRAY ||
570  !IS_STRING(cd->columnType.get_subtype())) &&
571  (!cd->columnType.is_string() ||
572  cd->columnType.get_compression() != kENCODING_DICT)) {
573  return nullptr;
574  }
575  return dict_map_.at(cd->columnId);
576  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:330
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
#define IS_STRING(T)
Definition: sqltypes.h:250
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:509
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:617

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 563 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

563 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:613

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4564 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4564  {
4565  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4566  getTableDesc()->tableId);
4567 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:563
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:562
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3089

+ Here is the call graph for this function:

void import_export::Loader::init ( )
protected

Definition at line 3161 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

3161  {
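3162  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3163  insert_data_.tableId = table_desc_->tableId;
3164  for (auto cd : column_descs_) {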
3165  insert_data_.columnIds.push_back(cd->columnId);
3166  if (cd->columnType.get_compression() == kENCODING_DICT) {
3167  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3168  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3169  CHECK(dd);
3170  dict_map_[cd->columnId] = dd->stringDict.get();
3171  }
3172  }
3173  insert_data_.numRows = 0;
3174 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:616
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:223
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1537
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:614
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612
#define CHECK(condition)
Definition: Logger.h:209
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:617

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::isAddingColumns ( ) const
inline

Definition at line 591 of file Importer.h.

References adding_columns_.

Referenced by distributeToShards(), and loadToShard().

591 { return adding_columns_; }

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2782 of file Importer.cpp.

References loadImpl().

2784  {
2785  return loadImpl(import_buffers, row_count, true, session_info);
2786 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3001

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected virtual

Definition at line 3001 of file Importer.cpp.

References catalog_, distributeToShards(), import_export::TypedImportBuffer::get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

3005  {
3006  if (load_callback_) {
3007  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
3008  return load_callback_(import_buffers, data_blocks, row_count);
3009  }
3010  if (table_desc_->nShards) {
3011  std::vector<OneShardBuffers> all_shard_import_buffers;
3012  std::vector<size_t> all_shard_row_counts;
3013  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
3014  distributeToShards(all_shard_import_buffers,
3015  all_shard_row_counts,
3016  import_buffers,
3017  row_count,
3018  shard_tables.size(),
3019  session_info);
3020  bool success = true;
3021  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3022  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3023  all_shard_row_counts[shard_idx],
3024  shard_tables[shard_idx],
3025  checkpoint,
3026  session_info);
3027  }
3028  return success;
3029  }
3030  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3031 }
const TableDescriptor * table_desc_
Definition: Importer.h:613
virtual void checkpoint()
Definition: Importer.cpp:4556
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2968
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3033
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4104
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3094
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:612
LoadCallbackType load_callback_
Definition: Importer.h:615

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo session_info 
)
virtual

Definition at line 2775 of file Importer.cpp.

References loadImpl().

2778  {
2779  return loadImpl(import_buffers, row_count, false, session_info);
2780 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3001

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor *  shard_table,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 3094 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, logger::ERROR, error_msg_, TableDescriptor::fragmenter, import_export::TypedImportBuffer::get_data_block_pointers(), insert_data_, Fragmenter_Namespace::InsertData::is_default, isAddingColumns(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and TableDescriptor::tableName.

Referenced by loadImpl().

3099  {
3100  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3101  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3102  ins_data.numRows = row_count;
3103  bool success = false;
3104  try {
3105  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3106  } catch (std::exception& e) {
3107  std::ostringstream oss;
3108  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3109  << e.what();
3110 
3111  LOG(ERROR) << oss.str();
3112  error_msg_ = oss.str();
3113  return success;
3114  }
3115  if (isAddingColumns()) {
3116  // when Adding columns we omit any columns except the ones being added
3117  ins_data.columnIds.clear();
3118  ins_data.is_default.clear();
3119  for (auto& buffer : import_buffers) {
3120  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3121  ins_data.is_default.push_back(true);
3122  }
3123  } else {
3124  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3125  }
3126  // release loader_lock so that in InsertOrderFragmenter::insertData
3127  // we can have multiple threads sort/shuffle InsertData
3128  loader_lock.unlock();
3129  success = true;
3130  {
3131  try {
3132  if (checkpoint) {
3133  shard_table->fragmenter->insertData(ins_data);
3134  } else {
3135  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3136  }
3137  } catch (std::exception& e) {
3138  std::ostringstream oss;
3139  oss << "Fragmenter Insert Exception when processing Table "
3140  << shard_table->tableName << " issue was " << e.what();
3141 
3142  LOG(ERROR) << oss.str();
3143  loader_lock.lock();
3144  error_msg_ = oss.str();
3145  success = false;
3146  }
3147  }
3148  return success;
3149 }
std::mutex loader_mutex_
Definition: Importer.h:644
std::string tableName
#define LOG(tag)
Definition: Logger.h:203
virtual void checkpoint()
Definition: Importer.cpp:4556
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:616
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3033
std::string error_msg_
Definition: Importer.h:645
bool isAddingColumns() const
Definition: Importer.h:591
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setAddingColumns ( const bool  adding_columns)
inline

Definition at line 590 of file Importer.h.

References adding_columns_.

590 { adding_columns_ = adding_columns; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4569 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4570  {
4571  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4572 }
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:562
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3117

+ Here is the call graph for this function:

Member Data Documentation

bool import_export::Loader::adding_columns_ = false
private

Definition at line 643 of file Importer.h.

Referenced by isAddingColumns(), and setAddingColumns().

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 612 of file Importer.h.

Referenced by distributeToShardsNewColumns(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 614 of file Importer.h.

Referenced by distributeToShardsExistingColumns(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 617 of file Importer.h.

Referenced by getStringDict(), and init().

std::string import_export::Loader::error_msg_
private

Definition at line 645 of file Importer.h.

Referenced by getErrorMessage(), and loadToShard().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 616 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 615 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 644 of file Importer.h.

Referenced by loadToShard().

const TableDescriptor* import_export::Loader::table_desc_
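protected

Definition at line 613 of file Importer.h.

Referenced by distributeToShards(), distributeToShardsExistingColumns(), distributeToShardsNewColumns(), dropColumns(), getTableDesc(), init(), and loadImpl().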

The documentation for this class was generated from the following files: