OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
import_export::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for import_export::Loader:
+ Collaboration diagram for import_export::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr, bool use_catalog_locks=true)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog ()
 
const TableDescriptor * getTableDesc () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual int32_t getTableEpoch ()
 
virtual void setTableEpoch (const int32_t new_epoch)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 
void dropColumns (const std::vector< int > &columns)
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer >>
 

Protected Member Functions

void init (const bool use_catalog_locks)
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const ColumnDescriptor * > column_descs_
 
LoadCallbackType load_callback_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Types

using LoadCallbackType = std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)>
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 514 of file Importer.h.

Member Typedef Documentation

using import_export::Loader::LoadCallbackType = std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&, std::vector<DataBlockPtr>&, size_t)>
private

Definition at line 518 of file Importer.h.

using import_export::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>
protected

Definition at line 576 of file Importer.h.

Constructor & Destructor Documentation

import_export::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t,
LoadCallbackType  load_callback = nullptr,
bool  use_catalog_locks = true 
)
inline

Definition at line 523 of file Importer.h.

References init().

527  : catalog_(c)
528  , table_desc_(t)
529  , column_descs_(
530  use_catalog_locks
531  ? c.getAllColumnMetadataForTable(t->tableId, false, false, true)
532  : c.getAllColumnMetadataForTableUnlocked(t->tableId, false, false, true))
533  , load_callback_(load_callback) {
534  init(use_catalog_locks);
535  }
std::list< const ColumnDescriptor * > getAllColumnMetadataForTableUnlocked(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Definition: Catalog.cpp:1714
const TableDescriptor * table_desc_
Definition: Importer.h:584
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2855
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:585
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1704
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583
LoadCallbackType load_callback_
Definition: Importer.h:586

+ Here is the call graph for this function:

virtual import_export::Loader::~Loader ( )
inline virtual

Definition at line 537 of file Importer.h.

537 {}

Member Function Documentation

void import_export::Loader::checkpoint ( )
virtual

Definition at line 4153 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpoint(), Data_Namespace::DISK_LEVEL, getCatalog(), and getTableDesc().

4153  {
4154  if (getTableDesc()->persistenceLevel ==
4155  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4156  getCatalog().checkpoint(getTableDesc()->tableId);
4157  }
4158 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:3598
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:539

+ Here is the call graph for this function:

void import_export::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2562 of file Importer.cpp.

References catalog_, CHECK(), CHECK_GT, CHECK_LE, CHECK_LT, column_descs_, decimal_to_int_type(), import_export::anonymous_namespace{Importer.cpp}::double_value_at(), import_export::anonymous_namespace{Importer.cpp}::float_value_at(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), getReplicating(), import_export::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, TableDescriptor::nShards, SHARD_FOR_KEY, TableDescriptor::shardedColumnId, table_desc_, and run_benchmark_import::type.

Referenced by loadImpl().

2566  {
2567  all_shard_row_counts.resize(shard_count);
2568  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2569  all_shard_import_buffers.emplace_back();
2570  for (const auto& typed_import_buffer : import_buffers) {
2571  all_shard_import_buffers.back().emplace_back(
2572  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2573  typed_import_buffer->getStringDictionary()));
2574  }
2575  }
2577  int col_idx{0};
2578  const ColumnDescriptor* shard_col_desc{nullptr};
2579  for (const auto col_desc : column_descs_) {
2580  ++col_idx;
2581  if (col_idx == table_desc_->shardedColumnId) {
2582  shard_col_desc = col_desc;
2583  break;
2584  }
2585  }
2586  CHECK(shard_col_desc);
2587  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2588  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2589  const auto& shard_col_ti = shard_col_desc->columnType;
2590  CHECK(shard_col_ti.is_integer() ||
2591  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2592  shard_col_ti.is_time());
2593  if (shard_col_ti.is_string()) {
2594  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2595  CHECK(payloads_ptr);
2596  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2597  }
2598 
2599  // for each replicated (alter added) columns, number of rows in a shard is
2600  // inferred from that of the sharding column, not simply evenly distributed.
2601  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2602  // Here the loop count is overloaded. For normal imports, we loop thru all
2603  // input values (rows), so the loop count is the number of input rows.
2604  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2605  // all shards, so the loop count is the number of shards.
2606  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2607  for (size_t i = 0; i < loop_count; ++i) {
2608  const size_t shard =
2609  getReplicating()
2610  ? i
2611  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2612  auto& shard_output_buffers = all_shard_import_buffers[shard];
2613 
2614  // when replicate a column, populate 'rows' to all shards only once
2615  // and its value is fetch from the first and the single row
2616  const auto row_index = getReplicating() ? 0 : i;
2617 
2618  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2619  const auto& input_buffer = import_buffers[col_idx];
2620  const auto& col_ti = input_buffer->getTypeInfo();
2621  const auto type =
2622  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2623 
2624  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2625  // count. and, bypass non-replicated column.
2626  if (getReplicating()) {
2627  if (input_buffer->get_replicate_count() > 0) {
2628  shard_output_buffers[col_idx]->set_replicate_count(
2629  shard_tds[shard]->fragmenter->getNumRows());
2630  } else {
2631  continue;
2632  }
2633  }
2634 
2635  switch (type) {
2636  case kBOOLEAN:
2637  shard_output_buffers[col_idx]->addBoolean(
2638  int_value_at(*input_buffer, row_index));
2639  break;
2640  case kTINYINT:
2641  shard_output_buffers[col_idx]->addTinyint(
2642  int_value_at(*input_buffer, row_index));
2643  break;
2644  case kSMALLINT:
2645  shard_output_buffers[col_idx]->addSmallint(
2646  int_value_at(*input_buffer, row_index));
2647  break;
2648  case kINT:
2649  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2650  break;
2651  case kBIGINT:
2652  shard_output_buffers[col_idx]->addBigint(
2653  int_value_at(*input_buffer, row_index));
2654  break;
2655  case kFLOAT:
2656  shard_output_buffers[col_idx]->addFloat(
2657  float_value_at(*input_buffer, row_index));
2658  break;
2659  case kDOUBLE:
2660  shard_output_buffers[col_idx]->addDouble(
2661  double_value_at(*input_buffer, row_index));
2662  break;
2663  case kTEXT:
2664  case kVARCHAR:
2665  case kCHAR: {
2666  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2667  shard_output_buffers[col_idx]->addString(
2668  (*input_buffer->getStringBuffer())[row_index]);
2669  break;
2670  }
2671  case kTIME:
2672  case kTIMESTAMP:
2673  case kDATE:
2674  shard_output_buffers[col_idx]->addBigint(
2675  int_value_at(*input_buffer, row_index));
2676  break;
2677  case kARRAY:
2678  if (IS_STRING(col_ti.get_subtype())) {
2679  CHECK(input_buffer->getStringArrayBuffer());
2680  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2681  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2682  shard_output_buffers[col_idx]->addStringArray(input_arr);
2683  } else {
2684  shard_output_buffers[col_idx]->addArray(
2685  (*input_buffer->getArrayBuffer())[row_index]);
2686  }
2687  break;
2688  case kPOINT:
2689  case kLINESTRING:
2690  case kPOLYGON:
2691  case kMULTIPOLYGON: {
2692  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2693  shard_output_buffers[col_idx]->addGeoString(
2694  (*input_buffer->getGeoStringBuffer())[row_index]);
2695  break;
2696  }
2697  default:
2698  CHECK(false);
2699  }
2700  }
2701  ++all_shard_row_counts[shard];
2702  // when replicating a column, row count of a shard == replicate count of the column on
2703  // the shard
2704  if (getReplicating()) {
2705  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2706  }
2707  }
2708 }
Definition: sqltypes.h:50
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3494
const TableDescriptor * table_desc_
Definition: Importer.h:584
#define CHECK_GT(x, y)
Definition: Logger.h:209
CHECK(cgen_state)
specifies the content in-memory of a row in the column metadata table
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2553
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:311
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:53
Definition: sqltypes.h:54
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:585
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2546
Definition: sqltypes.h:42
#define IS_STRING(T)
Definition: sqltypes.h:172
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2515
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583
import_export::TypedImportBuffer TypedImportBuffer
Definition: sqltypes.h:46
bool getReplicating() const
Definition: Importer.h:565
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::dropColumns ( const std::vector< int > &  columns)

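Definition at line 2845 of file Importer.cpp. (The declaration in Importer.h names the parameter `columns`; the definition below refers to it as `columnIds`.)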

References catalog_, Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), TableDescriptor::nShards, and table_desc_.

2845  {
2846  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2847  if (table_desc_->nShards) {
2848  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2849  }
2850  for (auto table_desc : table_descs) {
2851  table_desc->fragmenter->dropColumns(columnIds);
2852  }
2853 }
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3494
const TableDescriptor * table_desc_
Definition: Importer.h:584
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583

+ Here is the call graph for this function:

const std::list<const ColumnDescriptor*>& import_export::Loader::get_column_descs ( ) const
inline

Definition at line 541 of file Importer.h.

References column_descs_.

Referenced by import_export::setup_column_loaders().

541  {
542  return column_descs_;
543  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:585

+ Here is the caller graph for this function:

std::vector< DataBlockPtr > import_export::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2742 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK(), CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by loadImpl(), and loadToShard().

2743  {
2744  std::vector<DataBlockPtr> result(import_buffers.size());
2745  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2746  encoded_data_block_ptrs_futures;
2747  // make all async calls to string dictionary here and then continue execution
2748  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2749  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2750  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2751  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2752  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2753 
2754  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2755  buf_idx,
2756  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2757  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2758  return import_buffers[buf_idx]->getStringDictBuffer();
2759  })));
2760  }
2761  }
2762 
2763  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2764  DataBlockPtr p;
2765  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2766  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2767  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2768  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2769  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2770  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2771  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2772  p.stringsPtr = string_payload_ptr;
2773  } else {
2774  // This condition means we have column which is ENCODED string. We already made
2775  // Async request to gain the encoded integer values above so we should skip this
2776  // iteration and continue.
2777  continue;
2778  }
2779  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2780  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2781  p.stringsPtr = geo_payload_ptr;
2782  } else {
2783  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2784  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2785  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2786  import_buffers[buf_idx]->addDictEncodedStringArray(
2787  *import_buffers[buf_idx]->getStringArrayBuffer());
2788  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2789  } else {
2790  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2791  }
2792  }
2793  result[buf_idx] = p;
2794  }
2795 
2796  // wait for the async requests we made for string dictionary
2797  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2798  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2799  }
2800  return result;
2801 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:149
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:150
CHECK(cgen_state)
#define IS_STRING(T)
Definition: sqltypes.h:172
int8_t * numbersPtr
Definition: sqltypes.h:148

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Catalog_Namespace::Catalog& import_export::Loader::getCatalog ( )
inline

Definition at line 539 of file Importer.h.

References catalog_.

Referenced by checkpoint(), getTableEpoch(), setTableEpoch(), and foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::validate_schema().

539 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583

+ Here is the caller graph for this function:

bool import_export::Loader::getReplicating ( ) const
inline

Definition at line 565 of file Importer.h.

References replicating_.

Referenced by distributeToShards(), and loadToShard().

565 { return replicating_; }

+ Here is the caller graph for this function:

StringDictionary* import_export::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 545 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, dict_map_, SQLTypeInfo::get_compression(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), IS_STRING, SQLTypeInfo::is_string(), kARRAY, and kENCODING_DICT.

Referenced by foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::inititialize_import_buffers_vec(), and import_export::setup_column_loaders().

545  {
546  if ((cd->columnType.get_type() != kARRAY ||
547  !IS_STRING(cd->columnType.get_subtype())) &&
548  (!cd->columnType.is_string() ||
549  cd->columnType.get_compression() != kENCODING_DICT)) {
550  return nullptr;
551  }
552  return dict_map_.at(cd->columnId);
553  }
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:259
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:258
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:266
#define IS_STRING(T)
Definition: sqltypes.h:172
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:415
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:588

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const TableDescriptor* import_export::Loader::getTableDesc ( ) const
inline

Definition at line 540 of file Importer.h.

References table_desc_.

Referenced by checkpoint(), getTableEpoch(), foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::read_parquet_data_into_import_buffer(), setTableEpoch(), and foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::validate_schema().

540 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:584

+ Here is the caller graph for this function:

int32_t import_export::Loader::getTableEpoch ( )
virtual

Definition at line 4160 of file Importer.cpp.

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::getTableEpoch().

4160  {
4161  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4162  getTableDesc()->tableId);
4163 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2688
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:539

+ Here is the call graph for this function:

void import_export::Loader::init ( const bool  use_catalog_locks)
protected

Definition at line 2855 of file Importer.cpp.

References catalog_, CHECK(), column_descs_, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, dict_map_, Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getMetadataForDict(), Catalog_Namespace::Catalog::getMetadataForDictUnlocked(), insert_data_, kENCODING_DICT, Fragmenter_Namespace::InsertData::numRows, table_desc_, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Loader().

2855  {
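2856  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2857  insert_data_.tableId = table_desc_->tableId;
2858  for (auto cd : column_descs_) {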
2859  insert_data_.columnIds.push_back(cd->columnId);
2860  if (cd->columnType.get_compression() == kENCODING_DICT) {
2861  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2862  const auto dd = use_catalog_locks
2863  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2864  : catalog_.getMetadataForDictUnlocked(
2865  cd->columnType.get_comp_param(), true);
2866  CHECK(dd);
2867  dict_map_[cd->columnId] = dd->stringDict.get();
2868  }
2869  }
2870  insert_data_.numRows = 0;
2871 }
const TableDescriptor * table_desc_
Definition: Importer.h:584
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:587
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:194
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1449
const DictDescriptor * getMetadataForDictUnlocked(int dict_ref, bool loadDict) const
Definition: Catalog.cpp:1455
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:585
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:588

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2508 of file Importer.cpp.

References loadImpl().

2509  {
2510  return loadImpl(import_buffers, row_count, true);
2511 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2710

+ Here is the call graph for this function:

bool import_export::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protected virtual

Definition at line 2710 of file Importer.cpp.

References catalog_, distributeToShards(), get_data_block_pointers(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), load_callback_, loadToShard(), TableDescriptor::nShards, and table_desc_.

Referenced by load(), and loadNoCheckpoint().

2713  {
2714  if (load_callback_) {
2715  auto data_blocks = get_data_block_pointers(import_buffers);
2716  return load_callback_(import_buffers, data_blocks, row_count);
2717  }
2718  if (table_desc_->nShards) {
2719  std::vector<OneShardBuffers> all_shard_import_buffers;
2720  std::vector<size_t> all_shard_row_counts;
2721  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2722  distributeToShards(all_shard_import_buffers,
2723  all_shard_row_counts,
2724  import_buffers,
2725  row_count,
2726  shard_tables.size());
2727  bool success = true;
2728  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2729  if (!all_shard_row_counts[shard_idx]) {
2730  continue;
2731  }
2732  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2733  all_shard_row_counts[shard_idx],
2734  shard_tables[shard_idx],
2735  checkpoint);
2736  }
2737  return success;
2738  }
2739  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2740 }
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:3494
const TableDescriptor * table_desc_
Definition: Importer.h:584
virtual void checkpoint()
Definition: Importer.cpp:4153
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2803
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:583
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2562
LoadCallbackType load_callback_
Definition: Importer.h:586
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2742

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool import_export::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2502 of file Importer.cpp.

References loadImpl().

2504  {
2505  return loadImpl(import_buffers, row_count, false);
2506 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2710

+ Here is the call graph for this function:

bool import_export::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint 
)
private

Definition at line 2803 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, get_data_block_pointers(), getReplicating(), insert_data_, loader_mutex_, LOG, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

Referenced by loadImpl().

2807  {
2808  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2809  // patch insert_data with new column
2810  if (this->getReplicating()) {
2811  for (const auto& import_buff : import_buffers) {
2812  insert_data_.replicate_count = import_buff->get_replicate_count();
2813  }
2814  }
2815 
2816  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2817  ins_data.numRows = row_count;
2818  bool success = true;
2819 
2820  ins_data.data = get_data_block_pointers(import_buffers);
2821 
2822  for (const auto& import_buffer : import_buffers) {
2823  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2824  }
2825 
2826  // release loader_lock so that in InsertOrderFragmenter::insertDat
2827  // we can have multiple threads sort/shuffle InsertData
2828  loader_lock.unlock();
2829 
2830  {
2831  try {
2832  if (checkpoint) {
2833  shard_table->fragmenter->insertData(ins_data);
2834  } else {
2835  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2836  }
2837  } catch (std::exception& e) {
2838  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2839  success = false;
2840  }
2841  }
2842  return success;
2843 }
std::mutex loader_mutex_
Definition: Importer.h:599
#define LOG(tag)
Definition: Logger.h:188
virtual void checkpoint()
Definition: Importer.cpp:4153
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:587
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
bool getReplicating() const
Definition: Importer.h:565
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2742

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void import_export::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 564 of file Importer.h.

References replicating_.

564 { replicating_ = replicating; }
void import_export::Loader::setTableEpoch ( const int32_t  new_epoch)
virtual

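Definition at line 4165 of file Importer.cpp. (The declaration in Importer.h names the parameter `new_epoch`; the definition below refers to it as `start_epoch`.)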

References getCatalog(), getTableDesc(), and Catalog_Namespace::Catalog::setTableEpoch().

4165  {
4166  getCatalog().setTableEpoch(
4167  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4168 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:540
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2718
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:539

+ Here is the call graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog& import_export::Loader::catalog_
protected

Definition at line 583 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getCatalog(), init(), and loadImpl().

std::list<const ColumnDescriptor*> import_export::Loader::column_descs_
protected

Definition at line 585 of file Importer.h.

Referenced by distributeToShards(), get_column_descs(), and init().

std::map<int, StringDictionary*> import_export::Loader::dict_map_
protected

Definition at line 588 of file Importer.h.

Referenced by getStringDict(), and init().

Fragmenter_Namespace::InsertData import_export::Loader::insert_data_
protected

Definition at line 587 of file Importer.h.

Referenced by init(), and loadToShard().

LoadCallbackType import_export::Loader::load_callback_
protected

Definition at line 586 of file Importer.h.

Referenced by loadImpl().

std::mutex import_export::Loader::loader_mutex_
private

Definition at line 599 of file Importer.h.

Referenced by loadToShard().

bool import_export::Loader::replicating_ = false
private

Definition at line 598 of file Importer.h.

Referenced by getReplicating(), and setReplicating().

const TableDescriptor* import_export::Loader::table_desc_
protected

Definition at line 584 of file Importer.h.

Referenced by distributeToShards(), dropColumns(), getTableDesc(), init(), and loadImpl().


The documentation for this class was generated from the following files: