OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
 
virtual ~Loader ()
 
Catalog_Namespace::CataloggetCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual void checkpoint ()
 
virtual std::vector
< Catalog_Namespace::TableEpochInfo > 
getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setAddingColumns (const bool adding_columns)
 
bool isAddingColumns () const
 
void dropColumns (const std::vector< int > &columns)
 
std::string getErrorMessage ()
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsNewColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsExistingColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void fillShardRow (const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
 

Private Attributes

bool adding_columns_ = false
 
std::mutex loader_mutex_
 
std::string error_msg_
 

Detailed Description

Definition at line 551 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 555 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 613 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
LoadCallbackType  load_callback = nullptr 
)
inline

Definition at line 559 of file Importer.h.

References init().

562  : catalog_(c)
563  , table_desc_(t)
564  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
565  , load_callback_(load_callback) {
566  init();
567  }
const TableDescriptor * table_desc_
Definition: Importer.h:622
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2267
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
LoadCallbackType load_callback_
Definition: Importer.h:624

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 569 of file Importer.h.

569 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4666 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4666  {
4667  if (getTableDesc()->persistenceLevel ==
4668  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4669  // tables
4670  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4671  }
4672 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:572
void checkpointWithAutoRollback(const int logical_table_id) const
Definition: Catalog.cpp:4861
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected

Definition at line 2949 of file Importer.cpp.

References CHECK_GT, distributeToShardsExistingColumns(), distributeToShardsNewColumns(), isAddingColumns(), TableDescriptor::shardedColumnId, and table_desc_.

Referenced by loadImpl().

2954  {
2955  all_shard_row_counts.resize(shard_count);
2956  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2957  all_shard_import_buffers.emplace_back();
2958  for (const auto& typed_import_buffer : import_buffers) {
2959  all_shard_import_buffers.back().emplace_back(
2960  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2961  typed_import_buffer->getStringDictionary()));
2962  }
2963  }
2964  CHECK_GT(table_desc_->shardedColumnId, 0);
2965  if (isAddingColumns()) {
2966  distributeToShardsNewColumns(all_shard_import_buffers,
2967  all_shard_row_counts,
2968  import_buffers,
2969  row_count,
2970  shard_count,
2971  session_info);
2972  } else {
2973  distributeToShardsExistingColumns(all_shard_import_buffers,
2974  all_shard_row_counts,
2975  import_buffers,
2976  row_count,
2977  shard_count,
2978  session_info);
2979  }
2980 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
#define CHECK_GT(x, y)
Definition: Logger.h:305
bool isAddingColumns() const
Definition: Importer.h:600
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2890
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2928

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsExistingColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2890 of file Importer.cpp.

References CHECK, CHECK_LE, column_descs_, fillShardRow(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), kENCODING_DICT, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, and table_desc_.

Referenced by distributeToShards().

2896  {
2897  int col_idx{0};
2898  const ColumnDescriptor* shard_col_desc{nullptr};
2899  for (const auto col_desc : column_descs_) {
2900  ++col_idx;
2901  if (col_idx == table_desc_->shardedColumnId) {
2902  shard_col_desc = col_desc;
2903  break;
2904  }
2905  }
2906  CHECK(shard_col_desc);
2907  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2908  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2909  const auto& shard_col_ti = shard_col_desc->columnType;
2910  CHECK(shard_col_ti.is_integer() ||
2911  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2912  shard_col_ti.is_time());
2913  if (shard_col_ti.is_string()) {
2914  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2915  CHECK(payloads_ptr);
2916  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2917  }
2918 
2919  for (size_t i = 0; i < row_count; ++i) {
2920  const size_t shard =
2921  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2922  auto& shard_output_buffers = all_shard_import_buffers[shard];
2923  fillShardRow(i, shard_output_buffers, import_buffers);
2924  ++all_shard_row_counts[shard];
2925  }
2926 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2816
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2769
#define CHECK(condition)
Definition: Logger.h:291
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsNewColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2928 of file Importer.cpp.

References catalog_, CHECK, fillShardRow(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), and table_desc_.

Referenced by distributeToShards().

2934  {
2935  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2936  CHECK(shard_tds.size() == shard_count);
2937 
2938  for (size_t shard = 0; shard < shard_count; ++shard) {
2939  auto& shard_output_buffers = all_shard_import_buffers[shard];
2940  if (row_count != 0) {
2941  fillShardRow(0, shard_output_buffers, import_buffers);
2942  }
2943  // when replicating a column, row count of a shard == replicate count of the column
2944  // on the shard
2945  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2946  }
2947 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2816
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4700
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 3132 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

3132  {
3133  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3134  if (table_desc_->nShards) {
3135  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3136  }
3137  for (auto table_desc : table_descs) {
3138  table_desc->fragmenter->dropColumns(columnIds);
3139  }
3140 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4700
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621

+ Here is the call graph for this function:

void import_export::Loader::fillShardRow ( const size_t  row_index,
OneShardBuffers &  shard_output_buffers,
const OneShardBuffers &  import_buffers 
)
private

Definition at line 2816 of file Importer.cpp.

References CHECK, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and run_benchmark_import::type.

Referenced by distributeToShardsExistingColumns(), and distributeToShardsNewColumns().

2818  {
2819  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2820  const auto& input_buffer = import_buffers[col_idx];
2821  const auto& col_ti = input_buffer->getTypeInfo();
2822  const auto type =
2823  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2824 
2825  switch (type) {
2826  case kBOOLEAN:
2827  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2828  break;
2829  case kTINYINT:
2830  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2831  break;
2832  case kSMALLINT:
2833  shard_output_buffers[col_idx]->addSmallint(
2834  int_value_at(*input_buffer, row_index));
2835  break;
2836  case kINT:
2837  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2838  break;
2839  case kBIGINT:
2840  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2841  break;
2842  case kFLOAT:
2843  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2844  break;
2845  case kDOUBLE:
2846  shard_output_buffers[col_idx]->addDouble(
2847  double_value_at(*input_buffer, row_index));
2848  break;
2849  case kTEXT:
2850  case kVARCHAR:
2851  case kCHAR: {
2852  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2853  shard_output_buffers[col_idx]->addString(
2854  (*input_buffer->getStringBuffer())[row_index]);
2855  break;
2856  }
2857  case kTIME:
2858  case kTIMESTAMP:
2859  case kDATE:
2860  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2861  break;
2862  case kARRAY:
2863  if (IS_STRING(col_ti.get_subtype())) {
2864  CHECK(input_buffer->getStringArrayBuffer());
2865  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2866  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2867  shard_output_buffers[col_idx]->addStringArray(input_arr);
2868  } else {
2869  shard_output_buffers[col_idx]->addArray(
2870  (*input_buffer->getArrayBuffer())[row_index]);
2871  }
2872  break;
2873  case kPOINT:
2874  case kMULTIPOINT:
2875  case kLINESTRING:
2876  case kMULTILINESTRING:
2877  case kPOLYGON:
2878  case kMULTIPOLYGON: {
2879  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2880  shard_output_buffers[col_idx]->addGeoString(
2881  (*input_buffer->getGeoStringBuffer())[row_index]);
2882  break;
2883  }
2884  default:
2885  CHECK(false);
2886  }
2887  }
2888 }
Definition: sqltypes.h:66
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2807
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:559
#define CHECK_LT(x, y)
Definition: Logger.h:303
Definition: sqltypes.h:69
Definition: sqltypes.h:70
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2800
Definition: sqltypes.h:58
#define IS_STRING(T)
Definition: sqltypes.h:299
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2769
#define CHECK(condition)
Definition: Logger.h:291
Definition: sqltypes.h:62

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 573 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

573  {
574  return column_descs_;
575  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 571 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

571 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621

+ Here is the caller graph for this function:

std::string import_export::Loader::getErrorMessage ( )
inline

Definition at line 602 of file Importer.h.

References error_msg_.

602 { return error_msg_; };
std::string error_msg_
Definition: Importer.h:654
StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor *  cd) const
inline

Definition at line 577 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

577  {
578  if ((cd->columnType.get_type() != kARRAY ||
579  !IS_STRING(cd->columnType.get_subtype())) &&
580  (!cd->columnType.is_string() ||
581  cd->columnType.get_compression() != kENCODING_DICT)) {
582  return nullptr;
583  }
584  return dict_map_.at(cd->columnId);
585  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:382
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:381
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:389
#define IS_STRING(T)
Definition: sqltypes.h:299
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:580
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:626

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 572 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

572 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:622

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4674 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4674  {
4675  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4676  getTableDesc()->tableId);
4677 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:572
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3630

+ Here is the call graph for this function:

void import_export::Loader::init ( )
protected

Definition at line 3142 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

3142  {
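3143  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3144  insert_data_.tableId = table_desc_->tableId;
3145  for (auto cd : column_descs_) {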
3146  insert_data_.columnIds.push_back(cd->columnId);
3147  if (cd->columnType.get_compression() == kENCODING_DICT) {
3148  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3149  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3150  CHECK(dd);
3151  dict_map_[cd->columnId] = dd->stringDict.get();
3152  }
3153  }
3154  insert_data_.numRows = 0;
3155 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:625
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:248
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1999
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:623
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:626

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::isAddingColumns ( ) const
inline

Definition at line 600 of file Importer.h.

References adding_columns_.

Referenced by distributeToShards(), and loadToShard().

600 { return adding_columns_; }

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2761 of file Importer.cpp.

References loadImpl().

2763  {
2764  return loadImpl(import_buffers, row_count, true, session_info);
2765 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2982

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected virtual

Definition at line 2982 of file Importer.cpp.

References catalog_, distributeToShards(), import_export::TypedImportBuffer::get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2986  {
2987  if (load_callback_) {
2988  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2989  return load_callback_(import_buffers, data_blocks, row_count);
2990  }
2991  if (table_desc_->nShards) {
2992  std::vector<OneShardBuffers> all_shard_import_buffers;
2993  std::vector<size_t> all_shard_row_counts;
2994  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2995  distributeToShards(all_shard_import_buffers,
2996  all_shard_row_counts,
2997  import_buffers,
2998  row_count,
2999  shard_tables.size(),
3000  session_info);
3001  bool success = true;
3002  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3003  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3004  all_shard_row_counts[shard_idx],
3005  shard_tables[shard_idx],
3006  checkpoint,
3007  session_info);
3008  }
3009  return success;
3010  }
3011  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3012 }
const TableDescriptor * table_desc_
Definition: Importer.h:622
virtual void checkpoint()
Definition: Importer.cpp:4666
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2949
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3014
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4700
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3075
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:621
LoadCallbackType load_callback_
Definition: Importer.h:624

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Definition at line 2754 of file Importer.cpp.

References loadImpl().

2757  {
2758  return loadImpl(import_buffers, row_count, false, session_info);
2759 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2982

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor *  shard_table,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 3075 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, logger::ERROR, error_msg_, TableDescriptor::fragmenter, import_export::TypedImportBuffer::get_data_block_pointers(), insert_data_, Fragmenter_Namespace::InsertData::is_default, isAddingColumns(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and TableDescriptor::tableName.

Referenced by loadImpl().

3080  {
3081  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3082  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3083  ins_data.numRows = row_count;
3084  bool success = false;
3085  try {
3086  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3087  } catch (std::exception& e) {
3088  std::ostringstream oss;
3089  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3090  << e.what();
3091 
3092  LOG(ERROR) << oss.str();
3093  error_msg_ = oss.str();
3094  return success;
3095  }
3096  if (isAddingColumns()) {
3097  // when Adding columns we omit any columns except the ones being added
3098  ins_data.columnIds.clear();
3099  ins_data.is_default.clear();
3100  for (auto& buffer : import_buffers) {
3101  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3102  ins_data.is_default.push_back(true);
3103  }
3104  } else {
3105  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3106  }
3107  // release loader_lock so that in InsertOrderFragmenter::insertData
3108  // we can have multiple threads sort/shuffle InsertData
3109  loader_lock.unlock();
3110  success = true;
3111  {
3112  try {
3113  if (checkpoint) {
3114  shard_table->fragmenter->insertData(ins_data);
3115  } else {
3116  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3117  }
3118  } catch (std::exception& e) {
3119  std::ostringstream oss;
3120  oss << "Fragmenter Insert Exception when processing Table "
3121  << shard_table->tableName << " issue was " << e.what();
3122 
3123  LOG(ERROR) << oss.str();
3124  loader_lock.lock();
3125  error_msg_ = oss.str();
3126  success = false;
3127  }
3128  }
3129  return success;
3130 }
std::mutex loader_mutex_
Definition: Importer.h:653
std::string tableName
#define LOG(tag)
Definition: Logger.h:285
virtual void checkpoint()
Definition: Importer.cpp:4666
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:625
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:3014
std::string error_msg_
Definition: Importer.h:654
bool isAddingColumns() const
Definition: Importer.h:600
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setAddingColumns ( const bool  adding_columns)
inline

Definition at line 599 of file Importer.h.

References adding_columns_.

599 { adding_columns_ = adding_columns; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4679 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4680  {
4681  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4682 }
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:571
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3658

+ Here is the call graph for this function:

Member Data Documentation

bool import_export::Loader::adding_columns_ = false
private

Definition at line 652 of file Importer.h.

Referenced by isAddingColumns(), and setAddingColumns().

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 621 of file Importer.h.

Referenced by distributeToShardsNewColumns(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 623 of file Importer.h.

Referenced by distributeToShardsExistingColumns(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 626 of file Importer.h.

Referenced by getStringDict(), and init().

std::string import_export::Loader::error_msg_
private

Definition at line 654 of file Importer.h.

Referenced by getErrorMessage(), and loadToShard().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 625 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 624 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 653 of file Importer.h.

Referenced by loadToShard().

const TableDescriptor* import_export::Loader::table_desc_
protected

The documentation for this class was generated from the following files: