OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog () const
 
const TableDescriptor * getTableDesc () const
 
const std::list< const
ColumnDescriptor * > & 
get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
virtual void checkpoint ()
 
virtual std::vector
< Catalog_Namespace::TableEpochInfo > 
getTableEpochs () const
 
virtual void setTableEpochs (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
void setAddingColumns (const bool adding_columns)
 
bool isAddingColumns () const
 
void dropColumns (const std::vector< int > &columns)
 
std::string getErrorMessage ()
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const
ColumnDescriptor * > 
column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsNewColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void distributeToShardsExistingColumns (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
 
void fillShardRow (const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
 

Private Attributes

bool adding_columns_ = false
 
std::mutex loader_mutex_
 
std::string error_msg_
 

Detailed Description

Definition at line 557 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 561 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 619 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
LoadCallbackType  load_callback = nullptr 
)
inline

Definition at line 565 of file Importer.h.

References init().

568  : catalog_(c)
569  , table_desc_(t)
570  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
571  , load_callback_(load_callback) {
572  init();
573  }
const TableDescriptor * table_desc_
Definition: Importer.h:628
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:629
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2172
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627
LoadCallbackType load_callback_
Definition: Importer.h:630

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 575 of file Importer.h.

575 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4564 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpointWithAutoRollback(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4564  {
4565  if (getTableDesc()->persistenceLevel ==
4566  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4567  // tables
4568  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4569  }
4570 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:578
void checkpointWithAutoRollback(const int logical_table_id) const
Definition: Catalog.cpp:5030
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:577

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected

Definition at line 2872 of file Importer.cpp.

References CHECK_GT, distributeToShardsExistingColumns(), distributeToShardsNewColumns(), isAddingColumns(), TableDescriptor::shardedColumnId, and table_desc_.

Referenced by loadImpl().

2877  {
2878  all_shard_row_counts.resize(shard_count);
2879  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2880  all_shard_import_buffers.emplace_back();
2881  for (const auto& typed_import_buffer : import_buffers) {
2882  all_shard_import_buffers.back().emplace_back(
2883  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2884  typed_import_buffer->getStringDictionary()));
2885  }
2886  }
2887  CHECK_GT(table_desc_->shardedColumnId, 0);
2888  if (isAddingColumns()) {
2889  distributeToShardsNewColumns(all_shard_import_buffers,
2890  all_shard_row_counts,
2891  import_buffers,
2892  row_count,
2893  shard_count,
2894  session_info);
2895  } else {
2896  distributeToShardsExistingColumns(all_shard_import_buffers,
2897  all_shard_row_counts,
2898  import_buffers,
2899  row_count,
2900  shard_count,
2901  session_info);
2902  }
2903 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
#define CHECK_GT(x, y)
Definition: Logger.h:305
bool isAddingColumns() const
Definition: Importer.h:606
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2813
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2851

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsExistingColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2813 of file Importer.cpp.

References CHECK, CHECK_LE, column_descs_, fillShardRow(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), kENCODING_DICT, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, and table_desc_.

Referenced by distributeToShards().

2819  {
2820  int col_idx{0};
2821  const ColumnDescriptor* shard_col_desc{nullptr};
2822  for (const auto col_desc : column_descs_) {
2823  ++col_idx;
2824  if (col_idx == table_desc_->shardedColumnId) {
2825  shard_col_desc = col_desc;
2826  break;
2827  }
2828  }
2829  CHECK(shard_col_desc);
2830  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2831  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2832  const auto& shard_col_ti = shard_col_desc->columnType;
2833  CHECK(shard_col_ti.is_integer() ||
2834  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2835  shard_col_ti.is_time());
2836  if (shard_col_ti.is_string()) {
2837  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2838  CHECK(payloads_ptr);
2839  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2840  }
2841 
2842  for (size_t i = 0; i < row_count; ++i) {
2843  const size_t shard =
2844  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2845  auto& shard_output_buffers = all_shard_import_buffers[shard];
2846  fillShardRow(i, shard_output_buffers, import_buffers);
2847  ++all_shard_row_counts[shard];
2848  }
2849 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2739
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:629
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2692
#define CHECK(condition)
Definition: Logger.h:291
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::distributeToShardsNewColumns ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers &  import_buffers,
const size_t  row_count,
const size_t  shard_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2851 of file Importer.cpp.

References catalog_, CHECK, fillShardRow(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), and table_desc_.

Referenced by distributeToShards().

2857  {
2858  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2859  CHECK(shard_tds.size() == shard_count);
2860 
2861  for (size_t shard = 0; shard < shard_count; ++shard) {
2862  auto& shard_output_buffers = all_shard_import_buffers[shard];
2863  if (row_count != 0) {
2864  fillShardRow(0, shard_output_buffers, import_buffers);
2865  }
2866  // when replicating a column, row count of a shard == replicate count of the column
2867  // on the shard
2868  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2869  }
2870 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2739
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4869
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

Definition at line 3055 of file Importer.cpp.

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

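3055 void Loader::dropColumns(const std::vector<int>& columnIds) {
3056  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3057  if (table_desc_->nShards) {
3058  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3059  }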
3060  for (auto table_desc : table_descs) {
3061  table_desc->fragmenter->dropColumns(columnIds);
3062  }
3063 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4869
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627

+ Here is the call graph for this function:

void import_export::Loader::fillShardRow ( const size_t  row_index,
OneShardBuffers &  shard_output_buffers,
const OneShardBuffers &  import_buffers 
)
private

Definition at line 2739 of file Importer.cpp.

References CHECK, CHECK_LT, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and run_benchmark_import::type.

Referenced by distributeToShardsExistingColumns(), and distributeToShardsNewColumns().

2741  {
2742  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2743  const auto& input_buffer = import_buffers[col_idx];
2744  const auto& col_ti = input_buffer->getTypeInfo();
2745  const auto type =
2746  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2747 
2748  switch (type) {
2749  case kBOOLEAN:
2750  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2751  break;
2752  case kTINYINT:
2753  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2754  break;
2755  case kSMALLINT:
2756  shard_output_buffers[col_idx]->addSmallint(
2757  int_value_at(*input_buffer, row_index));
2758  break;
2759  case kINT:
2760  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2761  break;
2762  case kBIGINT:
2763  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2764  break;
2765  case kFLOAT:
2766  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2767  break;
2768  case kDOUBLE:
2769  shard_output_buffers[col_idx]->addDouble(
2770  double_value_at(*input_buffer, row_index));
2771  break;
2772  case kTEXT:
2773  case kVARCHAR:
2774  case kCHAR: {
2775  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2776  shard_output_buffers[col_idx]->addString(
2777  (*input_buffer->getStringBuffer())[row_index]);
2778  break;
2779  }
2780  case kTIME:
2781  case kTIMESTAMP:
2782  case kDATE:
2783  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2784  break;
2785  case kARRAY:
2786  if (IS_STRING(col_ti.get_subtype())) {
2787  CHECK(input_buffer->getStringArrayBuffer());
2788  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2789  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2790  shard_output_buffers[col_idx]->addStringArray(input_arr);
2791  } else {
2792  shard_output_buffers[col_idx]->addArray(
2793  (*input_buffer->getArrayBuffer())[row_index]);
2794  }
2795  break;
2796  case kPOINT:
2797  case kMULTIPOINT:
2798  case kLINESTRING:
2799  case kMULTILINESTRING:
2800  case kPOLYGON:
2801  case kMULTIPOLYGON: {
2802  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2803  shard_output_buffers[col_idx]->addGeoString(
2804  (*input_buffer->getGeoStringBuffer())[row_index]);
2805  break;
2806  }
2807  default:
2808  CHECK(false);
2809  }
2810  }
2811 }
Definition: sqltypes.h:76
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2730
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:561
#define CHECK_LT(x, y)
Definition: Logger.h:303
Definition: sqltypes.h:79
Definition: sqltypes.h:80
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2723
Definition: sqltypes.h:68
#define IS_STRING(T)
Definition: sqltypes.h:309
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2692
#define CHECK(condition)
Definition: Logger.h:291
Definition: sqltypes.h:72

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 579 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

579  {
580  return column_descs_;
581  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:629

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( ) const
inline

Definition at line 577 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpochs(), and setTableEpochs().

577 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627

+ Here is the caller graph for this function:

std::string import_export::Loader::getErrorMessage ( )
inline

Definition at line 608 of file Importer.h.

References error_msg_.

608 { return error_msg_; };
std::string error_msg_
Definition: Importer.h:660
StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor *  cd) const
inline

Definition at line 583 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by import_export::setup_column_loaders().

583  {
584  if ((cd->columnType.get_type() != kARRAY ||
585  !IS_STRING(cd->columnType.get_subtype())) &&
586  (!cd->columnType.is_string() ||
587  cd->columnType.get_compression() != kENCODING_DICT)) {
588  return nullptr;
589  }
590  return dict_map_.at(cd->columnId);
591  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
#define IS_STRING(T)
Definition: sqltypes.h:309
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:559
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:632

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 578 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), and getTableEpochs().

578 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:628

+ Here is the caller graph for this function:

std::vector< Catalog_Namespace::TableEpochInfo > import_export::Loader::getTableEpochs ( ) const
virtual

Definition at line 4572 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpochs().

4572  {
4573  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4574  getTableDesc()->tableId);
4575 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:578
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:577
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3821

+ Here is the call graph for this function:

void import_export::Loader::init ( )
protected

Definition at line 3065 of file Importer.cpp.

References catalog_, CHECK, column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

3065  {
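3066  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3067  insert_data_.tableId = table_desc_->tableId;
3068  for (auto cd : column_descs_) {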
3069  insert_data_.columnIds.push_back(cd->columnId);
3070  if (cd->columnType.get_compression() == kENCODING_DICT) {
3071  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3072  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3073  CHECK(dd);
3074  dict_map_[cd->columnId] = dd->stringDict.get();
3075  }
3076  }
3077  insert_data_.numRows = 0;
3078 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:631
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1904
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:629
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:632

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::isAddingColumns ( ) const
inline

Definition at line 606 of file Importer.h.

References adding_columns_.

Referenced by distributeToShards(), and loadToShard().

606 { return adding_columns_; }

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2684 of file Importer.cpp.

References loadImpl().

2686  {
2687  return loadImpl(import_buffers, row_count, true, session_info);
2688 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2905

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
protected virtual

Definition at line 2905 of file Importer.cpp.

References catalog_, distributeToShards(), import_export::TypedImportBuffer::get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2909  {
2910  if (load_callback_) {
2911  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2912  return load_callback_(import_buffers, data_blocks, row_count);
2913  }
2914  if (table_desc_->nShards) {
2915  std::vector<OneShardBuffers> all_shard_import_buffers;
2916  std::vector<size_t> all_shard_row_counts;
2917  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2918  distributeToShards(all_shard_import_buffers,
2919  all_shard_row_counts,
2920  import_buffers,
2921  row_count,
2922  shard_tables.size(),
2923  session_info);
2924  bool success = true;
2925  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2926  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2927  all_shard_row_counts[shard_idx],
2928  shard_tables[shard_idx],
2929  checkpoint,
2930  session_info);
2931  }
2932  return success;
2933  }
2934  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
2935 }
const TableDescriptor * table_desc_
Definition: Importer.h:628
virtual void checkpoint()
Definition: Importer.cpp:4564
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2872
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2937
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4869
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2998
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627
LoadCallbackType load_callback_
Definition: Importer.h:630

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)
virtual

Definition at line 2677 of file Importer.cpp.

References loadImpl().

2680  {
2681  return loadImpl(import_buffers, row_count, false, session_info);
2682 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2905

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor *  shard_table,
bool  checkpoint,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 2998 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, logger::ERROR, error_msg_, TableDescriptor::fragmenter, import_export::TypedImportBuffer::get_data_block_pointers(), insert_data_, Fragmenter_Namespace::InsertData::is_default, isAddingColumns(), loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and TableDescriptor::tableName.

Referenced by loadImpl().

3003  {
3004  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3005  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3006  ins_data.numRows = row_count;
3007  bool success = false;
3008  try {
3009  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3010  } catch (std::exception& e) {
3011  std::ostringstream oss;
3012  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3013  << e.what();
3014 
3015  LOG(ERROR) << oss.str();
3016  error_msg_ = oss.str();
3017  return success;
3018  }
3019  if (isAddingColumns()) {
3020  // when Adding columns we omit any columns except the ones being added
3021  ins_data.columnIds.clear();
3022  ins_data.is_default.clear();
3023  for (auto& buffer : import_buffers) {
3024  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3025  ins_data.is_default.push_back(true);
3026  }
3027  } else {
3028  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3029  }
3030  // release loader_lock so that in InsertOrderFragmenter::insertData
3031  // we can have multiple threads sort/shuffle InsertData
3032  loader_lock.unlock();
3033  success = true;
3034  {
3035  try {
3036  if (checkpoint) {
3037  shard_table->fragmenter->insertData(ins_data);
3038  } else {
3039  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3040  }
3041  } catch (std::exception& e) {
3042  std::ostringstream oss;
3043  oss << "Fragmenter Insert Exception when processing Table "
3044  << shard_table->tableName << " issue was " << e.what();
3045 
3046  LOG(ERROR) << oss.str();
3047  loader_lock.lock();
3048  error_msg_ = oss.str();
3049  success = false;
3050  }
3051  }
3052  return success;
3053 }
std::mutex loader_mutex_
Definition: Importer.h:659
std::string tableName
#define LOG(tag)
Definition: Logger.h:285
virtual void checkpoint()
Definition: Importer.cpp:4564
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:631
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2937
std::string error_msg_
Definition: Importer.h:660
bool isAddingColumns() const
Definition: Importer.h:606
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setAddingColumns ( const bool  adding_columns)
inline

Definition at line 605 of file Importer.h.

References adding_columns_.

605 { adding_columns_ = adding_columns; }
void import_export::Loader::setTableEpochs ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)
virtual

Definition at line 4577 of file Importer.cpp.

References getCatalog(), and Catalog_Namespace::Catalog::setTableEpochs().

4578  {
4579  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4580 }
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:577
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3849

+ Here is the call graph for this function:

Member Data Documentation

bool import_export::Loader::adding_columns_ = false
private

Definition at line 658 of file Importer.h.

Referenced by isAddingColumns(), and setAddingColumns().

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 627 of file Importer.h.

Referenced by distributeToShardsNewColumns(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 629 of file Importer.h.

Referenced by distributeToShardsExistingColumns(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 632 of file Importer.h.

Referenced by getStringDict(), and init().

std::string import_export::Loader::error_msg_
private

Definition at line 660 of file Importer.h.

Referenced by getErrorMessage(), and loadToShard().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 631 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 630 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 659 of file Importer.h.

Referenced by loadToShard().

const TableDescriptor* import_export::Loader::table_desc_
protected

The documentation for this class was generated from the following files: