OmniSciDB  dfae7c3b14
JoinHashTable Class Reference

#include <JoinHashTable.h>


Classes

struct  JoinHashTableCacheKey
 

Public Member Functions

int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
size_t getJoinHashBufferSize (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
std::string toString (const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
 
std::set< DecodedJoinHashBufferEntry > toSet (const ExecutorDeviceType device_type, const int device_id) const override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
HashType getHashType () const noexcept override
 
Data_Namespace::MemoryLevel getMemoryLevel () const noexcept override
 
int getDeviceCount () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~JoinHashTable ()
 
- Public Member Functions inherited from JoinHashTableInterface
virtual std::string toStringFlat64 (const ExecutorDeviceType device_type, const int device_id) const
 
virtual std::string toStringFlat32 (const ExecutorDeviceType device_type, const int device_id) const
 
JoinColumn fetchJoinColumn (const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
 

Static Public Member Functions

static std::shared_ptr< JoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static HashJoinMatchingSet codegenMatchingSet (const std::vector< llvm::Value *> &hash_join_idx_args_in, const bool is_sharded, const bool col_is_nullable, const bool is_bw_eq, const int64_t sub_buff_size, Executor *executor, const bool is_bucketized=false)
 
static llvm::Value * codegenHashTableLoad (const size_t table_idx, Executor *executor)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
static const std::shared_ptr< std::vector< int32_t > > & getCachedHashTable (size_t idx)
 
static uint64_t getNumberOfCachedHashTables ()
 
- Static Public Member Functions inherited from JoinHashTableInterface
static std::string getHashTypeString (HashType ht) noexcept
 
static DecodedJoinHashBufferSet toSet (size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
 Decode hash table into a std::set for easy inspection and validation. More...
 
static std::string toString (const std::string &type, const std::string &layout_type, size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw=false)
 Decode hash table into a human-readable string. More...
 
static std::shared_ptr< JoinHashTableInterface > getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing). More...
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing). More...
 

Private Member Functions

 JoinHashTable (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
 
ChunkKey genHashTableKey (const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
 
void reify ()
 
void reifyOneToOneForDevice (const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
 
void reifyOneToManyForDevice (const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
void initOneToOneHashTable (const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initOneToManyHashTable (const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initHashTableOnCpuFromCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
 
void putHashTableOnCpuToCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
 
void initOneToOneHashTableOnCpu (const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
void initOneToManyHashTableOnCpu (const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
const InputTableInfo & getInnerQueryInfo (const Analyzer::ColumnVar *inner_col) const
 
size_t shardCount () const
 
llvm::Value * codegenHashTableLoad (const size_t table_idx)
 
std::vector< llvm::Value * > getHashJoinArgs (llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
size_t getComponentBufferSize () const noexcept
 
bool layoutRequiresAdditionalBuffers (JoinHashTableInterface::HashType layout) const noexcept override
 

Private Attributes

std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
 
std::shared_ptr< Analyzer::ColumnVar > col_var_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
HashType hash_type_
 
size_t hash_entry_count_
 
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
ExpressionRange col_range_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
const int device_count_
 

Static Private Attributes

static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
 
static std::mutex join_hash_table_cache_mutex_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType : int { HashType::OneToOne, HashType::OneToMany, HashType::ManyToMany }
 

Detailed Description

Definition at line 51 of file JoinHashTable.h.
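
JoinHashTable implements JoinHashTableInterface for single-column equijoins using a perfect hash layout (HashType::OneToOne or HashType::OneToMany). The sketch below is a minimal, illustrative use of the public API documented on this page; it assumes the caller already holds a live Executor, the query's InputTableInfo list, a ColumnCacheMap, and a parsed equijoin qualifier, and it omits supporting includes beyond the header above.

#include <JoinHashTable.h>

// Hedged sketch: build a perfect hash table for an equijoin qualifier and
// inspect basic properties. getInstance() may throw (e.g. HashJoinFail) if a
// perfect hash table cannot be built for this qualifier.
std::shared_ptr<JoinHashTable> buildJoinHashTable(
    const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const int device_count) {
  auto hash_table =
      JoinHashTable::getInstance(qual_bin_oper,
                                 query_infos,
                                 Data_Namespace::CPU_LEVEL,
                                 JoinHashTableInterface::HashType::OneToOne,
                                 device_count,
                                 column_cache,
                                 executor);
  // The preferred layout is only a hint; the built table reports its actual layout.
  const auto layout = hash_table->getHashType();
  const size_t cpu_bytes =
      hash_table->getJoinHashBufferSize(ExecutorDeviceType::CPU, 0);
  (void)layout;
  (void)cpu_bytes;
  return hash_table;
}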

Constructor & Destructor Documentation

◆ ~JoinHashTable()

JoinHashTable::~JoinHashTable ( )
virtual

Definition at line 564 of file JoinHashTable.cpp.

References CHECK.

Referenced by getNumberOfCachedHashTables().

564  {
565 #ifdef HAVE_CUDA
566  CHECK(executor_);
567  CHECK(executor_->catalog_);
568  auto& data_mgr = executor_->catalog_->getDataMgr();
569  for (auto& gpu_buffer : gpu_hash_table_buff_) {
570  if (gpu_buffer) {
571  data_mgr.free(gpu_buffer);
572  }
573  }
574  for (auto& gpu_buffer : gpu_hash_table_err_buff_) {
575  if (gpu_buffer) {
576  data_mgr.free(gpu_buffer);
577  }
578  }
579 #endif
580 }

◆ JoinHashTable()

JoinHashTable::JoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  qual_bin_oper,
const Analyzer::ColumnVar *  col_var,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const ExpressionRange &  col_range,
ColumnCacheMap &  column_cache,
Executor *  executor,
const int  device_count 
)
inline private

Definition at line 138 of file JoinHashTable.h.

References CHECK, CHECK_GT, checkHashJoinReplicationConstraint(), codegenHashTableLoad(), device_count_, freeHashBufferCpuMemory(), freeHashBufferGpuMemory(), freeHashBufferMemory(), genHashTableKey(), getComponentBufferSize(), getHashJoinArgs(), getInnerQueryInfo(), ExpressionRange::getType(), initHashTableOnCpuFromCache(), initOneToManyHashTable(), initOneToManyHashTableOnCpu(), initOneToOneHashTable(), initOneToOneHashTableOnCpu(), Integer, isBitwiseEq(), putHashTableOnCpuToCache(), reify(), reifyOneToManyForDevice(), reifyOneToOneForDevice(), and shardCount().

147  : qual_bin_oper_(qual_bin_oper)
148  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
149  , query_infos_(query_infos)
150  , memory_level_(memory_level)
151  , hash_type_(preferred_hash_type)
152  , hash_entry_count_(0)
153  , col_range_(col_range)
154  , executor_(executor)
155  , column_cache_(column_cache)
156  , device_count_(device_count) {
159  }

Member Function Documentation

◆ checkHashJoinReplicationConstraint()

void JoinHashTable::checkHashJoinReplicationConstraint ( const int  table_id) const
private

Definition at line 721 of file JoinHashTable.cpp.

References CHECK, g_cluster, get_shard_count(), and table_is_replicated().

Referenced by JoinHashTable().

721  {
722  if (!g_cluster) {
723  return;
724  }
725  if (table_id >= 0) {
726  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
727  CHECK(inner_td);
728  size_t shard_count{0};
729  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
730  if (!shard_count && !table_is_replicated(inner_td)) {
731  throw TableMustBeReplicated(inner_td->tableName);
732  }
733  }
734 }
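
For readers skimming the listing, the rule it enforces only applies in distributed mode: a non-sharded join on a non-replicated inner table is rejected. A hedged outline of the same rule, with the objects passed in explicitly (shard_count would come from get_shard_count() on the join qualifier):

// Illustrative restatement only, not repository code.
void check_replication(const TableDescriptor* inner_td,
                       const size_t shard_count,
                       const bool is_distributed /* g_cluster */) {
  if (!is_distributed || !inner_td) {
    return;
  }
  if (shard_count == 0 && !table_is_replicated(inner_td)) {
    throw TableMustBeReplicated(inner_td->tableName);
  }
}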

◆ codegenHashTableLoad() [1/2]

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx,
Executor *  executor 
)
static

Definition at line 1248 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, CHECK, CHECK_LT, and get_arg_by_name().

Referenced by BaselineJoinHashTable::codegenMatchingSet(), OverlapsJoinHashTable::codegenMatchingSet(), getDeviceCount(), BaselineJoinHashTable::hashPtr(), and JoinHashTable().

1249  {
1250  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1251  llvm::Value* hash_ptr = nullptr;
1252  const auto total_table_count =
1253  executor->plan_state_->join_info_.join_hash_tables_.size();
1254  CHECK_LT(table_idx, total_table_count);
1255  if (total_table_count > 1) {
1256  auto hash_tables_ptr =
1257  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1258  auto hash_pptr =
1259  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1260  hash_tables_ptr,
1261  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1262  : hash_tables_ptr;
1263  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1264  } else {
1265  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1266  }
1267  CHECK(hash_ptr);
1268  return hash_ptr;
1269 }
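
For orientation, the IR emitted above is equivalent to the following host-side selection: when several join hash tables are in flight, the "join_hash_tables" kernel argument is an array of table handles and the table_idx-th entry is loaded; with a single table the argument itself is the handle. This is an illustrative model of the listing, not a repository function.

#include <cstdint>

// Hypothetical scalar equivalent of the generated GEP/load logic.
int64_t pick_join_hash_table(int64_t* join_hash_tables_arg,
                             const size_t total_table_count,
                             const size_t table_idx) {
  return total_table_count > 1
             ? join_hash_tables_arg[table_idx]
             : reinterpret_cast<int64_t>(join_hash_tables_arg);
}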

◆ codegenHashTableLoad() [2/2]

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx)
private

Definition at line 1236 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, CHECK, and get_arg_by_name().

1236  {
1237  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1238  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1239  if (hash_ptr->getType()->isIntegerTy(64)) {
1240  return hash_ptr;
1241  }
1242  CHECK(hash_ptr->getType()->isPointerTy());
1243  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1244  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1245  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1246 }

◆ codegenMatchingSet() [1/2]

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const CompilationOptions &  co,
const size_t  index 
)
override virtual

Implements JoinHashTableInterface.

Definition at line 1322 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, CHECK, anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_max_rte_scan_table(), kDATE, and self_join_not_covered_by_left_deep_tree().

Referenced by BaselineJoinHashTable::codegenMatchingSet(), and getDeviceCount().

1323  {
1324  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1325  const auto cols = get_cols(
1326  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1327  auto key_col = cols.second;
1328  CHECK(key_col);
1329  auto val_col = cols.first;
1330  CHECK(val_col);
1331  auto pos_ptr = codegenHashTableLoad(index);
1332  CHECK(pos_ptr);
1333  const int shard_count = shardCount();
1334  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1335  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1336  if (key_col_var && val_col_var &&
1337      self_join_not_covered_by_left_deep_tree(
1338  key_col_var,
1339  val_col_var,
1340  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1341  throw std::runtime_error(
1342  "Query execution fails because the query contains not supported self-join "
1343  "pattern. We suspect the query requires multiple left-deep join tree due to the "
1344  "join condition of the self-join and is not supported for now. Please consider "
1345  "rewriting table order in "
1346  "FROM clause.");
1347  }
1348  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1349  const int64_t sub_buff_size = getComponentBufferSize();
1350  const auto& key_col_ti = key_col->get_type_info();
1351 
1352  auto bucketize = (key_col_ti.get_type() == kDATE);
1353  return codegenMatchingSet(hash_join_idx_args,
1354  shard_count,
1355  !key_col_ti.get_notnull(),
1356  isBitwiseEq(),
1357  sub_buff_size,
1358  executor_,
1359  bucketize);
1360 }

◆ codegenMatchingSet() [2/2]

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const std::vector< llvm::Value *> &  hash_join_idx_args_in,
const bool  is_sharded,
const bool  col_is_nullable,
const bool  is_bw_eq,
const int64_t  sub_buff_size,
Executor *  executor, 
const bool  is_bucketized = false 
)
static

Definition at line 1362 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, and CHECK.

1369  {
1370  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1371  using namespace std::string_literals;
1372 
1373  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1374 
1375  if (is_bw_eq) {
1376  fname += "_bitwise";
1377  }
1378  if (is_sharded) {
1379  fname += "_sharded";
1380  }
1381  if (!is_bw_eq && col_is_nullable) {
1382  fname += "_nullable";
1383  }
1384 
1385  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1386  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1387  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1388 
1389  auto pos_ptr = hash_join_idx_args_in[0];
1390  CHECK(pos_ptr);
1391 
1392  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1393  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1394  auto hash_join_idx_args = hash_join_idx_args_in;
1395  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1396  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1397 
1398  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1399  slot_valid_lv,
1400  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1401  executor->cgen_state_->llInt(int64_t(0)));
1402  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1403  executor->cgen_state_->ir_builder_.CreateAdd(
1404  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1405  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1406  auto rowid_ptr_i32 =
1407  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1408  return {rowid_ptr_i32, row_count_lv, slot_lv};
1409 }
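
The runtime helper that gets called is selected purely by name composition, and the count and row-id sections are addressed at sub_buff_size and 2 * sub_buff_size past the offset section, as the listing shows. A hedged restatement of the name selection only:

#include <string>

// Rebuild the helper name emitted above from the same flags.
std::string hash_join_idx_fname(const bool is_bucketized,
                                const bool is_bw_eq,
                                const bool is_sharded,
                                const bool col_is_nullable) {
  std::string fname = is_bucketized ? "bucketized_hash_join_idx" : "hash_join_idx";
  if (is_bw_eq) {
    fname += "_bitwise";
  }
  if (is_sharded) {
    fname += "_sharded";
  }
  if (!is_bw_eq && col_is_nullable) {
    fname += "_nullable";
  }
  return fname;  // e.g. "hash_join_idx_sharded_nullable"
}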

◆ codegenSlot()

llvm::Value * JoinHashTable::codegenSlot ( const CompilationOptions &  co,
const size_t  index 
)
override virtual

Implements JoinHashTableInterface.

Definition at line 1537 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, CHECK, CHECK_EQ, CodeGenerator::codegen(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_max_rte_scan_table(), kDATE, JoinHashTableInterface::OneToOne, and self_join_not_covered_by_left_deep_tree().

1538  {
1539  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1540  using namespace std::string_literals;
1541 
1543  const auto cols = get_cols(
1544  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1545  auto key_col = cols.second;
1546  CHECK(key_col);
1547  auto val_col = cols.first;
1548  CHECK(val_col);
1549  CodeGenerator code_generator(executor_);
1550  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1551  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1552  if (key_col_var && val_col_var &&
1553      self_join_not_covered_by_left_deep_tree(
1554  key_col_var,
1555  val_col_var,
1556  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1557  throw std::runtime_error(
1558  "Query execution fails because the query contains not supported self-join "
1559  "pattern. We suspect the query requires multiple left-deep join tree due to the "
1560  "join condition of the self-join and is not supported for now. Please consider "
1561  "rewriting table order in "
1562  "FROM clause.");
1563  }
1564  const auto key_lvs = code_generator.codegen(key_col, true, co);
1565  CHECK_EQ(size_t(1), key_lvs.size());
1566  auto hash_ptr = codegenHashTableLoad(index);
1567  CHECK(hash_ptr);
1568  const int shard_count = shardCount();
1569  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1570 
1571  const auto& key_col_ti = key_col->get_type_info();
1572  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1573  : "hash_join_idx"s);
1574 
1575  if (isBitwiseEq()) {
1576  fname += "_bitwise";
1577  }
1578  if (shard_count) {
1579  fname += "_sharded";
1580  }
1581 
1582  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1583  fname += "_nullable";
1584  }
1585  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1586 }

◆ countBufferOff()

size_t JoinHashTable::countBufferOff ( ) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1415 of file JoinHashTable.cpp.

Referenced by getDeviceCount().

1415  {
1416  return getComponentBufferSize();
1417 }
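
countBufferOff() returns getComponentBufferSize(), so the count section begins one component (hash_entry_count_ * sizeof(int32_t)) past the start of the buffer. Combined with the pointer arithmetic in codegenMatchingSet() above, a one-to-many table can be read back on the host roughly as sketched below. This is an illustrative decode under the stated assumptions (layout is offsets, then counts, then row ids; offsets index into the row-id section; -1 marks an empty slot, matching hash_join_invalid_val in the listings), not a repository function.

#include <cstdint>
#include <vector>

std::vector<std::vector<int32_t>> decode_one_to_many(const int32_t* buff,
                                                     const size_t entry_count) {
  const int32_t* offsets = buff;                    // offset section
  const int32_t* counts = buff + entry_count;       // count section (countBufferOff)
  const int32_t* row_ids = buff + 2 * entry_count;  // payload section
  std::vector<std::vector<int32_t>> matches(entry_count);
  for (size_t i = 0; i < entry_count; ++i) {
    if (offsets[i] < 0) {
      continue;  // empty slot
    }
    matches[i].assign(row_ids + offsets[i], row_ids + offsets[i] + counts[i]);
  }
  return matches;
}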

◆ freeHashBufferCpuMemory()

void JoinHashTable::freeHashBufferCpuMemory ( )
private

Definition at line 1661 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1661  {
1662  cpu_hash_table_buff_.reset();
1663 }

◆ freeHashBufferGpuMemory()

void JoinHashTable::freeHashBufferGpuMemory ( )
private

Definition at line 1640 of file JoinHashTable.cpp.

References CHECK, and CudaAllocator::freeGpuAbstractBuffer().

Referenced by JoinHashTable().

1640  {
1641 #ifdef HAVE_CUDA
1642  const auto& catalog = *executor_->getCatalog();
1643  auto& data_mgr = catalog.getDataMgr();
1644  for (auto& buf : gpu_hash_table_buff_) {
1645  if (buf) {
1646  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1647  buf = nullptr;
1648  }
1649  }
1650  for (auto& buf : gpu_hash_table_err_buff_) {
1651  if (buf) {
1652  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1653  buf = nullptr;
1654  }
1655  }
1656 #else
1657  CHECK(false);
1658 #endif // HAVE_CUDA
1659 }

◆ freeHashBufferMemory()

void JoinHashTable::freeHashBufferMemory ( )
private

Definition at line 1633 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1633  {
1634 #ifdef HAVE_CUDA
1636 #endif
1638 }

◆ genHashTableKey()

ChunkKey JoinHashTable::genHashTableKey ( const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Analyzer::Expr *  outer_col,
const Analyzer::ColumnVar *  inner_col 
) const
private

Definition at line 582 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), Analyzer::Expr::get_type_info(), and kENCODING_DICT.

Referenced by JoinHashTable().

585  {
586  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
587  inner_col->get_table_id(),
588  inner_col->get_column_id()};
589  const auto& ti = inner_col->get_type_info();
590  if (ti.is_string()) {
591  CHECK_EQ(kENCODING_DICT, ti.get_compression());
592  size_t outer_elem_count = 0;
593  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
594  CHECK(outer_col);
595  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
596  for (auto& frag : outer_query_info.fragments) {
597  outer_elem_count = frag.getNumTuples();
598  }
599  hash_table_key.push_back(outer_elem_count);
600  }
601  if (fragments.size() < 2) {
602  hash_table_key.push_back(fragments.front().fragmentId);
603  }
604  return hash_table_key;
605 }
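
The ChunkKey built above feeds the hash table cache key. Its composition, restated as a summary of the listing (names are descriptive placeholders, not repository identifiers):

// Resulting key layout:
//   {db_id, inner_table_id, inner_column_id}        // always
//   + outer table element count                     // dictionary-encoded string joins only
//   + fragments.front().fragmentId                  // when the inner table has fewer than 2 fragments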

◆ getCachedHashTable()

static const std::shared_ptr<std::vector<int32_t> >& JoinHashTable::getCachedHashTable ( size_t  idx)
inline static

Definition at line 123 of file JoinHashTable.h.

References CHECK, CHECK_LT, join_hash_table_cache_, and join_hash_table_cache_mutex_.

Referenced by QueryRunner::QueryRunner::getCachedJoinHashTable().

123  {
124  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
125  CHECK(!join_hash_table_cache_.empty());
126  CHECK_LT(idx, join_hash_table_cache_.size());
127  return join_hash_table_cache_.at(idx).second;
128  }
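
getCachedHashTable() and getNumberOfCachedHashTables() expose the static CPU hash table cache, mainly for tests (see QueryRunner). A minimal inspection loop, assuming at least one table has already been built and cached (getCachedHashTable() asserts on an empty cache):

// Iterate over the process-wide CPU hash table cache.
const uint64_t n = JoinHashTable::getNumberOfCachedHashTables();
for (size_t i = 0; i < n; ++i) {
  const auto& cached = JoinHashTable::getCachedHashTable(i);  // shared_ptr<std::vector<int32_t>>
  // cached->size() is the number of int32_t slots in the cached table.
}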

◆ getComponentBufferSize()

size_t JoinHashTable::getComponentBufferSize ( ) const
private noexcept

Definition at line 1423 of file JoinHashTable.cpp.

References JoinHashTableInterface::OneToMany.

Referenced by JoinHashTable().

1423  {
1424  if (hash_type_ == JoinHashTableInterface::HashType::OneToMany) {
1425  return hash_entry_count_ * sizeof(int32_t);
1426  } else {
1427  return 0;
1428  }
1429 }

◆ getDeviceCount()

int JoinHashTable::getDeviceCount ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 95 of file JoinHashTable.h.

References codegenHashTableLoad(), codegenMatchingSet(), countBufferOff(), device_count_, offsetBufferOff(), and payloadBufferOff().

95 { return device_count_; };

◆ getHashJoinArgs()

std::vector< llvm::Value * > JoinHashTable::getHashJoinArgs ( llvm::Value *  hash_ptr,
const Analyzer::Expr *  key_col,
const int  shard_count,
const CompilationOptions &  co 
)
private

Definition at line 1271 of file JoinHashTable.cpp.

References AUTOMATIC_IR_METADATA, CHECK_EQ, anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_hash_entry_count(), get_logical_type_info(), SQLTypeInfo::get_type(), Analyzer::Expr::get_type_info(), inline_fixed_encoding_null_val(), and kDATE.

Referenced by JoinHashTable().

1274  {
1275  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1276  CodeGenerator code_generator(executor_);
1277  const auto key_lvs = code_generator.codegen(key_col, true, co);
1278  CHECK_EQ(size_t(1), key_lvs.size());
1279  auto const& key_col_ti = key_col->get_type_info();
1280  auto hash_entry_info =
1282 
1283  std::vector<llvm::Value*> hash_join_idx_args{
1284  hash_ptr,
1285  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1286  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1287  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1288  if (shard_count) {
1289  const auto expected_hash_entry_count =
1291  const auto entry_count_per_shard =
1292  (expected_hash_entry_count + shard_count - 1) / shard_count;
1293  hash_join_idx_args.push_back(
1294  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1295  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1296  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1297  }
1298  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1299  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1300  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1301  inline_fixed_encoding_null_val(key_col_logical_ti)));
1302  }
1303  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1304  if (isBitwiseEq()) {
1305  if (special_date_bucketization_case) {
1306  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1307  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1308  } else {
1309  hash_join_idx_args.push_back(
1310  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1311  }
1312  }
1313 
1314  if (special_date_bucketization_case) {
1315  hash_join_idx_args.emplace_back(
1316  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1317  }
1318 
1319  return hash_join_idx_args;
1320 }
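
The argument vector assembled above feeds the hash_join_idx* runtime helpers. Summarizing the order from the listing (bracketed entries are appended only under the noted conditions):

// [0] hash table pointer (from codegenHashTableLoad)
// [1] key value, cast to 64 bits
// [2] col_range_.getIntMin()
// [3] col_range_.getIntMax()
// [4-6] entries per shard, shard count, device count        -- sharded joins only
// [+]  inline null sentinel for the key type                 -- nullable key or bitwise-eq
// [+]  translated null (getIntMax()+1, bucket-normalized for kDATE)  -- bitwise-eq only
// [+]  bucket normalization factor                           -- kDATE bucketized case only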

◆ getHashType()

HashType JoinHashTable::getHashType ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 89 of file JoinHashTable.h.

References hash_type_.

89 { return hash_type_; }

◆ getInnerQueryInfo()

const InputTableInfo & JoinHashTable::getInnerQueryInfo ( const Analyzer::ColumnVar *  inner_col) const
private

Definition at line 1588 of file JoinHashTable.cpp.

References get_inner_query_info(), and Analyzer::ColumnVar::get_table_id().

Referenced by JoinHashTable().

1589  {
1590  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1591 }

◆ getInnerTableId()

int JoinHashTable::getInnerTableId ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 81 of file JoinHashTable.h.

References col_var_.

81  {
82  return col_var_.get()->get_table_id();
83  };

◆ getInnerTableRteIdx()

int JoinHashTable::getInnerTableRteIdx ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 85 of file JoinHashTable.h.

References col_var_.

85  {
86  return col_var_.get()->get_rte_idx();
87  };

◆ getInstance()

std::shared_ptr< JoinHashTable > JoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper >  qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Make hash table from an in-flight SQL query's parse tree etc.

Definition at line 338 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), getExpressionRange(), HashEntryInfo::getNormalizedHashEntryCount(), Data_Namespace::GPU_LEVEL, Invalid, IS_EQUIVALENCE, kBW_EQ, ExpressionRange::makeIntRange(), VLOG, and VLOGGING.

Referenced by JoinHashTableInterface::getInstance().

345  {
346  decltype(std::chrono::steady_clock::now()) ts1, ts2;
347  if (VLOGGING(1)) {
348  VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
349  << " for qual: " << qual_bin_oper->toString();
350  ts1 = std::chrono::steady_clock::now();
351  }
352  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
353  const auto cols =
354  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
355  const auto inner_col = cols.first;
356  CHECK(inner_col);
357  const auto& ti = inner_col->get_type_info();
358  auto col_range =
359  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
360  if (col_range.getType() == ExpressionRangeType::Invalid) {
361  throw HashJoinFail(
362  "Could not compute range for the expressions involved in the equijoin");
363  }
364  if (ti.is_string()) {
365  // The nullable info must be the same as the source column.
366  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
367  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
368  throw HashJoinFail(
369  "Could not compute range for the expressions involved in the equijoin");
370  }
371  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
372  // If the inner column expression range is empty, use the inner col range
373  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
374  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
375  col_range = source_col_range;
376  } else {
377  col_range = ExpressionRange::makeIntRange(
378  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
379  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
380  0,
381  source_col_range.hasNulls());
382  }
383  }
384  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
385  const auto max_hash_entry_count =
386      memory_level == Data_Namespace::GPU_LEVEL
387  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
388  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
389 
390  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
391  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
392  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
393 
394  if (bucketized_entry_count > max_hash_entry_count) {
395  throw TooManyHashEntries();
396  }
397 
398  if (qual_bin_oper->get_optype() == kBW_EQ &&
399  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
400  throw HashJoinFail("Cannot translate null value for kBW_EQ");
401  }
402  auto join_hash_table =
403  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
404  inner_col,
405  query_infos,
406  memory_level,
407  preferred_hash_type,
408  col_range,
409  column_cache,
410  executor,
411  device_count));
412  try {
413  join_hash_table->reify();
414  } catch (const TableMustBeReplicated& e) {
415  // Throw a runtime error to abort the query
416  join_hash_table->freeHashBufferMemory();
417  throw std::runtime_error(e.what());
418  } catch (const HashJoinFail& e) {
419  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
420  // possible)
421  join_hash_table->freeHashBufferMemory();
422  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
423  "involved in equijoin | ") +
424  e.what());
425  } catch (const ColumnarConversionNotSupported& e) {
426  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
427  e.what());
428  } catch (const OutOfMemory& e) {
429  throw HashJoinFail(
430  std::string("Ran out of memory while building hash tables for equijoin | ") +
431  e.what());
432  } catch (const std::exception& e) {
433  throw std::runtime_error(
434  std::string("Fatal error while attempting to build hash tables for join: ") +
435  e.what());
436  }
437  if (VLOGGING(1)) {
438  ts2 = std::chrono::steady_clock::now();
439  VLOG(1) << "Built perfect hash table "
440  << getHashTypeString(join_hash_table->getHashType()) << " in "
441  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
442  << " ms";
443  }
444  return join_hash_table;
445 }
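
The exception contract visible in the listing matters to callers: HashJoinFail is the recoverable signal (the executor can retry with a loop join), while other failures are rethrown as fatal runtime errors. A hedged calling pattern, assuming qual_bin_oper, query_infos, memory_level, preferred_hash_type, device_count, column_cache, and executor are already in scope:

// Sketch: build the table, treating HashJoinFail as a soft failure.
try {
  auto hash_table = JoinHashTable::getInstance(qual_bin_oper,
                                               query_infos,
                                               memory_level,
                                               preferred_hash_type,
                                               device_count,
                                               column_cache,
                                               executor);
  // use hash_table ...
} catch (const HashJoinFail& e) {
  // A perfect hash build is not possible for this qualifier; fall back to
  // another join strategy (e.g. a loop join), per the comment in the listing.
}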

◆ getJoinHashBuffer()

int64_t JoinHashTable::getJoinHashBuffer ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1431 of file JoinHashTable.cpp.

References CHECK, CHECK_LT, and CPU.

1432  {
1433  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1434  return 0;
1435  }
1436 #ifdef HAVE_CUDA
1437  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1438  if (device_type == ExecutorDeviceType::CPU) {
1439  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1440  } else {
1441  return gpu_hash_table_buff_[device_id]
1442  ? reinterpret_cast<CUdeviceptr>(
1443  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1444  : reinterpret_cast<CUdeviceptr>(nullptr);
1445  }
1446 #else
1447  CHECK(device_type == ExecutorDeviceType::CPU);
1448  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1449 #endif
1450 }
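
On CPU the returned value is the address of the int32_t backing store (0 if nothing was built); on GPU it is a CUdeviceptr. A host-side read-back sketch for the CPU case, pairing it with getJoinHashBufferSize() for the byte count:

// Read the CPU-resident table as a span of int32_t slots.
const int64_t handle = hash_table->getJoinHashBuffer(ExecutorDeviceType::CPU, 0);
const size_t bytes = hash_table->getJoinHashBufferSize(ExecutorDeviceType::CPU, 0);
if (handle != 0) {
  const auto* slots = reinterpret_cast<const int32_t*>(handle);
  const size_t slot_count = bytes / sizeof(int32_t);
  // slots[0 .. slot_count) is the raw perfect hash table buffer.
}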

◆ getJoinHashBufferSize()

size_t JoinHashTable::getJoinHashBufferSize ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1452 of file JoinHashTable.cpp.

References CHECK, CHECK_LT, and CPU.

1453  {
1454  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1455  return 0;
1456  }
1457 #ifdef HAVE_CUDA
1458  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1459  if (device_type == ExecutorDeviceType::CPU) {
1460  return cpu_hash_table_buff_->size() *
1461  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1462  } else {
1463  return gpu_hash_table_buff_[device_id]
1464  ? gpu_hash_table_buff_[device_id]->reservedSize()
1465  : 0;
1466  }
1467 #else
1468  CHECK(device_type == ExecutorDeviceType::CPU);
1469  return cpu_hash_table_buff_->size() *
1470  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1471 #endif
1472 }

◆ getMemoryLevel()

Data_Namespace::MemoryLevel JoinHashTable::getMemoryLevel ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 91 of file JoinHashTable.h.

References memory_level_.

91  {
92  return memory_level_;
93  };

◆ getNumberOfCachedHashTables()

static uint64_t JoinHashTable::getNumberOfCachedHashTables ( )
inline static

Definition at line 130 of file JoinHashTable.h.

References join_hash_table_cache_, join_hash_table_cache_mutex_, and ~JoinHashTable().

Referenced by QueryRunner::QueryRunner::getNumberOfCachedJoinHashTables().

130  {
131  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
132  return join_hash_table_cache_.size();
133  }

◆ initHashTableOnCpuFromCache()

void JoinHashTable::initHashTableOnCpuFromCache ( const ChunkKey &  chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols 
)
private

Definition at line 1184 of file JoinHashTable.cpp.

References CHECK_GE, and DEBUG_TIMER.

Referenced by JoinHashTable().

1187  {
1188  auto timer = DEBUG_TIMER(__func__);
1189  CHECK_GE(chunk_key.size(), size_t(2));
1190  if (chunk_key[1] < 0) {
1191  // Do not cache hash tables over intermediate results
1192  return;
1193  }
1194  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1195  JoinHashTableCacheKey cache_key{col_range_,
1196  *cols.first,
1197  outer_col ? *outer_col : *cols.first,
1198  num_elements,
1199  chunk_key,
1200  qual_bin_oper_->get_optype()};
1201  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1202  for (const auto& kv : join_hash_table_cache_) {
1203  if (kv.first == cache_key) {
1204  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1205  cpu_hash_table_buff_ = kv.second;
1206  break;
1207  }
1208  }
1209 }

◆ initOneToManyHashTable()

void JoinHashTable::initOneToManyHashTable ( const ChunkKey &  chunk_key,
const JoinColumn &  join_column,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 1054 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK, CHECK_EQ, CHECK_GT, copy_to_gpu(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, fill_one_to_many_hash_table_on_device(), fill_one_to_many_hash_table_on_device_bucketized(), fill_one_to_many_hash_table_on_device_sharded(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), get_shard_count(), Data_Namespace::GPU_LEVEL, init_hash_join_buff_on_device(), inline_fixed_encoding_null_val(), kDATE, and JoinColumn::num_elems.

Referenced by JoinHashTable().

1059  {
1060  auto timer = DEBUG_TIMER(__func__);
1061  auto const inner_col = cols.first;
1062  CHECK(inner_col);
1063 
1064  auto hash_entry_info = get_bucketized_hash_entry_info(
1065  inner_col->get_type_info(), col_range_, isBitwiseEq());
1066 
1067 #ifdef HAVE_CUDA
1068  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1069  const size_t entries_per_shard =
1070  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1071  : 0);
1072  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1073  // needed once the join hash table has been built on the CPU.
1074  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1075  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1076  CHECK_GT(shards_per_device, 0u);
1077  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1078  }
1079 #else
1080  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1081 #endif
1082  if (!device_id) {
1083  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1084  }
1085 
1086 #ifdef HAVE_CUDA
1087  const auto& ti = inner_col->get_type_info();
1088  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1090  const size_t total_count =
1091  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
1092  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1093  &data_mgr, total_count * sizeof(int32_t), device_id);
1094  }
1095 #endif
1096  const int32_t hash_join_invalid_val{-1};
1097  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1098  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
1099  {
1100  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1102  join_column, cols, hash_entry_info, hash_join_invalid_val);
1103  }
1104  if (inner_col->get_table_id() > 0) {
1105  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
1106  }
1107  // Transfer the hash table on the GPU if we've only built it on CPU
1108  // but the query runs on GPU (join on dictionary encoded columns).
1109  // Don't transfer the buffer if there was an error since we'll bail anyway.
1111 #ifdef HAVE_CUDA
1112  CHECK(ti.is_string());
1113  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1114  copy_to_gpu(
1115  &data_mgr,
1116  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1117  &(*cpu_hash_table_buff_)[0],
1118  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1119  device_id);
1120 #else
1121  CHECK(false);
1122 #endif
1123  }
1124  } else {
1125 #ifdef HAVE_CUDA
1126  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1127  data_mgr.getCudaMgr()->setContext(device_id);
1129  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1130  hash_entry_info.getNormalizedHashEntryCount(),
1131  hash_join_invalid_val,
1132  executor_->blockSize(),
1133  executor_->gridSize());
1134  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1138  isBitwiseEq(),
1139  col_range_.getIntMax() + 1,
1141  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1142 
1143  if (shard_count) {
1144  CHECK_GT(device_count_, 0);
1145  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1146  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1148  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1149  hash_entry_info,
1150  hash_join_invalid_val,
1151  join_column,
1152  type_info,
1153  shard_info,
1154  executor_->blockSize(),
1155  executor_->gridSize());
1156  }
1157  } else {
1158  if (use_bucketization) {
1160  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1161  hash_entry_info,
1162  hash_join_invalid_val,
1163  join_column,
1164  type_info,
1165  executor_->blockSize(),
1166  executor_->gridSize());
1167  } else {
1169  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1170  hash_entry_info,
1171  hash_join_invalid_val,
1172  join_column,
1173  type_info,
1174  executor_->blockSize(),
1175  executor_->gridSize());
1176  }
1177  }
1178 #else
1179  CHECK(false);
1180 #endif
1181  }
1182 }

◆ initOneToManyHashTableOnCpu()

void JoinHashTable::initOneToManyHashTableOnCpu ( const JoinColumn &  join_column,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 825 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, CHECK_NE, cpu_threads(), DEBUG_TIMER, fill_one_to_many_hash_table(), fill_one_to_many_hash_table_bucketized(), get_join_column_type_kind(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), kDATE, kENCODING_DICT, and JoinColumn::num_elems.

Referenced by JoinHashTable().

829  {
830  auto timer = DEBUG_TIMER(__func__);
831  const auto inner_col = cols.first;
832  CHECK(inner_col);
833  const auto& ti = inner_col->get_type_info();
834  if (cpu_hash_table_buff_) {
835  return;
836  }
837  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
838  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems);
839  const StringDictionaryProxy* sd_inner_proxy{nullptr};
840  const StringDictionaryProxy* sd_outer_proxy{nullptr};
841  if (ti.is_string()) {
842  CHECK_EQ(kENCODING_DICT, ti.get_compression());
843  sd_inner_proxy = executor_->getStringDictionaryProxy(
844  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
845  CHECK(sd_inner_proxy);
846  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
847  CHECK(outer_col);
848  sd_outer_proxy = executor_->getStringDictionaryProxy(
849  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
850  CHECK(sd_outer_proxy);
851  }
852  int thread_count = cpu_threads();
853  std::vector<std::future<void>> init_threads;
854  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
855  init_threads.emplace_back(std::async(std::launch::async,
857  &(*cpu_hash_table_buff_)[0],
858  hash_entry_info.getNormalizedHashEntryCount(),
859  hash_join_invalid_val,
860  thread_idx,
861  thread_count));
862  }
863  for (auto& child : init_threads) {
864  child.wait();
865  }
866  for (auto& child : init_threads) {
867  child.get();
868  }
869 
870  if (ti.get_type() == kDATE) {
871  fill_one_to_many_hash_table_bucketized(&(*cpu_hash_table_buff_)[0],
872  hash_entry_info,
873  hash_join_invalid_val,
874  join_column,
875  {static_cast<size_t>(ti.get_size()),
876  col_range_.getIntMin(),
877  col_range_.getIntMax(),
878  inline_fixed_encoding_null_val(ti),
879  isBitwiseEq(),
880  col_range_.getIntMax() + 1,
881  get_join_column_type_kind(ti)},
882  sd_inner_proxy,
883  sd_outer_proxy,
884  thread_count);
885  } else {
886  fill_one_to_many_hash_table(&(*cpu_hash_table_buff_)[0],
887  hash_entry_info,
888  hash_join_invalid_val,
889  join_column,
890  {static_cast<size_t>(ti.get_size()),
891  col_range_.getIntMin(),
892  col_range_.getIntMax(),
893  inline_fixed_encoding_null_val(ti),
894  isBitwiseEq(),
895  col_range_.getIntMax() + 1,
896  get_join_column_type_kind(ti)},
897  sd_inner_proxy,
898  sd_outer_proxy,
899  thread_count);
900  }
901 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t getNormalizedHashEntryCount() const
size_t num_elems
bool isBitwiseEq() const
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
int64_t getIntMin() const
Definition: sqltypes.h:55
Executor * executor_
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
ExpressionRange col_range_
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
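
The buffer allocated above (2 * getNormalizedHashEntryCount() + num_elems elements) holds three consecutive regions: per-slot offsets, per-slot counts, and the row-id payload. The sketch below illustrates how a probe could walk that layout; the helper name and the identity slot mapping are illustrative, not part of JoinHashTable's API.

#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative only: read the matching inner row ids for `key` out of a
// one-to-many perfect hash buffer laid out as [offsets | counts | payload].
std::vector<int32_t> probe_one_to_many(const std::vector<int32_t>& buff,
                                       const size_t entry_count,
                                       const int64_t key,
                                       const int64_t min_key) {
  const size_t slot = static_cast<size_t>(key - min_key);  // perfect hash: one slot per value
  const int32_t offset = buff[slot];
  if (offset < 0) {
    return {};  // slot still holds the invalid value: no inner rows with this key
  }
  const int32_t count = buff[entry_count + slot];
  const int32_t* payload = buff.data() + 2 * entry_count;
  return {payload + offset, payload + offset + count};
}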

◆ initOneToOneHashTable()

void JoinHashTable::initOneToOneHashTable ( const ChunkKey chunk_key,
const JoinColumn join_column,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 915 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK, CHECK_EQ, CHECK_GT, copy_from_gpu(), copy_to_gpu(), Data_Namespace::CPU_LEVEL, DEBUG_TIMER, fill_hash_join_buff_on_device_bucketized(), fill_hash_join_buff_on_device_sharded_bucketized(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), Data_Namespace::GPU_LEVEL, init_hash_join_buff_on_device(), inline_fixed_encoding_null_val(), and JoinColumn::num_elems.

Referenced by JoinHashTable().

920  {
921  auto timer = DEBUG_TIMER(__func__);
922  const auto inner_col = cols.first;
923  CHECK(inner_col);
924 
925  auto hash_entry_info = get_bucketized_hash_entry_info(
926  inner_col->get_type_info(), col_range_, isBitwiseEq());
927  if (!hash_entry_info) {
928  return;
929  }
930 
931 #ifdef HAVE_CUDA
932  const auto shard_count = shardCount();
933  const size_t entries_per_shard{
934  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
935  : 0};
936  // Even if we join on dictionary encoded strings, the memory on the GPU is still
937  // needed once the join hash table has been built on the CPU.
938  const auto catalog = executor_->getCatalog();
939  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
940  auto& data_mgr = catalog->getDataMgr();
941  if (shard_count) {
942  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
943  CHECK_GT(shards_per_device, 0u);
944  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
945  }
946  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
947  &data_mgr,
948  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
949  device_id);
950  }
951 #else
952  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
953 #endif
954  if (!device_id) {
955  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
956  }
957 
958 #ifdef HAVE_CUDA
959  const auto& ti = inner_col->get_type_info();
960 #endif
961  const int32_t hash_join_invalid_val{-1};
962  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
963  CHECK(!chunk_key.empty());
964  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
965  {
966  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
967  initOneToOneHashTableOnCpu(
968  join_column, cols, hash_entry_info, hash_join_invalid_val);
969  }
970  if (inner_col->get_table_id() > 0) {
971  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
972  }
973  // Transfer the hash table on the GPU if we've only built it on CPU
974  // but the query runs on GPU (join on dictionary encoded columns).
975  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
976 #ifdef HAVE_CUDA
977  CHECK(ti.is_string());
978  auto& data_mgr = catalog->getDataMgr();
979  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
980 
981  copy_to_gpu(
982  &data_mgr,
983  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
984  &(*cpu_hash_table_buff_)[0],
985  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
986  device_id);
987 #else
988  CHECK(false);
989 #endif
990  }
991  } else {
992 #ifdef HAVE_CUDA
993  int err{0};
994  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
995  auto& data_mgr = catalog->getDataMgr();
996  gpu_hash_table_err_buff_[device_id] =
997  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
998  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
999  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
1000  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
1001  init_hash_join_buff_on_device(
1002  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1003  hash_entry_info.getNormalizedHashEntryCount(),
1004  hash_join_invalid_val,
1005  executor_->blockSize(),
1006  executor_->gridSize());
1007  if (chunk_key.empty()) {
1008  return;
1009  }
1010  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1011  col_range_.getIntMin(),
1012  col_range_.getIntMax(),
1013  inline_fixed_encoding_null_val(ti),
1014  isBitwiseEq(),
1015  col_range_.getIntMax() + 1,
1016  get_join_column_type_kind(ti)};
1017  if (shard_count) {
1018  CHECK_GT(device_count_, 0);
1019  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1020  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1021  fill_hash_join_buff_on_device_sharded_bucketized(
1022  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1023  hash_join_invalid_val,
1024  reinterpret_cast<int*>(dev_err_buff),
1025  join_column,
1026  type_info,
1027  shard_info,
1028  executor_->blockSize(),
1029  executor_->gridSize(),
1030  hash_entry_info.bucket_normalization);
1031  }
1032  } else {
1033  fill_hash_join_buff_on_device_bucketized(
1034  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1035  hash_join_invalid_val,
1036  reinterpret_cast<int*>(dev_err_buff),
1037  join_column,
1038  type_info,
1039  executor_->blockSize(),
1040  executor_->gridSize(),
1041  hash_entry_info.bucket_normalization);
1042  }
1043  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1044 
1045  if (err) {
1046  throw NeedsOneToManyHash();
1047  }
1048 #else
1049  CHECK(false);
1050 #endif
1051  }
1052 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const int device_count_
size_t num_elems
bool isBitwiseEq() const
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
#define CHECK_GT(x, y)
Definition: Logger.h:209
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:30
int64_t getIntMin() const
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
size_t shardCount() const
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Executor * executor_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
const Data_Namespace::MemoryLevel memory_level_
size_t hash_entry_count_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
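
When the inner table is sharded, the code above sizes each device's buffer for ceil(shard_count / device_count) shards. A standalone restatement of that arithmetic, with illustrative names:

#include <cstddef>

// Each device owns every (device_count)-th shard, so its hash buffer must hold
// ceil(shard_count / device_count) shards' worth of entries.
size_t per_device_entry_count(const size_t entries_per_shard,
                              const size_t shard_count,
                              const size_t device_count) {
  const size_t shards_per_device = (shard_count + device_count - 1) / device_count;
  return entries_per_shard * shards_per_device;
}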

◆ initOneToOneHashTableOnCpu()

void JoinHashTable::initOneToOneHashTableOnCpu ( const JoinColumn join_column,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 736 of file JoinHashTable.cpp.

References HashEntryInfo::bucket_normalization, CHECK, CHECK_EQ, cpu_threads(), DEBUG_TIMER, fill_hash_join_buff_bucketized(), get_join_column_type_kind(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), and kENCODING_DICT.

Referenced by JoinHashTable().

740  {
741  auto timer = DEBUG_TIMER(__func__);
742  const auto inner_col = cols.first;
743  CHECK(inner_col);
744  const auto& ti = inner_col->get_type_info();
745  if (!cpu_hash_table_buff_) {
746  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
747  hash_entry_info.getNormalizedHashEntryCount());
748  const StringDictionaryProxy* sd_inner_proxy{nullptr};
749  const StringDictionaryProxy* sd_outer_proxy{nullptr};
750  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
751  if (ti.is_string() &&
752  (outer_col && !(inner_col->get_comp_param() == outer_col->get_comp_param()))) {
753  CHECK_EQ(kENCODING_DICT, ti.get_compression());
754  sd_inner_proxy = executor_->getStringDictionaryProxy(
755  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
756  CHECK(sd_inner_proxy);
757  CHECK(outer_col);
758  sd_outer_proxy = executor_->getStringDictionaryProxy(
759  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
760  CHECK(sd_outer_proxy);
761  }
762  int thread_count = cpu_threads();
763  std::vector<std::thread> init_cpu_buff_threads;
764  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
765  init_cpu_buff_threads.emplace_back(
766  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
767  init_hash_join_buff(&(*cpu_hash_table_buff_)[0],
768  hash_entry_info.getNormalizedHashEntryCount(),
769  hash_join_invalid_val,
770  thread_idx,
771  thread_count);
772  });
773  }
774  for (auto& t : init_cpu_buff_threads) {
775  t.join();
776  }
777  init_cpu_buff_threads.clear();
778  int err{0};
779  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
780  init_cpu_buff_threads.emplace_back([this,
781  hash_join_invalid_val,
782  &join_column,
783  sd_inner_proxy,
784  sd_outer_proxy,
785  thread_idx,
786  thread_count,
787  &ti,
788  &err,
789  hash_entry_info] {
790  int partial_err =
791  fill_hash_join_buff_bucketized(&(*cpu_hash_table_buff_)[0],
792  hash_join_invalid_val,
793  join_column,
794  {static_cast<size_t>(ti.get_size()),
795  col_range_.getIntMin(),
796  col_range_.getIntMax(),
797  inline_fixed_encoding_null_val(ti),
798  isBitwiseEq(),
799  col_range_.getIntMax() + 1,
800  get_join_column_type_kind(ti)},
801  sd_inner_proxy,
802  sd_outer_proxy,
803  thread_idx,
804  thread_count,
805  hash_entry_info.bucket_normalization);
806  __sync_val_compare_and_swap(&err, 0, partial_err);
807  });
808  }
809  for (auto& t : init_cpu_buff_threads) {
810  t.join();
811  }
812  if (err) {
813  cpu_hash_table_buff_.reset();
814  // Too many hash entries, need to retry with a 1:many table
815  throw NeedsOneToManyHash();
816  }
817  } else {
818  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
819  // Too many hash entries, need to retry with a 1:many table
820  throw NeedsOneToManyHash();
821  }
822  }
823 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t getNormalizedHashEntryCount() const
bool isBitwiseEq() const
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
int64_t getIntMin() const
int64_t bucket_normalization
Executor * executor_
ExpressionRange col_range_
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
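
The err value collected from the fill threads above is what triggers the fallback to a one-to-many layout: any duplicate join key makes a 1:1 slot assignment impossible. A single-threaded, illustrative analogue (not the real fill_hash_join_buff_bucketized):

#include <cstddef>
#include <cstdint>
#include <vector>

// Write each inner row id into its perfect-hash slot; report failure on the
// first duplicate key so the caller can rebuild as a one-to-many table.
int fill_one_to_one(std::vector<int32_t>& buff,
                    const std::vector<int64_t>& keys,
                    const int64_t min_key,
                    const int32_t invalid_slot_val) {
  for (size_t row = 0; row < keys.size(); ++row) {
    auto& slot = buff[static_cast<size_t>(keys[row] - min_key)];
    if (slot != invalid_slot_val) {
      return -1;  // duplicate key: caller throws NeedsOneToManyHash
    }
    slot = static_cast<int32_t>(row);
  }
  return 0;
}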

◆ isBitwiseEq()

bool JoinHashTable::isBitwiseEq ( ) const
private

Definition at line 1629 of file JoinHashTable.cpp.

References kBW_EQ.

Referenced by JoinHashTable().

1629  {
1630  return qual_bin_oper_->get_optype() == kBW_EQ;
1631 }
Definition: sqldefs.h:31
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the caller graph for this function:

◆ layoutRequiresAdditionalBuffers()

bool JoinHashTable::layoutRequiresAdditionalBuffers ( JoinHashTableInterface::HashType  layout) const
inlineoverrideprivatevirtualnoexcept

◆ offsetBufferOff()

size_t JoinHashTable::offsetBufferOff ( ) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1411 of file JoinHashTable.cpp.

Referenced by getDeviceCount().

1411  {
1412  return 0;
1413 }
+ Here is the caller graph for this function:

◆ payloadBufferOff()

size_t JoinHashTable::payloadBufferOff ( ) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1419 of file JoinHashTable.cpp.

Referenced by getDeviceCount().

1419  {
1420  return 2 * getComponentBufferSize();
1421 }
size_t getComponentBufferSize() const noexcept
+ Here is the caller graph for this function:
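
Together with offsetBufferOff() returning 0, and assuming countBufferOff() returns getComponentBufferSize() (presumably hash_entry_count_ * sizeof(int32_t)), the three accessors place the one-to-many regions at fixed byte offsets. A hedged sketch of the resulting views:

#include <cstddef>
#include <cstdint>

// Assumed layout: [offsets | counts | payload], with each region starting at
// the byte offset reported by offsetBufferOff(), countBufferOff(),
// payloadBufferOff() respectively.
struct OneToManyViews {
  const int32_t* offsets;
  const int32_t* counts;
  const int32_t* payload;
};

OneToManyViews make_views(const int8_t* buffer, const size_t component_buffer_size) {
  return {reinterpret_cast<const int32_t*>(buffer),                              // offsetBufferOff() == 0
          reinterpret_cast<const int32_t*>(buffer + component_buffer_size),      // countBufferOff()
          reinterpret_cast<const int32_t*>(buffer + 2 * component_buffer_size)}; // payloadBufferOff()
}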

◆ putHashTableOnCpuToCache()

void JoinHashTable::putHashTableOnCpuToCache ( const ChunkKey chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols 
)
private

Definition at line 1211 of file JoinHashTable.cpp.

References CHECK_GE.

Referenced by JoinHashTable().

1214  {
1215  CHECK_GE(chunk_key.size(), size_t(2));
1216  if (chunk_key[1] < 0) {
1217  // Do not cache hash tables over intermediate results
1218  return;
1219  }
1220  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1221  JoinHashTableCacheKey cache_key{col_range_,
1222  *cols.first,
1223  outer_col ? *outer_col : *cols.first,
1224  num_elements,
1225  chunk_key,
1226  qual_bin_oper_->get_optype()};
1227  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1228  for (const auto& kv : join_hash_table_cache_) {
1229  if (kv.first == cache_key) {
1230  return;
1231  }
1232  }
1233  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1234 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
#define CHECK_GE(x, y)
Definition: Logger.h:210
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
ExpressionRange col_range_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the caller graph for this function:
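
putHashTableOnCpuToCache() and its lookup counterpart initHashTableOnCpuFromCache() share a simple mutex-guarded vector-of-pairs cache keyed by column range, inner/outer columns, element count, chunk key, and operator. A self-contained sketch of that pattern (key type and members simplified; not the class's actual statics):

#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Illustrative cache with the same get-or-insert behaviour as the code above.
template <typename Key>
class SimpleHashTableCache {
 public:
  std::shared_ptr<std::vector<int32_t>> get(const Key& key) {
    std::lock_guard<std::mutex> guard(mutex_);
    for (const auto& kv : cache_) {
      if (kv.first == key) {
        return kv.second;  // reuse a previously built hash table buffer
      }
    }
    return nullptr;
  }

  void put(const Key& key, std::shared_ptr<std::vector<int32_t>> buff) {
    std::lock_guard<std::mutex> guard(mutex_);
    for (const auto& kv : cache_) {
      if (kv.first == key) {
        return;  // already cached, keep the existing entry
      }
    }
    cache_.emplace_back(key, std::move(buff));
  }

 private:
  std::mutex mutex_;
  std::vector<std::pair<Key, std::shared_ptr<std::vector<int32_t>>>> cache_;
};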

◆ reify()

void JoinHashTable::reify ( )
private

Definition at line 493 of file JoinHashTable.cpp.

References CHECK_LT, DEBUG_TIMER, anonymous_namespace{JoinHashTable.cpp}::get_cols(), JoinHashTableInterface::OneToMany, JoinHashTableInterface::OneToOne, only_shards_for_device(), reifyOneToManyForDevice(), reifyOneToOneForDevice(), and logger::thread_id().

Referenced by JoinHashTable().

493  {
494  auto timer = DEBUG_TIMER(__func__);
495  CHECK_LT(0, device_count_);
496  const auto& catalog = *executor_->getCatalog();
497  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
498  const auto inner_col = cols.first;
499  checkHashJoinReplicationConstraint(inner_col->get_table_id());
500  const auto& query_info = getInnerQueryInfo(inner_col).info;
501  if (query_info.fragments.empty()) {
502  return;
503  }
504  if (query_info.getNumTuplesUpperBound() >
505  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
506  throw TooManyHashEntries();
507  }
508 #ifdef HAVE_CUDA
509  gpu_hash_table_buff_.resize(device_count_);
510  gpu_hash_table_err_buff_.resize(device_count_);
511 #endif // HAVE_CUDA
512  std::vector<std::future<void>> init_threads;
513  const int shard_count = shardCount();
514 
515  try {
516  for (int device_id = 0; device_id < device_count_; ++device_id) {
517  const auto fragments =
518  shard_count
519  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
520  : query_info.fragments;
521  init_threads.push_back(
522  std::async(std::launch::async,
523  hash_type_ == JoinHashTableInterface::HashType::OneToOne
524  ? &JoinHashTable::reifyOneToOneForDevice
525  : &JoinHashTable::reifyOneToManyForDevice,
526  this,
527  fragments,
528  device_id,
529  logger::thread_id()));
530  }
531  for (auto& init_thread : init_threads) {
532  init_thread.wait();
533  }
534  for (auto& init_thread : init_threads) {
535  init_thread.get();
536  }
537 
538  } catch (const NeedsOneToManyHash& e) {
539  hash_type_ = JoinHashTableInterface::HashType::OneToMany;
540  freeHashBufferMemory();
541  init_threads.clear();
542  for (int device_id = 0; device_id < device_count_; ++device_id) {
543  const auto fragments =
544  shard_count
545  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
546  : query_info.fragments;
547 
548  init_threads.push_back(std::async(std::launch::async,
549  &JoinHashTable::reifyOneToManyForDevice,
550  this,
551  fragments,
552  device_id,
553  logger::thread_id()));
554  }
555  for (auto& init_thread : init_threads) {
556  init_thread.wait();
557  }
558  for (auto& init_thread : init_threads) {
559  init_thread.get();
560  }
561  }
562 }
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
const int device_count_
void freeHashBufferMemory()
void checkHashJoinReplicationConstraint(const int table_id) const
HashType hash_type_
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
void reifyOneToOneForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
void reifyOneToManyForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
size_t shardCount() const
#define CHECK_LT(x, y)
Definition: Logger.h:207
Executor * executor_
ThreadId thread_id()
Definition: Logger.cpp:731
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
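
Note the two loops over init_threads: wait() first lets every per-device build finish, and only then does get() rethrow any stored exception (such as NeedsOneToManyHash), so no device thread is left running when the fallback path rebuilds the tables. A minimal standalone sketch of the idiom:

#include <functional>
#include <future>
#include <vector>

// Launch one build per device, wait for all of them, then propagate the first
// stored exception via get().
void run_per_device(const int device_count, const std::function<void(int)>& build) {
  std::vector<std::future<void>> threads;
  for (int device_id = 0; device_id < device_count; ++device_id) {
    threads.push_back(std::async(std::launch::async, build, device_id));
  }
  for (auto& t : threads) {
    t.wait();
  }
  for (auto& t : threads) {
    t.get();  // rethrows here if a device build failed
  }
}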

◆ reifyOneToManyForDevice()

void JoinHashTable::reifyOneToManyForDevice ( const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id,
const logger::ThreadId  parent_thread_id 
)
private

Definition at line 665 of file JoinHashTable.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, DEBUG_TIMER_NEW_THREAD, anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), Data_Namespace::GPU_LEVEL, and needs_dictionary_translation().

Referenced by JoinHashTable(), and reify().

668  {
669  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
670  const auto& catalog = *executor_->getCatalog();
671  auto& data_mgr = catalog.getDataMgr();
672  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
673  const auto inner_col = cols.first;
674  CHECK(inner_col);
675  const auto inner_cd = get_column_descriptor_maybe(
676  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
677  if (inner_cd && inner_cd->isVirtualCol) {
678  throw FailedToJoinOnVirtualColumn();
679  }
680  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
681  // Since we don't have the string dictionary payloads on the GPU, we'll build
682  // the join hash table on the CPU and transfer it to the GPU.
683  const auto effective_memory_level =
684  needs_dictionary_translation(inner_col, cols.second, executor_)
685  ? Data_Namespace::CPU_LEVEL
686  : memory_level_;
687  if (fragments.empty()) {
688  ChunkKey empty_chunk;
689  initOneToManyHashTable(empty_chunk,
690  JoinColumn{nullptr, 0, 0, 0, 0},
691  cols,
692  effective_memory_level,
693  device_id);
694  return;
695  }
696 
697  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
698  std::unique_ptr<DeviceAllocator> device_allocator;
699  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
700  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
701  }
702  std::vector<std::shared_ptr<void>> malloc_owner;
703 
704  JoinColumn join_column = fetchJoinColumn(inner_col,
705  fragments,
706  effective_memory_level,
707  device_id,
708  chunks_owner,
709  device_allocator.get(),
710  malloc_owner,
711  executor_,
712  &column_cache_);
713 
714  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
715  join_column,
716  cols,
717  effective_memory_level,
718  device_id);
719 }
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
ChunkKey genHashTableKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:318
ColumnCacheMap & column_cache_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:183
Executor * executor_
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
void initOneToManyHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ reifyOneToOneForDevice()

void JoinHashTable::reifyOneToOneForDevice ( const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id,
const logger::ThreadId  parent_thread_id 
)
private

Definition at line 607 of file JoinHashTable.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, DEBUG_TIMER_NEW_THREAD, anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), Data_Namespace::GPU_LEVEL, and needs_dictionary_translation().

Referenced by JoinHashTable(), and reify().

610  {
611  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
612  const auto& catalog = *executor_->getCatalog();
613  auto& data_mgr = catalog.getDataMgr();
614  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
615  const auto inner_col = cols.first;
616  CHECK(inner_col);
617  const auto inner_cd = get_column_descriptor_maybe(
618  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
619  if (inner_cd && inner_cd->isVirtualCol) {
620  throw FailedToJoinOnVirtualColumn();
621  }
622  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
623  // Since we don't have the string dictionary payloads on the GPU, we'll build
624  // the join hash table on the CPU and transfer it to the GPU.
625  const auto effective_memory_level =
626  needs_dictionary_translation(inner_col, cols.second, executor_)
627  ? Data_Namespace::CPU_LEVEL
628  : memory_level_;
629  if (fragments.empty()) {
630  // No data in this fragment. Still need to create a hash table and initialize it
631  // properly.
632  ChunkKey empty_chunk;
633  initOneToOneHashTable(empty_chunk,
634  JoinColumn{nullptr, 0, 0, 0, 0},
635  cols,
636  effective_memory_level,
637  device_id);
638  return;
639  }
640 
641  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
642  std::unique_ptr<DeviceAllocator> device_allocator;
643  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
644  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
645  }
646  std::vector<std::shared_ptr<void>> malloc_owner;
647 
648  JoinColumn join_column = fetchJoinColumn(inner_col,
649  fragments,
650  effective_memory_level,
651  device_id,
652  chunks_owner,
653  device_allocator.get(),
654  malloc_owner,
655  executor_,
656  &column_cache_);
657 
658  initOneToOneHashTable(genHashTableKey(fragments, cols.second, inner_col),
659  join_column,
660  cols,
661  effective_memory_level,
662  device_id);
663 }
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
ChunkKey genHashTableKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:318
ColumnCacheMap & column_cache_
void initOneToOneHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:183
Executor * executor_
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
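
Both reify*ForDevice() variants pick the effective memory level the same way: a join that needs dictionary translation must be built where the string dictionary proxies live, i.e. on the CPU, even when memory_level_ is GPU_LEVEL; the finished table is then copied to the device, as shown in initOneToOneHashTable(). A standalone restatement with a simplified enum:

// Enum simplified for illustration; the real code uses Data_Namespace::MemoryLevel.
enum class MemoryLevel { CPU_LEVEL, GPU_LEVEL };

MemoryLevel effective_memory_level(const bool needs_dictionary_translation,
                                   const MemoryLevel requested_level) {
  // Dictionary-translated joins are always built on the host.
  return needs_dictionary_translation ? MemoryLevel::CPU_LEVEL : requested_level;
}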

◆ shardCount()

size_t JoinHashTable::shardCount ( ) const
private

Definition at line 1623 of file JoinHashTable.cpp.

References get_shard_count(), and Data_Namespace::GPU_LEVEL.

Referenced by JoinHashTable().

1623  {
1624  return memory_level_ == Data_Namespace::GPU_LEVEL
1625  ? get_shard_count(qual_bin_oper_.get(), executor_)
1626  : 0;
1627 }
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:
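
shardCount() only reports a non-zero count for GPU builds; the shards themselves are then distributed round-robin across devices, matching the loop in initOneToOneHashTable() (shard = device_id; shard += device_count_). An illustrative helper:

#include <cstddef>
#include <vector>

// Device `device_id` owns every shard s with s % device_count == device_id.
std::vector<size_t> shards_for_device(const size_t shard_count,
                                      const size_t device_id,
                                      const size_t device_count) {
  std::vector<size_t> shards;
  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
    shards.push_back(shard);
  }
  return shards;
}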

◆ toSet()

std::set< DecodedJoinHashBufferEntry > JoinHashTable::toSet ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
overridevirtual

Implements JoinHashTableInterface.

Definition at line 1510 of file JoinHashTable.cpp.

References copy_from_gpu(), GPU, and JoinHashTableInterface::toSet().

1512  {
1513  auto buffer = getJoinHashBuffer(device_type, device_id);
1514  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1515 #ifdef HAVE_CUDA
1516  std::unique_ptr<int8_t[]> buffer_copy;
1517  if (device_type == ExecutorDeviceType::GPU) {
1518  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1519 
1520  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1521  buffer_copy.get(),
1522  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1523  buffer_size,
1524  device_id);
1525  }
1526  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1527 #else
1528  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1529 #endif // HAVE_CUDA
1530  auto ptr2 = ptr1 + offsetBufferOff();
1531  auto ptr3 = ptr1 + countBufferOff();
1532  auto ptr4 = ptr1 + payloadBufferOff();
1533  return JoinHashTableInterface::toSet(
1534  0, 0, hash_entry_count_, ptr1, ptr2, ptr3, ptr4, buffer_size);
1535 }
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t countBufferOff() const noexcept override
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
size_t payloadBufferOff() const noexcept override
Executor * executor_
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
size_t offsetBufferOff() const noexcept override
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
size_t hash_entry_count_
+ Here is the call graph for this function:

◆ toString()

std::string JoinHashTable::toString ( const ExecutorDeviceType  device_type,
const int  device_id = 0,
bool  raw = false 
) const
overridevirtual

Implements JoinHashTableInterface.

Definition at line 1474 of file JoinHashTable.cpp.

References copy_from_gpu(), GPU, and JoinHashTableInterface::toString().

1476  {
1477  auto buffer = getJoinHashBuffer(device_type, device_id);
1478  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1479 #ifdef HAVE_CUDA
1480  std::unique_ptr<int8_t[]> buffer_copy;
1481  if (device_type == ExecutorDeviceType::GPU) {
1482  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1483 
1484  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1485  buffer_copy.get(),
1486  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1487  buffer_size,
1488  device_id);
1489  }
1490  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1491 #else
1492  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1493 #endif // HAVE_CUDA
1494  auto ptr2 = ptr1 + offsetBufferOff();
1495  auto ptr3 = ptr1 + countBufferOff();
1496  auto ptr4 = ptr1 + payloadBufferOff();
1497  return JoinHashTableInterface::toString("perfect",
1498  getHashTypeString(hash_type_),
1499  0,
1500  0,
1501  hash_entry_count_,
1502  ptr1,
1503  ptr2,
1504  ptr3,
1505  ptr4,
1506  buffer_size,
1507  raw);
1508 }
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t countBufferOff() const noexcept override
HashType hash_type_
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
size_t payloadBufferOff() const noexcept override
Executor * executor_
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
size_t offsetBufferOff() const noexcept override
static std::string getHashTypeString(HashType ht) noexcept
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
size_t hash_entry_count_
+ Here is the call graph for this function:
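
A hedged usage sketch for both decoders, assuming hash_table is a JoinHashTable obtained via getInstance() and already reified; the gtest macro is only one way such a check might appear in a test:

// Illustrative check; JoinHashTable.h, Logger.h, and gtest are assumed available.
void check_join_hash_table(const std::shared_ptr<JoinHashTable>& hash_table) {
  // Decode the CPU copy of the hash table into a std::set for inspection.
  const auto decoded = hash_table->toSet(ExecutorDeviceType::CPU, /*device_id=*/0);
  EXPECT_FALSE(decoded.empty());
  // Human-readable dump of the same buffer (raw defaults to false).
  LOG(INFO) << hash_table->toString(ExecutorDeviceType::CPU, /*device_id=*/0);
}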

◆ yieldCacheInvalidator()

static auto JoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inlinestatic

Definition at line 114 of file JoinHashTable.h.

References join_hash_table_cache_, join_hash_table_cache_mutex_, and VLOG.

114  {
115  VLOG(1) << "Invalidate " << join_hash_table_cache_.size()
116  << " cached baseline hashtable.";
117  return []() -> void {
118  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
119  join_hash_table_cache_.clear();
120  };
121  }
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
#define VLOG(n)
Definition: Logger.h:291
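
A short usage sketch: the returned closure can be stored and invoked whenever the cached tables must be dropped (for example after the underlying data changes). The variable name is illustrative.

// Fetch the invalidator once, run it when the cache must be cleared.
auto invalidate_join_hash_cache = JoinHashTable::yieldCacheInvalidator();
invalidate_join_hash_cache();  // clears join_hash_table_cache_ under its mutex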

Member Data Documentation

◆ col_range_

ExpressionRange JoinHashTable::col_range_
private

Definition at line 244 of file JoinHashTable.h.

◆ col_var_

std::shared_ptr<Analyzer::ColumnVar> JoinHashTable::col_var_
private

Definition at line 233 of file JoinHashTable.h.

Referenced by getInnerTableId(), and getInnerTableRteIdx().

◆ column_cache_

ColumnCacheMap& JoinHashTable::column_cache_
private

Definition at line 246 of file JoinHashTable.h.

◆ cpu_hash_table_buff_

std::shared_ptr<std::vector<int32_t> > JoinHashTable::cpu_hash_table_buff_
private

Definition at line 238 of file JoinHashTable.h.

◆ cpu_hash_table_buff_mutex_

std::mutex JoinHashTable::cpu_hash_table_buff_mutex_
private

Definition at line 239 of file JoinHashTable.h.

◆ device_count_

const int JoinHashTable::device_count_
private

Definition at line 247 of file JoinHashTable.h.

Referenced by getDeviceCount(), and JoinHashTable().

◆ executor_

Executor* JoinHashTable::executor_
private

Definition at line 245 of file JoinHashTable.h.

◆ hash_entry_count_

size_t JoinHashTable::hash_entry_count_
private

Definition at line 237 of file JoinHashTable.h.

◆ hash_type_

HashType JoinHashTable::hash_type_
private

Definition at line 236 of file JoinHashTable.h.

Referenced by getHashType().

◆ join_hash_table_cache_

std::vector< std::pair< JoinHashTable::JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > JoinHashTable::join_hash_table_cache_
staticprivate

◆ join_hash_table_cache_mutex_

std::mutex JoinHashTable::join_hash_table_cache_mutex_
staticprivate

◆ memory_level_

const Data_Namespace::MemoryLevel JoinHashTable::memory_level_
private

Definition at line 235 of file JoinHashTable.h.

Referenced by getMemoryLevel().

◆ qual_bin_oper_

std::shared_ptr<Analyzer::BinOper> JoinHashTable::qual_bin_oper_
private

Definition at line 230 of file JoinHashTable.h.

◆ query_infos_

const std::vector<InputTableInfo>& JoinHashTable::query_infos_
private

Definition at line 234 of file JoinHashTable.h.


The documentation for this class was generated from the following files: