OmniSciDB  04ee39c94c
JoinHashTable Class Reference

#include <JoinHashTable.h>

+ Inheritance diagram for JoinHashTable:
+ Collaboration diagram for JoinHashTable:

Classes

struct  JoinHashTableCacheKey
 

Public Member Functions

int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) noexcept override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
HashType getHashType () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~JoinHashTable ()
 

Static Public Member Functions

static std::shared_ptr< JoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 
static HashJoinMatchingSet codegenMatchingSet (const std::vector< llvm::Value *> &hash_join_idx_args_in, const bool is_sharded, const bool col_is_nullable, const bool is_bw_eq, const int64_t sub_buff_size, Executor *executor, const bool is_bucketized=false)
 
static llvm::Value * codegenHashTableLoad (const size_t table_idx, Executor *executor)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 

Private Member Functions

 JoinHashTable (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
 
std::pair< const int8_t *, size_t > getOneColumnFragment (const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
std::pair< const int8_t *, size_t > getAllColumnFragments (const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
ChunkKey genHashTableKey (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
 
void reify (const int device_count)
 
void reifyOneToOneForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
 
void reifyOneToManyForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
void initHashTableForDevice (const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initOneToManyHashTable (const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initHashTableOnCpuFromCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
 
void putHashTableOnCpuToCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
 
void initHashTableOnCpu (const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
void initOneToManyHashTableOnCpu (const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
const InputTableInfo & getInnerQueryInfo (const Analyzer::ColumnVar *inner_col) const
 
size_t shardCount () const
 
llvm::Value * codegenHashTableLoad (const size_t table_idx)
 
std::vector< llvm::Value * > getHashJoinArgs (llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
 
std::pair< const int8_t *, size_t > fetchFragments (const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
size_t getComponentBufferSize () const noexcept
 

Private Attributes

std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
 
std::shared_ptr< Analyzer::ColumnVar > col_var_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
HashType hash_type_
 
size_t hash_entry_count_
 
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
ExpressionRange col_range_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
const int device_count_
 
std::pair< const int8_t *, size_t > linearized_multifrag_column_
 
std::mutex linearized_multifrag_column_mutex_
 
RowSetMemoryOwner linearized_multifrag_column_owner_
 

Static Private Attributes

static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
 
static std::mutex join_hash_table_cache_mutex_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType { HashType::OneToOne, HashType::OneToMany }
 

Detailed Description

Definition at line 52 of file JoinHashTable.h.

Constructor & Destructor Documentation

◆ ~JoinHashTable()

virtual JoinHashTable::~JoinHashTable ( )
inlinevirtual

Definition at line 123 of file JoinHashTable.h.

123 {}

◆ JoinHashTable()

JoinHashTable::JoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  qual_bin_oper,
const Analyzer::ColumnVar *  col_var,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const ExpressionRange &  col_range,
ColumnCacheMap &  column_cache,
Executor *  executor,
const int  device_count 
)
inlineprivate

Definition at line 126 of file JoinHashTable.h.

References CHECK, checkHashJoinReplicationConstraint(), codegenHashTableLoad(), fetchFragments(), freeHashBufferCpuMemory(), freeHashBufferGpuMemory(), freeHashBufferMemory(), genHashTableKey(), getAllColumnFragments(), getComponentBufferSize(), getHashJoinArgs(), getInnerQueryInfo(), getOneColumnFragment(), ExpressionRange::getType(), initHashTableForDevice(), initHashTableOnCpu(), initHashTableOnCpuFromCache(), initOneToManyHashTable(), initOneToManyHashTableOnCpu(), Integer, isBitwiseEq(), putHashTableOnCpuToCache(), reify(), reifyOneToManyForDevice(), reifyOneToOneForDevice(), and shardCount().

135  : qual_bin_oper_(qual_bin_oper)
136  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
137  , query_infos_(query_infos)
138  , memory_level_(memory_level)
139  , hash_type_(preferred_hash_type)
140  , hash_entry_count_(0)
141  , col_range_(col_range)
142  , executor_(executor)
143  , column_cache_(column_cache)
144  , device_count_(device_count) {
146  }
const std::vector< InputTableInfo > & query_infos_
const int device_count_
ColumnCacheMap & column_cache_
std::shared_ptr< Analyzer::Expr > deep_copy() const override
Definition: Analyzer.cpp:59
HashType hash_type_
ExpressionRangeType getType() const
Executor * executor_
ExpressionRange col_range_
#define CHECK(condition)
Definition: Logger.h:187
std::shared_ptr< Analyzer::ColumnVar > col_var_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
size_t hash_entry_count_
+ Here is the call graph for this function:

Member Function Documentation

◆ checkHashJoinReplicationConstraint()

void JoinHashTable::checkHashJoinReplicationConstraint ( const int  table_id) const
private

Definition at line 711 of file JoinHashTable.cpp.

References CHECK, g_cluster, get_shard_count(), and table_is_replicated().

Referenced by JoinHashTable().

711  {
712  if (!g_cluster) {
713  return;
714  }
715  if (table_id >= 0) {
716  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
717  CHECK(inner_td);
718  size_t shard_count{0};
719  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
720  if (!shard_count && !table_is_replicated(inner_td)) {
721  throw TableMustBeReplicated(inner_td->tableName);
722  }
723  }
724 }
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Executor * executor_
bool table_is_replicated(const TableDescriptor *td)
#define CHECK(condition)
Definition: Logger.h:187
bool g_cluster
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ codegenHashTableLoad() [1/2]

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx,
Executor *  executor 
)
static

Definition at line 1225 of file JoinHashTable.cpp.

References CHECK, CHECK_LT, and get_arg_by_name().

Referenced by BaselineJoinHashTable::codegenMatchingSet(), getHashType(), BaselineJoinHashTable::hashPtr(), and JoinHashTable().

1226  {
1227  llvm::Value* hash_ptr = nullptr;
1228  const auto total_table_count =
1229  executor->plan_state_->join_info_.join_hash_tables_.size();
1230  CHECK_LT(table_idx, total_table_count);
1231  if (total_table_count > 1) {
1232  auto hash_tables_ptr =
1233  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1234  auto hash_pptr =
1235  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1236  hash_tables_ptr,
1237  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1238  : hash_tables_ptr;
1239  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1240  } else {
1241  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1242  }
1243  CHECK(hash_ptr);
1244  return hash_ptr;
1245 }
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:114
#define CHECK_LT(x, y)
Definition: Logger.h:197
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ codegenHashTableLoad() [2/2]

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx)
private

Definition at line 1214 of file JoinHashTable.cpp.

References CHECK, and get_arg_by_name().

1214  {
1215  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1216  if (hash_ptr->getType()->isIntegerTy(64)) {
1217  return hash_ptr;
1218  }
1219  CHECK(hash_ptr->getType()->isPointerTy());
1220  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1221  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1222  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1223 }
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:114
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:

◆ codegenMatchingSet() [1/2]

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const CompilationOptions &  co,
const size_t  index 
)
overridevirtual

Implements JoinHashTableInterface.

Definition at line 1297 of file JoinHashTable.cpp.

References CHECK, anonymous_namespace{JoinHashTable.cpp}::get_cols(), and kDATE.

Referenced by BaselineJoinHashTable::codegenMatchingSet(), getHashType(), and getJoinHashBuffer().

1298  {
1299  const auto cols = get_cols(
1300  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1301  auto key_col = cols.second;
1302  CHECK(key_col);
1303  auto val_col = cols.first;
1304  CHECK(val_col);
1305  auto pos_ptr = codegenHashTableLoad(index);
1306  CHECK(pos_ptr);
1307  const int shard_count = shardCount();
1308  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1309  const int64_t sub_buff_size = getComponentBufferSize();
1310  const auto& key_col_ti = key_col->get_type_info();
1311 
1312  auto bucketize = (key_col_ti.get_type() == kDATE);
1313  return codegenMatchingSet(hash_join_idx_args,
1314  shard_count,
1315  !key_col_ti.get_notnull(),
1316  isBitwiseEq(),
1317  sub_buff_size,
1318  executor_,
1319  bucketize);
1320 }
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
bool isBitwiseEq() const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t shardCount() const
Definition: sqltypes.h:55
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
size_t getComponentBufferSize() const noexcept
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
#define CHECK(condition)
Definition: Logger.h:187
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ codegenMatchingSet() [2/2]

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const std::vector< llvm::Value *> &  hash_join_idx_args_in,
const bool  is_sharded,
const bool  col_is_nullable,
const bool  is_bw_eq,
const int64_t  sub_buff_size,
Executor *  executor,
const bool  is_bucketized = false 
)
static

Definition at line 1322 of file JoinHashTable.cpp.

References CHECK.

1329  {
1330  using namespace std::string_literals;
1331 
1332  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1333 
1334  if (is_bw_eq) {
1335  fname += "_bitwise";
1336  }
1337  if (is_sharded) {
1338  fname += "_sharded";
1339  }
1340  if (!is_bw_eq && col_is_nullable) {
1341  fname += "_nullable";
1342  }
1343 
1344  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1345  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1346  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1347 
1348  auto pos_ptr = hash_join_idx_args_in[0];
1349  CHECK(pos_ptr);
1350 
1351  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1352  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1353  auto hash_join_idx_args = hash_join_idx_args_in;
1354  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1355  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1356 
1357  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1358  slot_valid_lv,
1359  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1360  executor->cgen_state_->llInt(int64_t(0)));
1361  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1362  executor->cgen_state_->ir_builder_.CreateAdd(
1363  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1364  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1365  auto rowid_ptr_i32 =
1366  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1367  return {rowid_ptr_i32, row_count_lv, slot_lv};
1368 }
#define CHECK(condition)
Definition: Logger.h:187

◆ codegenSlot()

llvm::Value * JoinHashTable::codegenSlot ( const CompilationOptions &  co,
const size_t  index 
)
overridevirtual

Implements JoinHashTableInterface.

Definition at line 1389 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, CodeGenerator::codegen(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), kDATE, and JoinHashTableInterface::OneToOne.

Referenced by getJoinHashBuffer().

1390  {
1391  using namespace std::string_literals;
1392 
1394  const auto cols = get_cols(
1395  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1396  auto key_col = cols.second;
1397  CHECK(key_col);
1398  auto val_col = cols.first;
1399  CHECK(val_col);
1400  CodeGenerator code_generator(executor_);
1401  const auto key_lvs = code_generator.codegen(key_col, true, co);
1402  CHECK_EQ(size_t(1), key_lvs.size());
1403  auto hash_ptr = codegenHashTableLoad(index);
1404  CHECK(hash_ptr);
1405  const int shard_count = shardCount();
1406  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1407 
1408  const auto& key_col_ti = key_col->get_type_info();
1409  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1410  : "hash_join_idx"s);
1411 
1412  if (isBitwiseEq()) {
1413  fname += "_bitwise";
1414  }
1415  if (shard_count) {
1416  fname += "_sharded";
1417  }
1418 
1419  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1420  fname += "_nullable";
1421  }
1422  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1423 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
bool isBitwiseEq() const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t shardCount() const
Definition: sqltypes.h:55
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
#define CHECK(condition)
Definition: Logger.h:187
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
HashType getHashType() const noexcept override
Definition: JoinHashTable.h:97
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ countBufferOff()

size_t JoinHashTable::countBufferOff ( ) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1375 of file JoinHashTable.cpp.

References CHECK, and JoinHashTableInterface::OneToMany.

Referenced by getHashType().

1375  {
1377  return getComponentBufferSize();
1378 }
HashType hash_type_
size_t getComponentBufferSize() const noexcept
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the caller graph for this function:

◆ fetchFragments()

std::pair< const int8_t *, size_t > JoinHashTable::fetchFragments ( const Analyzer::ColumnVar *  hash_col,
const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragment_info,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ThrustAllocator &  dev_buff_owner 
)
private

Definition at line 548 of file JoinHashTable.cpp.

References ThrustAllocator::allocate(), CHECK_NE, copy_to_gpu(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), Analyzer::Expr::get_type_info(), and Data_Namespace::GPU_LEVEL.

Referenced by JoinHashTable().

554  {
555  static std::mutex fragment_fetch_mutex;
556  const bool has_multi_frag = fragment_info.size() > 1;
557  const auto& catalog = *executor_->getCatalog();
558  auto& data_mgr = catalog.getDataMgr();
559  const auto& first_frag = fragment_info.front();
560  const int8_t* col_buff = nullptr;
561  size_t elem_count = 0;
562 
563  const size_t elem_width = hash_col->get_type_info().get_size();
564  if (has_multi_frag) {
565  std::tie(col_buff, elem_count) =
566  getAllColumnFragments(*hash_col, fragment_info, chunks_owner);
567  }
568 
569  {
570  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
571  if (has_multi_frag) {
572  if (effective_memory_level == Data_Namespace::GPU_LEVEL && col_buff) {
573  CHECK_NE(elem_count, size_t(0));
574  int8_t* dev_col_buff = nullptr;
575  dev_col_buff = dev_buff_owner.allocate(elem_count * elem_width);
576  copy_to_gpu(&data_mgr,
577  reinterpret_cast<CUdeviceptr>(dev_col_buff),
578  col_buff,
579  elem_count * elem_width,
580  device_id);
581  col_buff = dev_col_buff;
582  }
583  } else {
584  std::tie(col_buff, elem_count) = getOneColumnFragment(
585  *hash_col, first_frag, effective_memory_level, device_id, chunks_owner);
586  }
587  }
588  return {col_buff, elem_count};
589 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
int8_t * allocate(std::ptrdiff_t num_bytes)
std::pair< const int8_t *, size_t > getOneColumnFragment(const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
#define CHECK_NE(x, y)
Definition: Logger.h:196
Executor * executor_
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ freeHashBufferCpuMemory()

void JoinHashTable::freeHashBufferCpuMemory ( )
private

Definition at line 1498 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1498  {
1499  cpu_hash_table_buff_.reset();
1500 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
+ Here is the caller graph for this function:

◆ freeHashBufferGpuMemory()

void JoinHashTable::freeHashBufferGpuMemory ( )
private

Definition at line 1477 of file JoinHashTable.cpp.

References CHECK, and CudaAllocator::freeGpuAbstractBuffer().

Referenced by JoinHashTable().

1477  {
1478 #ifdef HAVE_CUDA
1479  const auto& catalog = *executor_->getCatalog();
1480  auto& data_mgr = catalog.getDataMgr();
1481  for (auto& buf : gpu_hash_table_buff_) {
1482  if (buf) {
1483  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1484  buf = nullptr;
1485  }
1486  }
1487  for (auto& buf : gpu_hash_table_err_buff_) {
1488  if (buf) {
1489  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1490  buf = nullptr;
1491  }
1492  }
1493 #else
1494  CHECK(false);
1495 #endif // HAVE_CUDA
1496 }
Executor * executor_
static void freeGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, Data_Namespace::AbstractBuffer *ab)
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ freeHashBufferMemory()

void JoinHashTable::freeHashBufferMemory ( )
private

Definition at line 1470 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1470  {
1471 #ifdef HAVE_CUDA
1473 #endif
1475 }
void freeHashBufferGpuMemory()
void freeHashBufferCpuMemory()
+ Here is the caller graph for this function:

◆ genHashTableKey()

ChunkKey JoinHashTable::genHashTableKey ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Analyzer::Expr *  outer_col,
const Analyzer::ColumnVar *  inner_col 
) const
private

Definition at line 591 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), Analyzer::Expr::get_type_info(), and kENCODING_DICT.

Referenced by JoinHashTable().

594  {
595  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
596  inner_col->get_table_id(),
597  inner_col->get_column_id()};
598  const auto& ti = inner_col->get_type_info();
599  if (ti.is_string()) {
600  CHECK_EQ(kENCODING_DICT, ti.get_compression());
601  size_t outer_elem_count = 0;
602  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
603  CHECK(outer_col);
604  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
605  for (auto& frag : outer_query_info.fragments) {
606  outer_elem_count = frag.getNumTuples();
607  }
608  hash_table_key.push_back(outer_elem_count);
609  }
610  if (fragments.size() < 2) {
611  hash_table_key.push_back(fragments.front().fragmentId);
612  }
613  return hash_table_key;
614 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
int get_column_id() const
Definition: Analyzer.h:194
Executor * executor_
int get_table_id() const
Definition: Analyzer.h:193
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getAllColumnFragments()

std::pair< const int8_t *, size_t > JoinHashTable::getAllColumnFragments ( const Analyzer::ColumnVar &  hash_col,
const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)
private

Definition at line 414 of file JoinHashTable.cpp.

References ColumnFetcher::getAllColumnFragments().

Referenced by JoinHashTable().

417  {
418  std::lock_guard<std::mutex> linearized_multifrag_column_lock(
420  if (linearized_multifrag_column_.first) {
422  }
423  const int8_t* col_buff;
424  size_t total_elem_count;
425  std::tie(col_buff, total_elem_count) = ColumnFetcher::getAllColumnFragments(
426  executor_, hash_col, fragments, chunks_owner, column_cache_);
428  if (!shardCount()) {
429  linearized_multifrag_column_ = {col_buff, total_elem_count};
430  }
431  return {col_buff, total_elem_count};
432 }
std::pair< const int8_t *, size_t > linearized_multifrag_column_
ColumnCacheMap & column_cache_
std::mutex linearized_multifrag_column_mutex_
size_t shardCount() const
Executor * executor_
RowSetMemoryOwner linearized_multifrag_column_owner_
static std::pair< const int8_t *, size_t > getAllColumnFragments(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
void addColBuffer(const void *col_buffer)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getComponentBufferSize()

size_t JoinHashTable::getComponentBufferSize ( ) const
privatenoexcept

Definition at line 1385 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1385  {
1386  return hash_entry_count_ * sizeof(int32_t);
1387 }
size_t hash_entry_count_
+ Here is the caller graph for this function:

◆ getHashJoinArgs()

std::vector< llvm::Value * > JoinHashTable::getHashJoinArgs ( llvm::Value *  hash_ptr,
const Analyzer::Expr *  key_col,
const int  shard_count,
const CompilationOptions &  co 
)
private

Definition at line 1247 of file JoinHashTable.cpp.

References CHECK_EQ, CodeGenerator::codegen(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_hash_entry_count(), get_logical_type_info(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Analyzer::Expr::get_type_info(), inline_fixed_encoding_null_val(), and kDATE.

Referenced by JoinHashTable().

1250  {
1251  CodeGenerator code_generator(executor_);
1252  const auto key_lvs = code_generator.codegen(key_col, true, co);
1253  CHECK_EQ(size_t(1), key_lvs.size());
1254  auto const& key_col_ti = key_col->get_type_info();
1255  auto hash_entry_info =
1257 
1258  std::vector<llvm::Value*> hash_join_idx_args{
1259  hash_ptr,
1260  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1261  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1262  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1263  if (shard_count) {
1264  const auto expected_hash_entry_count =
1266  const auto entry_count_per_shard =
1267  (expected_hash_entry_count + shard_count - 1) / shard_count;
1268  hash_join_idx_args.push_back(
1269  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1270  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1271  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1272  }
1273  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1274  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1275  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1276  inline_fixed_encoding_null_val(key_col_logical_ti)));
1277  }
1278  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1279  if (isBitwiseEq()) {
1280  if (special_date_bucketization_case) {
1281  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1282  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1283  } else {
1284  hash_join_idx_args.push_back(
1285  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1286  }
1287  }
1288 
1289  if (special_date_bucketization_case) {
1290  hash_join_idx_args.emplace_back(
1291  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1292  }
1293 
1294  return hash_join_idx_args;
1295 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
const int device_count_
bool isBitwiseEq() const
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:840
int64_t getIntMax() const
int64_t getIntMin() const
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Definition: sqltypes.h:55
Executor * executor_
ExpressionRange col_range_
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getHashType()

HashType JoinHashTable::getHashType ( ) const
inlineoverridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 97 of file JoinHashTable.h.

References codegenHashTableLoad(), codegenMatchingSet(), countBufferOff(), hash_type_, offsetBufferOff(), and payloadBufferOff().

97 { return hash_type_; }
HashType hash_type_
+ Here is the call graph for this function:

◆ getInnerQueryInfo()

const InputTableInfo & JoinHashTable::getInnerQueryInfo ( const Analyzer::ColumnVar *  inner_col) const
private

Definition at line 1425 of file JoinHashTable.cpp.

References get_inner_query_info(), and Analyzer::ColumnVar::get_table_id().

Referenced by JoinHashTable().

1426  {
1427  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1428 }
const std::vector< InputTableInfo > & query_infos_
int get_table_id() const
Definition: Analyzer.h:193
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getInnerTableId()

int JoinHashTable::getInnerTableId ( ) const
inlineoverridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 89 of file JoinHashTable.h.

References col_var_.

89  {
90  return col_var_.get()->get_table_id();
91  };
std::shared_ptr< Analyzer::ColumnVar > col_var_

◆ getInnerTableRteIdx()

int JoinHashTable::getInnerTableRteIdx ( ) const
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 93 of file JoinHashTable.h.

References col_var_.

93  {
94  return col_var_.get()->get_rte_idx();
95  };
std::shared_ptr< Analyzer::ColumnVar > col_var_

◆ getInstance()

std::shared_ptr< JoinHashTable > JoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper >  qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Definition at line 303 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), getExpressionRange(), HashEntryInfo::getNormalizedHashEntryCount(), Data_Namespace::GPU_LEVEL, Invalid, IS_EQUIVALENCE, kBW_EQ, and ExpressionRange::makeIntRange().

Referenced by Executor::buildHashTableForQualifier().

310  {
311  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
312  const auto cols =
313  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
314  const auto inner_col = cols.first;
315  CHECK(inner_col);
316  const auto& ti = inner_col->get_type_info();
317  auto col_range =
318  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
319  if (col_range.getType() == ExpressionRangeType::Invalid) {
320  throw HashJoinFail(
321  "Could not compute range for the expressions involved in the equijoin");
322  }
323  if (ti.is_string()) {
324  // The nullable info must be the same as the source column.
325  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
326  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
327  throw HashJoinFail(
328  "Could not compute range for the expressions involved in the equijoin");
329  }
330  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
331  // If the inner column expression range is empty, use the inner col range
332  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
333  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
334  col_range = source_col_range;
335  } else {
336  col_range = ExpressionRange::makeIntRange(
337  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
338  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
339  0,
340  source_col_range.hasNulls());
341  }
342  }
343  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
344  const auto max_hash_entry_count =
346  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
347  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
348 
349  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
350  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
351  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
352 
353  if (bucketized_entry_count > max_hash_entry_count) {
354  throw TooManyHashEntries();
355  }
356 
357  if (qual_bin_oper->get_optype() == kBW_EQ &&
358  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
359  throw HashJoinFail("Cannot translate null value for kBW_EQ");
360  }
361  auto join_hash_table =
362  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
363  inner_col,
364  query_infos,
365  memory_level,
366  preferred_hash_type,
367  col_range,
368  column_cache,
369  executor,
370  device_count));
371  try {
372  join_hash_table->reify(device_count);
373  } catch (const TableMustBeReplicated& e) {
374  // Throw a runtime error to abort the query
375  join_hash_table->freeHashBufferMemory();
376  throw std::runtime_error(e.what());
377  } catch (const HashJoinFail& e) {
378  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
379  // possible)
380  join_hash_table->freeHashBufferMemory();
381  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
382  "involved in equijoin | ") +
383  e.what());
384  } catch (const ColumnarConversionNotSupported& e) {
385  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
386  e.what());
387  } catch (const OutOfMemory& e) {
388  throw HashJoinFail(
389  std::string("Ran out of memory while building hash tables for equijoin | ") +
390  e.what());
391  } catch (const std::exception& e) {
392  throw std::runtime_error(
393  std::string("Fatal error while attempting to build hash tables for join: ") +
394  e.what());
395  }
396  return join_hash_table;
397 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
size_t getNormalizedHashEntryCount() const
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
#define CHECK(condition)
Definition: Logger.h:187
Definition: sqldefs.h:31
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getJoinHashBuffer()

int64_t JoinHashTable::getJoinHashBuffer ( const ExecutorDeviceType  device_type,
const int  device_id 
)
inline override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 63 of file JoinHashTable.h.

References CHECK, CHECK_LT, codegenMatchingSet(), codegenSlot(), CPU, and cpu_hash_table_buff_.

64  {
65  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
66  return 0;
67  }
68 #ifdef HAVE_CUDA
69  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
70  if (device_type == ExecutorDeviceType::CPU) {
71  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
72  } else {
73  return gpu_hash_table_buff_[device_id]
74  ? reinterpret_cast<CUdeviceptr>(
75  gpu_hash_table_buff_[device_id]->getMemoryPtr())
76  : reinterpret_cast<CUdeviceptr>(nullptr);
77  }
78 #else
79  CHECK(device_type == ExecutorDeviceType::CPU);
80  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
81 #endif
82  }
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
#define CHECK_LT(x, y)
Definition: Logger.h:197
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:

◆ getOneColumnFragment()

std::pair< const int8_t *, size_t > JoinHashTable::getOneColumnFragment ( const Analyzer::ColumnVar &  hash_col,
const Fragmenter_Namespace::FragmentInfo fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)
private

Definition at line 399 of file JoinHashTable.cpp.

References ColumnFetcher::getOneColumnFragment().

Referenced by JoinHashTable().

404  {
406  hash_col,
407  fragment,
408  effective_mem_lvl,
409  device_id,
410  chunks_owner,
411  column_cache_);
412 }
ColumnCacheMap & column_cache_
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Executor * executor_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initHashTableForDevice()

void JoinHashTable::initHashTableForDevice ( const ChunkKey &  chunk_key,
const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 905 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK, CHECK_EQ, CHECK_GT, copy_from_gpu(), copy_to_gpu(), Data_Namespace::CPU_LEVEL, fill_hash_join_buff_on_device_bucketized(), fill_hash_join_buff_on_device_sharded_bucketized(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), Data_Namespace::GPU_LEVEL, init_hash_join_buff_on_device(), and inline_fixed_encoding_null_val().

Referenced by JoinHashTable().

911  {
912  const auto inner_col = cols.first;
913  CHECK(inner_col);
914 
915  auto hash_entry_info = get_bucketized_hash_entry_info(
916  inner_col->get_type_info(), col_range_, isBitwiseEq());
917  if (!hash_entry_info) {
918  return;
919  }
920 
921 #ifdef HAVE_CUDA
922  const auto shard_count = shardCount();
923  const size_t entries_per_shard{
924  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
925  : 0};
926  // Even if we join on dictionary encoded strings, the memory on the GPU is still
927  // needed once the join hash table has been built on the CPU.
928  const auto catalog = executor_->getCatalog();
930  auto& data_mgr = catalog->getDataMgr();
931  if (shard_count) {
932  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
933  CHECK_GT(shards_per_device, 0u);
934  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
935  }
936  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
937  &data_mgr,
938  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
939  device_id);
940  }
941 #else
942  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
943 #endif
944 
945 #ifdef HAVE_CUDA
946  const auto& ti = inner_col->get_type_info();
947 #endif
948  const int32_t hash_join_invalid_val{-1};
949  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
950  CHECK(!chunk_key.empty());
951  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
952  {
953  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
955  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
956  }
957  if (inner_col->get_table_id() > 0) {
958  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
959  }
960  // Transfer the hash table on the GPU if we've only built it on CPU
961  // but the query runs on GPU (join on dictionary encoded columns).
963 #ifdef HAVE_CUDA
964  CHECK(ti.is_string());
965  auto& data_mgr = catalog->getDataMgr();
966  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
967 
968  copy_to_gpu(
969  &data_mgr,
970  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
971  &(*cpu_hash_table_buff_)[0],
972  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
973  device_id);
974 #else
975  CHECK(false);
976 #endif
977  }
978  } else {
979 #ifdef HAVE_CUDA
980  int err{0};
981  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
982  auto& data_mgr = catalog->getDataMgr();
983  gpu_hash_table_err_buff_[device_id] =
984  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
985  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
986  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
987  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
989  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
990  hash_entry_info.getNormalizedHashEntryCount(),
991  hash_join_invalid_val,
992  executor_->blockSize(),
993  executor_->gridSize());
994  if (chunk_key.empty()) {
995  return;
996  }
997  JoinColumn join_column{col_buff, num_elements};
998  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1002  isBitwiseEq(),
1003  col_range_.getIntMax() + 1,
1005  if (shard_count) {
1006  CHECK_GT(device_count_, 0);
1007  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1008  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1010  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1011  hash_join_invalid_val,
1012  reinterpret_cast<int*>(dev_err_buff),
1013  join_column,
1014  type_info,
1015  shard_info,
1016  executor_->blockSize(),
1017  executor_->gridSize(),
1018  hash_entry_info.bucket_normalization);
1019  }
1020  } else {
1022  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1023  hash_join_invalid_val,
1024  reinterpret_cast<int*>(dev_err_buff),
1025  join_column,
1026  type_info,
1027  executor_->blockSize(),
1028  executor_->gridSize(),
1029  hash_entry_info.bucket_normalization);
1030  }
1031  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1032 
1033  if (err) {
1034  throw NeedsOneToManyHash();
1035  }
1036 #else
1037  CHECK(false);
1038 #endif
1039  }
1040 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
const int device_count_
bool isBitwiseEq() const
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
#define CHECK_GT(x, y)
Definition: Logger.h:199
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
int64_t getIntMin() const
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
size_t shardCount() const
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Executor * executor_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
void initHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
#define CHECK(condition)
Definition: Logger.h:187
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
const Data_Namespace::MemoryLevel memory_level_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initHashTableOnCpu()

void JoinHashTable::initHashTableOnCpu ( const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 726 of file JoinHashTable.cpp.

References HashEntryInfo::bucket_normalization, CHECK, CHECK_EQ, cpu_threads(), fill_hash_join_buff_bucketized(), get_join_column_type_kind(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), and kENCODING_DICT.

Referenced by JoinHashTable().

731  {
732  const auto inner_col = cols.first;
733  CHECK(inner_col);
734  const auto& ti = inner_col->get_type_info();
735  if (!cpu_hash_table_buff_) {
736  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
737  hash_entry_info.getNormalizedHashEntryCount());
738  const StringDictionaryProxy* sd_inner_proxy{nullptr};
739  const StringDictionaryProxy* sd_outer_proxy{nullptr};
740  if (ti.is_string()) {
741  CHECK_EQ(kENCODING_DICT, ti.get_compression());
742  sd_inner_proxy = executor_->getStringDictionaryProxy(
743  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
744  CHECK(sd_inner_proxy);
745  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
746  CHECK(outer_col);
747  sd_outer_proxy = executor_->getStringDictionaryProxy(
748  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
749  CHECK(sd_outer_proxy);
750  }
751  int thread_count = cpu_threads();
752  std::vector<std::thread> init_cpu_buff_threads;
753  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
754  init_cpu_buff_threads.emplace_back(
755  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
757  hash_entry_info.getNormalizedHashEntryCount(),
758  hash_join_invalid_val,
759  thread_idx,
760  thread_count);
761  });
762  }
763  for (auto& t : init_cpu_buff_threads) {
764  t.join();
765  }
766  init_cpu_buff_threads.clear();
767  int err{0};
768  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
769  init_cpu_buff_threads.emplace_back([this,
770  hash_join_invalid_val,
771  col_buff,
772  num_elements,
773  sd_inner_proxy,
774  sd_outer_proxy,
775  thread_idx,
776  thread_count,
777  &ti,
778  &err,
779  hash_entry_info] {
780  int partial_err =
782  hash_join_invalid_val,
783  {col_buff, num_elements},
784  {static_cast<size_t>(ti.get_size()),
788  isBitwiseEq(),
789  col_range_.getIntMax() + 1,
791  sd_inner_proxy,
792  sd_outer_proxy,
793  thread_idx,
794  thread_count,
795  hash_entry_info.bucket_normalization);
796  __sync_val_compare_and_swap(&err, 0, partial_err);
797  });
798  }
799  for (auto& t : init_cpu_buff_threads) {
800  t.join();
801  }
802  if (err) {
803  cpu_hash_table_buff_.reset();
804  // Too many hash entries, need to retry with a 1:many table
805  throw NeedsOneToManyHash();
806  }
807  } else {
808  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
809  // Too many hash entries, need to retry with a 1:many table
810  throw NeedsOneToManyHash();
811  }
812  }
813 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
size_t getNormalizedHashEntryCount() const
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
bool isBitwiseEq() const
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
int64_t getIntMin() const
int64_t bucket_normalization
Executor * executor_
ExpressionRange col_range_
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:187
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initHashTableOnCpuFromCache()

void JoinHashTable::initHashTableOnCpuFromCache ( const ChunkKey &  chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols 
)
private

Definition at line 1173 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1176  {
1177  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1178  JoinHashTableCacheKey cache_key{col_range_,
1179  *cols.first,
1180  outer_col ? *outer_col : *cols.first,
1181  num_elements,
1182  chunk_key,
1183  qual_bin_oper_->get_optype()};
1184  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1185  for (const auto& kv : join_hash_table_cache_) {
1186  if (kv.first == cache_key) {
1187  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1188  cpu_hash_table_buff_ = kv.second;
1189  break;
1190  }
1191  }
1192 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the caller graph for this function:

◆ initOneToManyHashTable()

void JoinHashTable::initOneToManyHashTable ( const ChunkKey &  chunk_key,
const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 1042 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK, CHECK_EQ, CHECK_GT, copy_to_gpu(), Data_Namespace::CPU_LEVEL, fill_one_to_many_hash_table_on_device(), fill_one_to_many_hash_table_on_device_bucketized(), fill_one_to_many_hash_table_on_device_sharded(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), get_shard_count(), Data_Namespace::GPU_LEVEL, init_hash_join_buff_on_device(), inline_fixed_encoding_null_val(), and kDATE.

Referenced by JoinHashTable().

1048  {
1049  auto const inner_col = cols.first;
1050  CHECK(inner_col);
1051 
1052  auto hash_entry_info = get_bucketized_hash_entry_info(
1053  inner_col->get_type_info(), col_range_, isBitwiseEq());
1054 
1055 #ifdef HAVE_CUDA
1056  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1057  const size_t entries_per_shard =
1058  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1059  : 0);
1060  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1061  // needed once the join hash table has been built on the CPU.
1062  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1063  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1064  CHECK_GT(shards_per_device, 0u);
1065  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1066  }
1067 #else
1068  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1069 #endif
1070  if (!device_id) {
1071  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1072  }
1073 
1074 #ifdef HAVE_CUDA
1075  const auto& ti = inner_col->get_type_info();
1076  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1078  const size_t total_count =
1079  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements;
1080  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1081  &data_mgr, total_count * sizeof(int32_t), device_id);
1082  }
1083 #endif
1084  const int32_t hash_join_invalid_val{-1};
1085  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1086  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1087  {
1088  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1090  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1091  }
1092  if (inner_col->get_table_id() > 0) {
1093  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1094  }
1095  // Transfer the hash table on the GPU if we've only built it on CPU
1096  // but the query runs on GPU (join on dictionary encoded columns).
1097  // Don't transfer the buffer if there was an error since we'll bail anyway.
1099 #ifdef HAVE_CUDA
1100  CHECK(ti.is_string());
1101  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1102  copy_to_gpu(
1103  &data_mgr,
1104  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1105  &(*cpu_hash_table_buff_)[0],
1106  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1107  device_id);
1108 #else
1109  CHECK(false);
1110 #endif
1111  }
1112  } else {
1113 #ifdef HAVE_CUDA
1114  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1115  data_mgr.getCudaMgr()->setContext(device_id);
1117  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1118  hash_entry_info.getNormalizedHashEntryCount(),
1119  hash_join_invalid_val,
1120  executor_->blockSize(),
1121  executor_->gridSize());
1122  JoinColumn join_column{col_buff, num_elements};
1123  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1127  isBitwiseEq(),
1128  col_range_.getIntMax() + 1,
1130  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1131 
1132  if (shard_count) {
1133  CHECK_GT(device_count_, 0);
1134  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1135  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1137  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1138  hash_entry_info,
1139  hash_join_invalid_val,
1140  join_column,
1141  type_info,
1142  shard_info,
1143  executor_->blockSize(),
1144  executor_->gridSize());
1145  }
1146  } else {
1147  if (use_bucketization) {
1149  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1150  hash_entry_info,
1151  hash_join_invalid_val,
1152  join_column,
1153  type_info,
1154  executor_->blockSize(),
1155  executor_->gridSize());
1156  } else {
1158  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1159  hash_entry_info,
1160  hash_join_invalid_val,
1161  join_column,
1162  type_info,
1163  executor_->blockSize(),
1164  executor_->gridSize());
1165  }
1166  }
1167 #else
1168  CHECK(false);
1169 #endif
1170  }
1171 }
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
#define CHECK_EQ(x, y)
Definition: Logger.h:195
void initOneToManyHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
const int device_count_
bool isBitwiseEq() const
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info, const size_t block_size_x, const size_t grid_size_x)
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
#define CHECK_GT(x, y)
Definition: Logger.h:199
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
int64_t getIntMin() const
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Definition: sqltypes.h:55
Executor * executor_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:187
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
size_t hash_entry_count_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ initOneToManyHashTableOnCpu()

void JoinHashTable::initOneToManyHashTableOnCpu ( const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 815 of file JoinHashTable.cpp.

References CHECK, CHECK_EQ, CHECK_NE, cpu_threads(), fill_one_to_many_hash_table(), fill_one_to_many_hash_table_bucketized(), get_join_column_type_kind(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), kDATE, and kENCODING_DICT.

Referenced by JoinHashTable().

820  {
821  const auto inner_col = cols.first;
822  CHECK(inner_col);
823  const auto& ti = inner_col->get_type_info();
824  if (cpu_hash_table_buff_) {
825  return;
826  }
827  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
828  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements);
829  const StringDictionaryProxy* sd_inner_proxy{nullptr};
830  const StringDictionaryProxy* sd_outer_proxy{nullptr};
831  if (ti.is_string()) {
832  CHECK_EQ(kENCODING_DICT, ti.get_compression());
833  sd_inner_proxy = executor_->getStringDictionaryProxy(
834  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
835  CHECK(sd_inner_proxy);
836  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
837  CHECK(outer_col);
838  sd_outer_proxy = executor_->getStringDictionaryProxy(
839  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
840  CHECK(sd_outer_proxy);
841  }
842  int thread_count = cpu_threads();
843  std::vector<std::future<void>> init_threads;
844  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
845  init_threads.emplace_back(std::async(std::launch::async,
847  &(*cpu_hash_table_buff_)[0],
848  hash_entry_info.getNormalizedHashEntryCount(),
849  hash_join_invalid_val,
850  thread_idx,
851  thread_count));
852  }
853  for (auto& child : init_threads) {
854  child.wait();
855  }
856  for (auto& child : init_threads) {
857  child.get();
858  }
859 
860  if (ti.get_type() == kDATE) {
862  hash_entry_info,
863  hash_join_invalid_val,
864  {col_buff, num_elements},
865  {static_cast<size_t>(ti.get_size()),
869  isBitwiseEq(),
870  col_range_.getIntMax() + 1,
872  sd_inner_proxy,
873  sd_outer_proxy,
874  thread_count);
875  } else {
877  hash_entry_info,
878  hash_join_invalid_val,
879  {col_buff, num_elements},
880  {static_cast<size_t>(ti.get_size()),
884  isBitwiseEq(),
885  col_range_.getIntMax() + 1,
887  sd_inner_proxy,
888  sd_outer_proxy,
889  thread_count);
890  }
891 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
size_t getNormalizedHashEntryCount() const
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
bool isBitwiseEq() const
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
int64_t getIntMax() const
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
int64_t getIntMin() const
Definition: sqltypes.h:55
Executor * executor_
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
ExpressionRange col_range_
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:187
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ isBitwiseEq()

bool JoinHashTable::isBitwiseEq ( ) const
private

Definition at line 1466 of file JoinHashTable.cpp.

References kBW_EQ.

Referenced by JoinHashTable().

1466  {
1467  return qual_bin_oper_->get_optype() == kBW_EQ;
1468 }
Definition: sqldefs.h:31
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the caller graph for this function:

◆ offsetBufferOff()

size_t JoinHashTable::offsetBufferOff ( ) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1370 of file JoinHashTable.cpp.

References CHECK, and JoinHashTableInterface::OneToMany.

Referenced by getHashType().

1370  {
1371  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1372  return 0;
1373 }
HashType hash_type_
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the caller graph for this function:

◆ payloadBufferOff()

size_t JoinHashTable::payloadBufferOff ( ) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1380 of file JoinHashTable.cpp.

References CHECK, and JoinHashTableInterface::OneToMany.

Referenced by getHashType().

1380  {
1381  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1382  return 2 * getComponentBufferSize();
1383 }
HashType hash_type_
size_t getComponentBufferSize() const noexcept
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the caller graph for this function:

◆ putHashTableOnCpuToCache()

void JoinHashTable::putHashTableOnCpuToCache ( const ChunkKey chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &  cols 
)
private

Definition at line 1194 of file JoinHashTable.cpp.

Referenced by JoinHashTable().

1197  {
1198  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1199  JoinHashTableCacheKey cache_key{col_range_,
1200  *cols.first,
1201  outer_col ? *outer_col : *cols.first,
1202  num_elements,
1203  chunk_key,
1204  qual_bin_oper_->get_optype()};
1205  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1206  for (const auto& kv : join_hash_table_cache_) {
1207  if (kv.first == cache_key) {
1208  return;
1209  }
1210  }
1211  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1212 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
ExpressionRange col_range_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
+ Here is the caller graph for this function:

◆ reify()

void JoinHashTable::reify ( const int  device_count)
private

Definition at line 480 of file JoinHashTable.cpp.

References CHECK_LT, anonymous_namespace{JoinHashTable.cpp}::get_cols(), JoinHashTableInterface::OneToMany, JoinHashTableInterface::OneToOne, only_shards_for_device(), reifyOneToManyForDevice(), and reifyOneToOneForDevice().

Referenced by JoinHashTable().

480  {
481  CHECK_LT(0, device_count);
482  const auto& catalog = *executor_->getCatalog();
483  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
484  const auto inner_col = cols.first;
485  checkHashJoinReplicationConstraint(inner_col->get_table_id());
486  const auto& query_info = getInnerQueryInfo(inner_col).info;
487  if (query_info.fragments.empty()) {
488  return;
489  }
490  if (query_info.getNumTuplesUpperBound() >
491  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
492  throw TooManyHashEntries();
493  }
494 #ifdef HAVE_CUDA
495  gpu_hash_table_buff_.resize(device_count);
496  gpu_hash_table_err_buff_.resize(device_count);
497 #endif // HAVE_CUDA
498  std::vector<std::future<void>> init_threads;
499  const int shard_count = shardCount();
500 
501  try {
502  for (int device_id = 0; device_id < device_count; ++device_id) {
503  const auto fragments =
504  shard_count
505  ? only_shards_for_device(query_info.fragments, device_id, device_count)
506  : query_info.fragments;
507  init_threads.push_back(
508  std::async(std::launch::async,
509  hash_type_ == JoinHashTableInterface::HashType::OneToOne
510  ? &JoinHashTable::reifyOneToOneForDevice
511  : &JoinHashTable::reifyOneToManyForDevice,
512  this,
513  fragments,
514  device_id));
515  }
516  for (auto& init_thread : init_threads) {
517  init_thread.wait();
518  }
519  for (auto& init_thread : init_threads) {
520  init_thread.get();
521  }
522 
523  } catch (const NeedsOneToManyHash& e) {
524  hash_type_ = JoinHashTableInterface::HashType::OneToMany;
525  freeHashBufferMemory();
526  init_threads.clear();
527  for (int device_id = 0; device_id < device_count; ++device_id) {
528  const auto fragments =
529  shard_count
530  ? only_shards_for_device(query_info.fragments, device_id, device_count)
531  : query_info.fragments;
532 
533  init_threads.push_back(std::async(std::launch::async,
534  &JoinHashTable::reifyOneToManyForDevice,
535  this,
536  fragments,
537  device_id));
538  }
539  for (auto& init_thread : init_threads) {
540  init_thread.wait();
541  }
542  for (auto& init_thread : init_threads) {
543  init_thread.get();
544  }
545  }
546 }
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
void reifyOneToManyForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
void freeHashBufferMemory()
void checkHashJoinReplicationConstraint(const int table_id) const
HashType hash_type_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
void reifyOneToOneForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
size_t shardCount() const
#define CHECK_LT(x, y)
Definition: Logger.h:197
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ reifyOneToManyForDevice()

void JoinHashTable::reifyOneToManyForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
private

Definition at line 664 of file JoinHashTable.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), and needs_dictionary_translation().

Referenced by JoinHashTable(), and reify().

666  {
667  const auto& catalog = *executor_->getCatalog();
668  auto& data_mgr = catalog.getDataMgr();
669  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
670  const auto inner_col = cols.first;
671  CHECK(inner_col);
672  const auto inner_cd = get_column_descriptor_maybe(
673  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
674  if (inner_cd && inner_cd->isVirtualCol) {
676  }
677  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
678  // Since we don't have the string dictionary payloads on the GPU, we'll build
679  // the join hash table on the CPU and transfer it to the GPU.
680  const auto effective_memory_level =
681  needs_dictionary_translation(inner_col, cols.second, executor_)
682  ? Data_Namespace::CPU_LEVEL
683  : memory_level_;
684  if (fragments.empty()) {
685  ChunkKey empty_chunk;
686  initOneToManyHashTable(
687  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
688  return;
689  }
690 
691  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
692  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
693  const int8_t* col_buff = nullptr;
694  size_t elem_count = 0;
695 
696  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
697  fragments,
698  effective_memory_level,
699  device_id,
700  chunks_owner,
701  dev_buff_owner);
702 
703  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
704  col_buff,
705  elem_count,
706  cols,
707  effective_memory_level,
708  device_id);
709 }
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:168
void initOneToManyHashTable(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
Executor * executor_
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ reifyOneToOneForDevice()

void JoinHashTable::reifyOneToOneForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
private

Definition at line 616 of file JoinHashTable.cpp.

References CHECK, Data_Namespace::CPU_LEVEL, anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), and needs_dictionary_translation().

Referenced by JoinHashTable(), and reify().

618  {
619  const auto& catalog = *executor_->getCatalog();
620  auto& data_mgr = catalog.getDataMgr();
621  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
622  const auto inner_col = cols.first;
623  CHECK(inner_col);
624  const auto inner_cd = get_column_descriptor_maybe(
625  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
626  if (inner_cd && inner_cd->isVirtualCol) {
628  }
629  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
630  // Since we don't have the string dictionary payloads on the GPU, we'll build
631  // the join hash table on the CPU and transfer it to the GPU.
632  const auto effective_memory_level =
633  needs_dictionary_translation(inner_col, cols.second, executor_)
634  ? Data_Namespace::CPU_LEVEL
635  : memory_level_;
636  if (fragments.empty()) {
637  // No data in this fragment. Still need to create a hash table and initialize it
638  // properly.
639  ChunkKey empty_chunk;
640  initHashTableForDevice(
641  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
642  }
643 
644  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
645  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
646  const int8_t* col_buff = nullptr;
647  size_t elem_count = 0;
648 
649  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
650  fragments,
651  effective_memory_level,
652  device_id,
653  chunks_owner,
654  dev_buff_owner);
655 
656  initHashTableForDevice(genHashTableKey(fragments, cols.second, inner_col),
657  col_buff,
658  elem_count,
659  cols,
660  effective_memory_level,
661  device_id);
662 }
void initHashTableForDevice(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:168
Executor * executor_
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ shardCount()

size_t JoinHashTable::shardCount ( ) const
private

Definition at line 1460 of file JoinHashTable.cpp.

References get_shard_count(), and Data_Namespace::GPU_LEVEL.

Referenced by JoinHashTable().

1460  {
1461  return memory_level_ == Data_Namespace::GPU_LEVEL
1462  ? get_shard_count(qual_bin_oper_.get(), executor_)
1463  : 0;
1464 }
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ yieldCacheInvalidator()

static auto JoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inlinestatic

Definition at line 116 of file JoinHashTable.h.

References join_hash_table_cache_, and join_hash_table_cache_mutex_.

116  {
117  return []() -> void {
118  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
119  join_hash_table_cache_.clear();
120  };
121  }
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_

Member Data Documentation

◆ col_range_

ExpressionRange JoinHashTable::col_range_
private

Definition at line 247 of file JoinHashTable.h.

◆ col_var_

std::shared_ptr<Analyzer::ColumnVar> JoinHashTable::col_var_
private

Definition at line 236 of file JoinHashTable.h.

Referenced by getInnerTableId(), and getInnerTableRteIdx().

◆ column_cache_

ColumnCacheMap& JoinHashTable::column_cache_
private

Definition at line 249 of file JoinHashTable.h.

◆ cpu_hash_table_buff_

std::shared_ptr<std::vector<int32_t> > JoinHashTable::cpu_hash_table_buff_
private

Definition at line 241 of file JoinHashTable.h.

Referenced by getJoinHashBuffer().

◆ cpu_hash_table_buff_mutex_

std::mutex JoinHashTable::cpu_hash_table_buff_mutex_
private

Definition at line 242 of file JoinHashTable.h.

◆ device_count_

const int JoinHashTable::device_count_
private

Definition at line 250 of file JoinHashTable.h.

◆ executor_

Executor* JoinHashTable::executor_
private

Definition at line 248 of file JoinHashTable.h.

◆ hash_entry_count_

size_t JoinHashTable::hash_entry_count_
private

Definition at line 240 of file JoinHashTable.h.

◆ hash_type_

HashType JoinHashTable::hash_type_
private

Definition at line 239 of file JoinHashTable.h.

Referenced by getHashType().

◆ join_hash_table_cache_

std::vector< std::pair< JoinHashTable::JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > JoinHashTable::join_hash_table_cache_
staticprivate

◆ join_hash_table_cache_mutex_

std::mutex JoinHashTable::join_hash_table_cache_mutex_
staticprivate

◆ linearized_multifrag_column_

std::pair<const int8_t*, size_t> JoinHashTable::linearized_multifrag_column_
private

Definition at line 251 of file JoinHashTable.h.

◆ linearized_multifrag_column_mutex_

std::mutex JoinHashTable::linearized_multifrag_column_mutex_
private

Definition at line 252 of file JoinHashTable.h.

◆ linearized_multifrag_column_owner_

RowSetMemoryOwner JoinHashTable::linearized_multifrag_column_owner_
private

Definition at line 253 of file JoinHashTable.h.

◆ memory_level_

const Data_Namespace::MemoryLevel JoinHashTable::memory_level_
private

Definition at line 238 of file JoinHashTable.h.

◆ qual_bin_oper_

std::shared_ptr<Analyzer::BinOper> JoinHashTable::qual_bin_oper_
private

Definition at line 235 of file JoinHashTable.h.

◆ query_infos_

const std::vector<InputTableInfo>& JoinHashTable::query_infos_
private

Definition at line 237 of file JoinHashTable.h.


The documentation for this class was generated from the following files: