OmniSciDB  04ee39c94c
Importer_NS::Loader Class Reference

#include <Importer.h>

+ Inheritance diagram for Importer_NS::Loader:
+ Collaboration diagram for Importer_NS::Loader:

Public Member Functions

 Loader (Catalog_Namespace::Catalog &c, const TableDescriptor *t)
 
virtual ~Loader ()
 
Catalog_Namespace::Catalog & getCatalog ()
 
const TableDescriptor * getTableDesc () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
StringDictionary * getStringDict (const ColumnDescriptor *cd) const
 
virtual bool load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual bool loadNoCheckpoint (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
 
virtual void checkpoint ()
 
virtual int32_t getTableEpoch ()
 
virtual void setTableEpoch (const int32_t new_epoch)
 
void setReplicating (const bool replicating)
 
bool getReplicating () const
 

Protected Types

using OneShardBuffers = std::vector< std::unique_ptr< TypedImportBuffer > >
 

Protected Member Functions

void init ()
 
virtual bool loadImpl (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
 
void distributeToShards (std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
 

Protected Attributes

Catalog_Namespace::Catalog & catalog_
 
const TableDescriptor * table_desc_
 
std::list< const ColumnDescriptor * > column_descs_
 
Fragmenter_Namespace::InsertData insert_data_
 
std::map< int, StringDictionary * > dict_map_
 

Private Member Functions

std::vector< DataBlockPtr > get_data_block_pointers (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
 
bool loadToShard (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
 

Private Attributes

bool replicating_ = false
 
std::mutex loader_mutex_
 

Detailed Description

Definition at line 636 of file Importer.h.

Member Typedef Documentation

◆ OneShardBuffers

using Importer_NS::Loader::OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer> >
protected

Definition at line 683 of file Importer.h.

Constructor & Destructor Documentation

◆ Loader()

Importer_NS::Loader::Loader ( Catalog_Namespace::Catalog & c,
const TableDescriptor * t 
)
inline

Definition at line 638 of file Importer.h.

References logger::init().

639  : catalog_(c)
640  , table_desc_(t)
641  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true)) {
642  init();
643  }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:690
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:692
const TableDescriptor * table_desc_
Definition: Importer.h:691
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1579
+ Here is the call graph for this function:

◆ ~Loader()

virtual Importer_NS::Loader::~Loader ( )
inlinevirtual

Definition at line 645 of file Importer.h.

645 {}

Member Function Documentation

◆ checkpoint()

void Importer_NS::Loader::checkpoint ( )
virtual

Definition at line 3931 of file Importer.cpp.

References Catalog_Namespace::Catalog::checkpoint(), Data_Namespace::DISK_LEVEL, and Importer_NS::Importer::getCatalog().

3931  {
3932  if (getTableDesc()->persistenceLevel ==
3933  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3934  getCatalog().checkpoint(getTableDesc()->tableId);
3935  }
3936 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:648
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:647
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:2927
+ Here is the call graph for this function:

◆ distributeToShards()

void Importer_NS::Loader::distributeToShards ( std::vector< OneShardBuffers > &  all_shard_import_buffers,
std::vector< size_t > &  all_shard_row_counts,
const OneShardBuffers & import_buffers,
const size_t  row_count,
const size_t  shard_count 
)
protected

Definition at line 2346 of file Importer.cpp.

References CHECK, CHECK_GT, CHECK_LE, CHECK_LT, decimal_to_int_type(), Importer_NS::anonymous_namespace{Importer.cpp}::double_value_at(), Importer_NS::anonymous_namespace{Importer.cpp}::float_value_at(), Importer_NS::anonymous_namespace{Importer.cpp}::int_value_at(), IS_STRING, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, SHARD_FOR_KEY, and run-benchmark-import::type.

2350  {
2351  all_shard_row_counts.resize(shard_count);
2352  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2353  all_shard_import_buffers.emplace_back();
2354  for (const auto& typed_import_buffer : import_buffers) {
2355  all_shard_import_buffers.back().emplace_back(
2356  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2357  typed_import_buffer->getStringDictionary()));
2358  }
2359  }
2361  int col_idx{0};
2362  const ColumnDescriptor* shard_col_desc{nullptr};
2363  for (const auto col_desc : column_descs_) {
2364  ++col_idx;
2365  if (col_idx == table_desc_->shardedColumnId) {
2366  shard_col_desc = col_desc;
2367  break;
2368  }
2369  }
2370  CHECK(shard_col_desc);
2371  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2372  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2373  const auto& shard_col_ti = shard_col_desc->columnType;
2374  CHECK(shard_col_ti.is_integer() ||
2375  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2376  shard_col_ti.is_time());
2377  if (shard_col_ti.is_string()) {
2378  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2379  CHECK(payloads_ptr);
2380  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2381  }
2382 
2383  // for each replicated (alter added) columns, number of rows in a shard is
2384  // inferred from that of the sharding column, not simply evenly distributed.
2385  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2386  // Here the loop count is overloaded. For normal imports, we loop thru all
2387  // input values (rows), so the loop count is the number of input rows.
2388  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2389  // all shards, so the loop count is the number of shards.
2390  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2391  for (size_t i = 0; i < loop_count; ++i) {
2392  const size_t shard =
2393  getReplicating()
2394  ? i
2395  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2396  auto& shard_output_buffers = all_shard_import_buffers[shard];
2397 
2398  // when replicate a column, populate 'rows' to all shards only once
2399  // and its value is fetch from the first and the single row
2400  const auto row_index = getReplicating() ? 0 : i;
2401 
2402  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2403  const auto& input_buffer = import_buffers[col_idx];
2404  const auto& col_ti = input_buffer->getTypeInfo();
2405  const auto type =
2406  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2407 
2408  // for a replicated (added) column, populate rows_per_shard as per-shard replicate
2409  // count. and, bypass non-replicated column.
2410  if (getReplicating()) {
2411  if (input_buffer->get_replicate_count() > 0) {
2412  shard_output_buffers[col_idx]->set_replicate_count(
2413  shard_tds[shard]->fragmenter->getNumRows());
2414  } else {
2415  continue;
2416  }
2417  }
2418 
2419  switch (type) {
2420  case kBOOLEAN:
2421  shard_output_buffers[col_idx]->addBoolean(
2422  int_value_at(*input_buffer, row_index));
2423  break;
2424  case kTINYINT:
2425  shard_output_buffers[col_idx]->addTinyint(
2426  int_value_at(*input_buffer, row_index));
2427  break;
2428  case kSMALLINT:
2429  shard_output_buffers[col_idx]->addSmallint(
2430  int_value_at(*input_buffer, row_index));
2431  break;
2432  case kINT:
2433  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2434  break;
2435  case kBIGINT:
2436  shard_output_buffers[col_idx]->addBigint(
2437  int_value_at(*input_buffer, row_index));
2438  break;
2439  case kFLOAT:
2440  shard_output_buffers[col_idx]->addFloat(
2441  float_value_at(*input_buffer, row_index));
2442  break;
2443  case kDOUBLE:
2444  shard_output_buffers[col_idx]->addDouble(
2445  double_value_at(*input_buffer, row_index));
2446  break;
2447  case kTEXT:
2448  case kVARCHAR:
2449  case kCHAR: {
2450  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2451  shard_output_buffers[col_idx]->addString(
2452  (*input_buffer->getStringBuffer())[row_index]);
2453  break;
2454  }
2455  case kTIME:
2456  case kTIMESTAMP:
2457  case kDATE:
2458  shard_output_buffers[col_idx]->addBigint(
2459  int_value_at(*input_buffer, row_index));
2460  break;
2461  case kARRAY:
2462  if (IS_STRING(col_ti.get_subtype())) {
2463  CHECK(input_buffer->getStringArrayBuffer());
2464  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2465  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2466  shard_output_buffers[col_idx]->addStringArray(input_arr);
2467  } else {
2468  shard_output_buffers[col_idx]->addArray(
2469  (*input_buffer->getArrayBuffer())[row_index]);
2470  }
2471  break;
2472  case kPOINT:
2473  case kLINESTRING:
2474  case kPOLYGON:
2475  case kMULTIPOLYGON: {
2476  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2477  shard_output_buffers[col_idx]->addGeoString(
2478  (*input_buffer->getGeoStringBuffer())[row_index]);
2479  break;
2480  }
2481  default:
2482  CHECK(false);
2483  }
2484  }
2485  ++all_shard_row_counts[shard];
2486  // when replicating a column, row count of a shard == replicate count of the column on
2487  // the shard
2488  if (getReplicating()) {
2489  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2490  }
2491  }
2492 }
Definition: sqltypes.h:51
int64_t int_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2299
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:690
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2895
#define CHECK_GT(x, y)
Definition: Logger.h:199
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:692
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:268
#define CHECK_LT(x, y)
Definition: Logger.h:197
Definition: sqltypes.h:54
Definition: sqltypes.h:55
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2330
#define CHECK_LE(x, y)
Definition: Logger.h:198
const TableDescriptor * table_desc_
Definition: Importer.h:691
Definition: sqltypes.h:43
#define IS_STRING(T)
Definition: sqltypes.h:163
#define CHECK(condition)
Definition: Logger.h:187
double double_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2337
Definition: sqltypes.h:47
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
bool getReplicating() const
Definition: Importer.h:673
+ Here is the call graph for this function:

◆ get_column_descs()

const std::list<const ColumnDescriptor*>& Importer_NS::Loader::get_column_descs ( ) const
inline

Definition at line 649 of file Importer.h.

649  {
650  return column_descs_;
651  }
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:692

◆ get_data_block_pointers()

std::vector< DataBlockPtr > Importer_NS::Loader::get_data_block_pointers ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers)
private

Definition at line 2522 of file Importer.cpp.

References DataBlockPtr::arraysPtr, CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run-benchmark-import::result, and DataBlockPtr::stringsPtr.

2523  {
2524  std::vector<DataBlockPtr> result(import_buffers.size());
2525  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2526  encoded_data_block_ptrs_futures;
2527  // make all async calls to string dictionary here and then continue execution
2528  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2529  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2530  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2531  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2532  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2533 
2534  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2535  buf_idx,
2536  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2537  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2538  return import_buffers[buf_idx]->getStringDictBuffer();
2539  })));
2540  }
2541  }
2542 
2543  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2544  DataBlockPtr p;
2545  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2546  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2547  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2548  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2549  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2550  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2551  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2552  p.stringsPtr = string_payload_ptr;
2553  } else {
2554  // This condition means we have column which is ENCODED string. We already made
2555  // Async request to gain the encoded integer values above so we should skip this
2556  // iteration and continue.
2557  continue;
2558  }
2559  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2560  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2561  p.stringsPtr = geo_payload_ptr;
2562  } else {
2563  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2564  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2565  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2566  import_buffers[buf_idx]->addDictEncodedStringArray(
2567  *import_buffers[buf_idx]->getStringArrayBuffer());
2568  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2569  } else {
2570  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2571  }
2572  }
2573  result[buf_idx] = p;
2574  }
2575 
2576  // wait for the async requests we made for string dictionary
2577  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2578  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2579  }
2580  return result;
2581 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:138
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:139
#define IS_STRING(T)
Definition: sqltypes.h:163
#define CHECK(condition)
Definition: Logger.h:187
int8_t * numbersPtr
Definition: sqltypes.h:137

◆ getCatalog()

Catalog_Namespace::Catalog& Importer_NS::Loader::getCatalog ( )
inline

Definition at line 647 of file Importer.h.

647 { return catalog_; }
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:690

◆ getReplicating()

bool Importer_NS::Loader::getReplicating ( ) const
inline

Definition at line 673 of file Importer.h.

References logger::init().

673 { return replicating_; }
+ Here is the call graph for this function:

◆ getStringDict()

StringDictionary* Importer_NS::Loader::getStringDict ( const ColumnDescriptor * cd) const
inline

Definition at line 653 of file Importer.h.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_subtype(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), IS_STRING, SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), kARRAY, and kENCODING_DICT.

653  {
654  if ((cd->columnType.get_type() != kARRAY ||
655  !IS_STRING(cd->columnType.get_subtype())) &&
656  (!cd->columnType.is_string() ||
657  cd->columnType.get_compression() != kENCODING_DICT)) {
658  return nullptr;
659  }
660  return dict_map_.at(cd->columnId);
661  }
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:694
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:331
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:324
#define IS_STRING(T)
Definition: sqltypes.h:163
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:450
+ Here is the call graph for this function:

◆ getTableDesc()

const TableDescriptor* Importer_NS::Loader::getTableDesc ( ) const
inline

Definition at line 648 of file Importer.h.

648 { return table_desc_; }
const TableDescriptor * table_desc_
Definition: Importer.h:691

◆ getTableEpoch()

int32_t Importer_NS::Loader::getTableEpoch ( )
virtual

Definition at line 3938 of file Importer.cpp.

References Importer_NS::Importer::getCatalog(), and Catalog_Namespace::Catalog::getTableEpoch().

3938  {
3939  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
3940  getTableDesc()->tableId);
3941 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:648
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:647
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2123
+ Here is the call graph for this function:

◆ init()

void Importer_NS::Loader::init ( )
protected

Definition at line 2627 of file Importer.cpp.

References CHECK, and kENCODING_DICT.

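2627  {
2628  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2629  insert_data_.tableId = table_desc_->tableId;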
2630  for (auto cd : column_descs_) {
2631  insert_data_.columnIds.push_back(cd->columnId);
2632  if (cd->columnType.get_compression() == kENCODING_DICT) {
2633  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2634  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2635  CHECK(dd);
2636  dict_map_[cd->columnId] = dd->stringDict.get();
2637  }
2638  }
2639  insert_data_.numRows = 0;
2640 }
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:693
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:694
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:690
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1348
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:692
const TableDescriptor * table_desc_
Definition: Importer.h:691
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62

◆ load()

bool Importer_NS::Loader::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Reimplemented in DistributedLoader.

Definition at line 2292 of file Importer.cpp.

2293  {
2294  return loadImpl(import_buffers, row_count, true);
2295 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2494

◆ loadImpl()

bool Importer_NS::Loader::loadImpl ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
bool  checkpoint 
)
protectedvirtual

Definition at line 2494 of file Importer.cpp.

2497  {
2498  if (table_desc_->nShards) {
2499  std::vector<OneShardBuffers> all_shard_import_buffers;
2500  std::vector<size_t> all_shard_row_counts;
2501  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2502  distributeToShards(all_shard_import_buffers,
2503  all_shard_row_counts,
2504  import_buffers,
2505  row_count,
2506  shard_tables.size());
2507  bool success = true;
2508  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2509  if (!all_shard_row_counts[shard_idx]) {
2510  continue;
2511  }
2512  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2513  all_shard_row_counts[shard_idx],
2514  shard_tables[shard_idx],
2515  checkpoint);
2516  }
2517  return success;
2518  }
2519  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2520 }
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count)
Definition: Importer.cpp:2346
virtual void checkpoint()
Definition: Importer.cpp:3931
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:690
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2895
const TableDescriptor * table_desc_
Definition: Importer.h:691
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint)
Definition: Importer.cpp:2583

◆ loadNoCheckpoint()

bool Importer_NS::Loader::loadNoCheckpoint ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
const size_t  row_count 
)
virtual

Definition at line 2286 of file Importer.cpp.

2288  {
2289  return loadImpl(import_buffers, row_count, false);
2290 }
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2494

◆ loadToShard()

bool Importer_NS::Loader::loadToShard ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const TableDescriptor * shard_table,
bool  checkpoint 
)
private

Definition at line 2583 of file Importer.cpp.

References Fragmenter_Namespace::InsertData::bypass, Fragmenter_Namespace::InsertData::data, logger::ERROR, TableDescriptor::fragmenter, Fragmenter_Namespace::AbstractFragmenter::insertData(), Fragmenter_Namespace::AbstractFragmenter::insertDataNoCheckpoint(), LOG, and Fragmenter_Namespace::InsertData::numRows.

2587  {
2588  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2589  // patch insert_data with new column
2590  if (this->getReplicating()) {
2591  for (const auto& import_buff : import_buffers) {
2592  insert_data_.replicate_count = import_buff->get_replicate_count();
2593  insert_data_.columnDescriptors[import_buff->getColumnDesc()->columnId] =
2594  import_buff->getColumnDesc();
2595  }
2596  }
2597 
2598  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2599  ins_data.numRows = row_count;
2600  bool success = true;
2601 
2602  ins_data.data = get_data_block_pointers(import_buffers);
2603 
2604  for (const auto& import_buffer : import_buffers) {
2605  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2606  }
2607 
2608  // release loader_lock so that in InsertOrderFragmenter::insertDat
2609  // we can have multiple threads sort/shuffle InsertData
2610  loader_lock.unlock();
2611 
2612  {
2613  try {
2614  if (checkpoint) {
2615  shard_table->fragmenter->insertData(ins_data);
2616  } else {
2617  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2618  }
2619  } catch (std::exception& e) {
2620  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2621  success = false;
2622  }
2623  }
2624  return success;
2625 }
virtual void checkpoint()
Definition: Importer.cpp:3931
std::mutex loader_mutex_
Definition: Importer.h:705
#define LOG(tag)
Definition: Logger.h:182
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:693
std::map< int, const ColumnDescriptor * > columnDescriptors
Definition: Fragmenter.h:69
virtual void insertDataNoCheckpoint(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2522
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
Fragmenter_Namespace::AbstractFragmenter * fragmenter
virtual void insertData(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions with locks and che...
bool getReplicating() const
Definition: Importer.h:673
+ Here is the call graph for this function:

◆ setReplicating()

void Importer_NS::Loader::setReplicating ( const bool  replicating)
inline

Definition at line 672 of file Importer.h.

672 { replicating_ = replicating; }

◆ setTableEpoch()

void Importer_NS::Loader::setTableEpoch ( const int32_t  new_epoch)
virtual

Definition at line 3943 of file Importer.cpp.

References Importer_NS::Importer::getCatalog(), and Catalog_Namespace::Catalog::setTableEpoch().

3943  {
3944  getCatalog().setTableEpoch(
3945  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
3946 }
const TableDescriptor * getTableDesc() const
Definition: Importer.h:648
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:647
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2153
+ Here is the call graph for this function:

Member Data Documentation

◆ catalog_

Catalog_Namespace::Catalog& Importer_NS::Loader::catalog_
protected

Definition at line 690 of file Importer.h.

◆ column_descs_

std::list<const ColumnDescriptor*> Importer_NS::Loader::column_descs_
protected

Definition at line 692 of file Importer.h.

◆ dict_map_

std::map<int, StringDictionary*> Importer_NS::Loader::dict_map_
protected

Definition at line 694 of file Importer.h.

◆ insert_data_

Fragmenter_Namespace::InsertData Importer_NS::Loader::insert_data_
protected

Definition at line 693 of file Importer.h.

◆ loader_mutex_

std::mutex Importer_NS::Loader::loader_mutex_
private

Definition at line 705 of file Importer.h.

◆ replicating_

bool Importer_NS::Loader::replicating_ = false
private

Definition at line 704 of file Importer.h.

◆ table_desc_

const TableDescriptor* Importer_NS::Loader::table_desc_
protected

Definition at line 691 of file Importer.h.


The documentation for this class was generated from the following files: