OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
 
virtual ~Loader ()
 
Catalog_Namespace::CataloggetCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual void checkpoint ()
 
virtual std::vector
< Catalog_Namespace::TableEpochInfo > 
getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setAddingColumns (const bool adding_columns)
 
bool isAddingColumns () const
 
void dropColumns (const std::vector< int > &columns)
 
std::string getErrorMessage ()
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsNewColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsExistingColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void fillShardRow (const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
 

Private Attributes

bool adding_columns_ = false
 
std::mutex loader_mutex_
 
std::string error_msg_
 

Detailed Description

Definition at line 551 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 555 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 613 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
LoadCallbackType  load_callback = nullptr 
)
inline

Definition at line 559 of file Importer.h.

References init().

562  : catalog_(c)
563  , table_desc_(t)
564  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
565  , load_callback_(load_callback) {
566  init();
567  }
const TableDescriptor * table_desc_
Definition: Importer.h:622
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2228
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
LoadCallbackType load_callback_
Definition: Importer.h:624

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 569 of file Importer.h.

569 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4705 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4705  {
4706  if (getTableDesc()->persistenceLevel ==
4707  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4708  // tables
4709  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4710  }
4711 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:572
void checkpointWithAutoRollback(const int logical_table_id) const
Definition: Catalog.cpp:4791
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected

Definition at line 2988 of file Importer.cpp.

References CHECK_GT, distributeToShardsExistingColumns(), distributeToShardsNewColumns(), isAddingColumns(), TableDescriptor::shardedColumnId, and table_desc_.

Referenced by loadImpl().

2993  {
2994  all_shard_row_counts.resize(shard_count);
2995  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2996  all_shard_import_buffers.emplace_back();
2997  for (const auto& typed_import_buffer : import_buffers) {
2998  all_shard_import_buffers.back().emplace_back(
2999  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
3000  typed_import_buffer->getStringDictionary()));
3001  }
3002  }
3003  CHECK_GT(table_desc_->shardedColumnId, 0);
3004  if (isAddingColumns()) {
3005  distributeToShardsNewColumns(all_shard_import_buffers,
3006  all_shard_row_counts,
3007  import_buffers,
3008  row_count,
3009  shard_count,
3010  session_info);
3011  } else {
3012  distributeToShardsExistingColumns(all_shard_import_buffers,
3013  all_shard_row_counts,
3014  import_buffers,
3015  row_count,
3016  shard_count,
3017  session_info);
3018  }
3019 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
#define CHECK_GT(x, y)
Definition: Logger.h:234
bool isAddingColumns() const
Definition: Importer.h:600
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2929
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2967

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsExistingColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2929 of file Importer.cpp.

References CHECK, CHECK_LE, column_descs_, fillShardRow(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), kENCODING_DICT, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, and table_desc_.

Referenced by distributeToShards().

2935  {
2936  int col_idx{0};
2937  const ColumnDescriptor* shard_col_desc{nullptr};
2938  for (const auto col_desc : column_descs_) {
2939  ++col_idx;
2940  if (col_idx == table_desc_->shardedColumnId) {
2941  shard_col_desc = col_desc;
2942  break;
2943  }
2944  }
2945  CHECK(shard_col_desc);
2946  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2947  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2948  const auto& shard_col_ti = shard_col_desc->columnType;
2949  CHECK(shard_col_ti.is_integer() ||
2950  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2951  shard_col_ti.is_time());
2952  if (shard_col_ti.is_string()) {
2953  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2954  CHECK(payloads_ptr);
2955  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2956  }
2957 
2958  for (size_t i = 0; i < row_count; ++i) {
2959  const size_t shard =
2960  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2961  auto& shard_output_buffers = all_shard_import_buffers[shard];
2962  fillShardRow(i, shard_output_buffers, import_buffers);
2963  ++all_shard_row_counts[shard];
2964  }
2965 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2855
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2808
#define CHECK(condition)
Definition: Logger.h:222
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsNewColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2967 of file Importer.cpp.

References catalog_, CHECK, fillShardRow(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), and table_desc_.

Referenced by distributeToShards().

2973  {
2974  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2975  CHECK(shard_tds.size() == shard_count);
2976 
2977  for (size_t shard = 0; shard < shard_count; ++shard) {
2978  auto& shard_output_buffers = all_shard_import_buffers[shard];
2979  if (row_count != 0) {
2980  fillShardRow(0, shard_output_buffers, import_buffers);
2981  }
2982  // when replicating a column, row count of a shard == replicate count of the column
2983  // on the shard
2984  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2985  }
2986 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2855
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4630
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 3171 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

3171  {
3172  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3173  if (table_desc_->nShards) {
3174  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3175  }
3176  for (auto table_desc : table_descs) {
3177  table_desc->fragmenter->dropColumns(columnIds);
3178  }
3179 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4630
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621

+ Here is the call graph for this function:

void import_export::Loader::fillShardRow ( const size_t  row_index,
OneShardBuffers &  shard_output_buffers,
const OneShardBuffers &  import_buffers 
)
private

Definition at line 2855 of file Importer.cpp.

References CHECK, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and run_benchmark_import::type.

Referenced by distributeToShardsExistingColumns(), and distributeToShardsNewColumns().

2857  {
2858  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2859  const auto& input_buffer = import_buffers[col_idx];
2860  const auto& col_ti = input_buffer->getTypeInfo();
2861  const auto type =
2862  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2863 
2864  switch (type) {
2865  case kBOOLEAN:
2866  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2867  break;
2868  case kTINYINT:
2869  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2870  break;
2871  case kSMALLINT:
2872  shard_output_buffers[col_idx]->addSmallint(
2873  int_value_at(*input_buffer, row_index));
2874  break;
2875  case kINT:
2876  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2877  break;
2878  case kBIGINT:
2879  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2880  break;
2881  case kFLOAT:
2882  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2883  break;
2884  case kDOUBLE:
2885  shard_output_buffers[col_idx]->addDouble(
2886  double_value_at(*input_buffer, row_index));
2887  break;
2888  case kTEXT:
2889  case kVARCHAR:
2890  case kCHAR: {
2891  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2892  shard_output_buffers[col_idx]->addString(
2893  (*input_buffer->getStringBuffer())[row_index]);
2894  break;
2895  }
2896  case kTIME:
2897  case kTIMESTAMP:
2898  case kDATE:
2899  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2900  break;
2901  case kARRAY:
2902  if (IS_STRING(col_ti.get_subtype())) {
2903  CHECK(input_buffer->getStringArrayBuffer());
2904  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2905  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2906  shard_output_buffers[col_idx]->addStringArray(input_arr);
2907  } else {
2908  shard_output_buffers[col_idx]->addArray(
2909  (*input_buffer->getArrayBuffer())[row_index]);
2910  }
2911  break;
2912  case kPOINT:
2913  case kMULTIPOINT:
2914  case kLINESTRING:
2915  case kMULTILINESTRING:
2916  case kPOLYGON:
2917  case kMULTIPOLYGON: {
2918  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2919  shard_output_buffers[col_idx]->addGeoString(
2920  (*input_buffer->getGeoStringBuffer())[row_index]);
2921  break;
2922  }
2923  default:
2924  CHECK(false);
2925  }
2926  }
2927 }
Definition: sqltypes.h:63
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2846
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:499
#define CHECK_LT(x, y)
Definition: Logger.h:232
Definition: sqltypes.h:66
Definition: sqltypes.h:67
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2839
Definition: sqltypes.h:55
#define IS_STRING(T)
Definition: sqltypes.h:322
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2808
#define CHECK(condition)
Definition: Logger.h:222
Definition: sqltypes.h:59

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 573 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

573  {
574  return column_descs_;
575  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 571 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

571 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621

+ Here is the caller graph for this function:

std::string import_export::Loader::getErrorMessage ( )
inline

Definition at line 602 of file Importer.h.

References error_msg_.

602 { return error_msg_; };
std::string error_msg_
Definition: Importer.h:654
StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor *  cd) const
inline

Definition at line 577 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

577  {
578  if ((cd->columnType.get_type() != kARRAY ||
579  !IS_STRING(cd->columnType.get_subtype())) &&
580  (!cd->columnType.is_string() ||
581  cd->columnType.get_compression() != kENCODING_DICT)) {
582  return nullptr;
583  }
584  return dict_map_.at(cd->columnId);
585  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:405
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:412
#define IS_STRING(T)
Definition: sqltypes.h:322
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:600
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:626

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 572 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

572 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:622

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4713 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4713  {
4714  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4715  getTableDesc()->tableId);
4716 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:572
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3585

+ Here is the call graph for this function:

void import_export::Loader::init ( )
protected

Definition at line 3181 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

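3181  {
3182  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3183  insert_data_.tableId = table_desc_->tableId;
3184  for (auto cd : column_descs_) {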
3185  insert_data_.columnIds.push_back(cd->columnId);
3186  if (cd->columnType.get_compression() == kENCODING_DICT) {
3187  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3188  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3189  CHECK(dd);
3190  dict_map_[cd->columnId] = dd->stringDict.get();
3191  }
3192  }
3193  insert_data_.numRows = 0;
3194 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:625
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:72
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
#define CHECK(condition)
Definition: Logger.h:222
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:71
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:626

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::isAddingColumns ( ) const
inline

Definition at line 600 of file Importer.h.

References adding_columns_.

Referenced by distributeToShards(), and loadToShard().

600 { return adding_columns_; }

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2800 of file Importer.cpp.

References loadImpl().

2802  {
2803  return loadImpl(import_buffers, row_count, true, session_info);
2804 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3021

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected virtual

Definition at line 3021 of file Importer.cpp.

References catalog_, distributeToShards(), import_export::TypedImportBuffer::get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

3025  {
3026  if (load_callback_) {
3027  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
3028  return load_callback_(import_buffers, data_blocks, row_count);
3029  }
3030  if (table_desc_->nShards) {
3031  std::vector<OneShardBuffers> all_shard_import_buffers;
3032  std::vector<size_t> all_shard_row_counts;
3033  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
3034  distributeToShards(all_shard_import_buffers,
3035  all_shard_row_counts,
3036  import_buffers,
3037  row_count,
3038  shard_tables.size(),
3039  session_info);
3040  bool success = true;
3041  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3042  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3043  all_shard_row_counts[shard_idx],
3044  shard_tables[shard_idx],
3045  checkpoint,
3046  session_info);
3047  }
3048  return success;
3049  }
3050  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3051 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
virtual void checkpoint()
Definition: Importer.cpp:4705
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2988
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3053
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4630
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3114
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
LoadCallbackType load_callback_
Definition: Importer.h:624

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Definition at line 2793 of file Importer.cpp.

References loadImpl().

2796  {
2797  return loadImpl(import_buffers, row_count, false, session_info);
2798 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3021

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor *  shard_table,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 3114 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, logger::ERROR, error_msg_, TableDescriptor::fragmenter, import_export::TypedImportBuffer::get_data_block_pointers(), insert_data_, Fragmenter_Namespace::InsertData::is_default, isAddingColumns(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and TableDescriptor::tableName.

Referenced by loadImpl().

3119  {
3120  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3121  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3122  ins_data.numRows = row_count;
3123  bool success = false;
3124  try {
3125  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3126  } catch (std::exception& e) {
3127  std::ostringstream oss;
3128  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3129  << e.what();
3130 
3131  LOG(ERROR) << oss.str();
3132  error_msg_ = oss.str();
3133  return success;
3134  }
3135  if (isAddingColumns()) {
3136  // when Adding columns we omit any columns except the ones being added
3137  ins_data.columnIds.clear();
3138  ins_data.is_default.clear();
3139  for (auto& buffer : import_buffers) {
3140  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3141  ins_data.is_default.push_back(true);
3142  }
3143  } else {
3144  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3145  }
3146  // release loader_lock so that in InsertOrderFragmenter::insertData
3147  // we can have multiple threads sort/shuffle InsertData
3148  loader_lock.unlock();
3149  success = true;
3150  {
3151  try {
3152  if (checkpoint) {
3153  shard_table->fragmenter->insertData(ins_data);
3154  } else {
3155  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3156  }
3157  } catch (std::exception& e) {
3158  std::ostringstream oss;
3159  oss << "Fragmenter Insert Exception when processing Table "
3160  << shard_table->tableName << " issue was " << e.what();
3161 
3162  LOG(ERROR) << oss.str();
3163  loader_lock.lock();
3164  error_msg_ = oss.str();
3165  success = false;
3166  }
3167  }
3168  return success;
3169 }
std::mutex loader_mutex_
Definition: Importer.h:653
std::string tableName
#define LOG(tag)
Definition: Logger.h:216
virtual void checkpoint()
Definition: Importer.cpp:4705
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:625
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3053
std::string error_msg_
Definition: Importer.h:654
bool isAddingColumns() const
Definition: Importer.h:600
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setAddingColumns ( const bool  adding_columns)
inline

Definition at line 599 of file Importer.h.

References adding_columns_.

599 { adding_columns_ = adding_columns; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4718 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4719  {
4720  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4721 }
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3613

+ Here is the call graph for this function:

Member Data Documentation

bool import_export::Loader::adding_columns_ = false
private

Definition at line 652 of file Importer.h.

Referenced by isAddingColumns(), and setAddingColumns().

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 621 of file Importer.h.

Referenced by distributeToShardsNewColumns(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 623 of file Importer.h.

Referenced by distributeToShardsExistingColumns(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 626 of file Importer.h.

Referenced by getStringDict(), and init().

std::string import_export::Loader::error_msg_
private

Definition at line 654 of file Importer.h.

Referenced by getErrorMessage(), and loadToShard().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 625 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 624 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 653 of file Importer.h.

Referenced by loadToShard().

const TableDescriptor* import_export::Loader::table_desc_
protected

The documentation for this class was generated from the following files: