OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JoinHashTable Class Reference

#include <JoinHashTable.h>

+ Inheritance diagram for JoinHashTable:
+ Collaboration diagram for JoinHashTable:

Classes

struct  JoinHashTableCacheKey
 

Public Member Functions

int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
size_t getJoinHashBufferSize (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
std::string toString (const ExecutorDeviceType device_type, const int device_id, bool raw=false) const noexcept override
 
std::set
< DecodedJoinHashBufferEntry > 
decodeJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
HashType getHashType () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~JoinHashTable ()
 
- Public Member Functions inherited from JoinHashTableInterface
virtual std::string toStringFlat64 (const ExecutorDeviceType device_type, const int device_id) const noexcept
 
virtual std::string toStringFlat32 (const ExecutorDeviceType device_type, const int device_id) const noexcept
 

Static Public Member Functions

static std::shared_ptr
< JoinHashTable > 
getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static std::shared_ptr
< JoinHashTable > 
getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing). More...
 
static HashJoinMatchingSet codegenMatchingSet (const std::vector< llvm::Value * > &hash_join_idx_args_in, const bool is_sharded, const bool col_is_nullable, const bool is_bw_eq, const int64_t sub_buff_size, Executor *executor, const bool is_bucketized=false)
 
static llvm::Value * codegenHashTableLoad (const size_t table_idx, Executor *executor)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 

Private Member Functions

 JoinHashTable (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
 
std::pair< const int8_t *, size_t > getOneColumnFragment (const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
std::pair< const int8_t *, size_t > getAllColumnFragments (const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
ChunkKey genHashTableKey (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
 
void reify (const int device_count)
 
void reifyOneToOneForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
 
void reifyOneToManyForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
void initHashTableForDevice (const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initOneToManyHashTable (const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
void initHashTableOnCpuFromCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
 
void putHashTableOnCpuToCache (const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
 
void initHashTableOnCpu (const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
void initOneToManyHashTableOnCpu (const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
 
const InputTableInfo & getInnerQueryInfo (const Analyzer::ColumnVar *inner_col) const
 
size_t shardCount () const
 
llvm::Value * codegenHashTableLoad (const size_t table_idx)
 
std::vector< llvm::Value * > getHashJoinArgs (llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
 
std::pair< const int8_t *, size_t > fetchFragments (const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
size_t getComponentBufferSize () const noexcept
 

Private Attributes

std::shared_ptr
< Analyzer::BinOper > 
qual_bin_oper_
 
std::shared_ptr
< Analyzer::ColumnVar > 
col_var_
 
const std::vector
< InputTableInfo > & 
query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
HashType hash_type_
 
size_t hash_entry_count_
 
std::shared_ptr< std::vector
< int32_t > > 
cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
ExpressionRange col_range_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
const int device_count_
 
std::pair< const int8_t *, size_t > linearized_multifrag_column_
 
std::mutex linearized_multifrag_column_mutex_
 
RowSetMemoryOwner linearized_multifrag_column_owner_
 

Static Private Attributes

static std::vector< std::pair
< JoinHashTableCacheKey,
std::shared_ptr< std::vector
< int32_t > > > > 
join_hash_table_cache_
 
static std::mutex join_hash_table_cache_mutex_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType { HashType::OneToOne, HashType::OneToMany }
 

Detailed Description

Definition at line 52 of file JoinHashTable.h.

Constructor & Destructor Documentation

virtual JoinHashTable::~JoinHashTable ( )
inline virtual

Definition at line 129 of file JoinHashTable.h.

129 {}
JoinHashTable::JoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  qual_bin_oper,
const Analyzer::ColumnVar *  col_var,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const ExpressionRange &  col_range,
ColumnCacheMap &  column_cache,
Executor *  executor,
const int  device_count 
)
inline private

Definition at line 132 of file JoinHashTable.h.

References CHECK(), ExpressionRange::getType(), and Integer.

Referenced by getInstance().

141  : qual_bin_oper_(qual_bin_oper)
142  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
143  , query_infos_(query_infos)
144  , memory_level_(memory_level)
145  , hash_type_(preferred_hash_type)
146  , hash_entry_count_(0)
147  , col_range_(col_range)
148  , executor_(executor)
149  , column_cache_(column_cache)
150  , device_count_(device_count) {
152  }
const std::vector< InputTableInfo > & query_infos_
const int device_count_
ColumnCacheMap & column_cache_
std::shared_ptr< Analyzer::Expr > deep_copy() const override
Definition: Analyzer.cpp:59
HashType hash_type_
CHECK(cgen_state)
Executor * executor_
ExpressionRange col_range_
ExpressionRangeType getType() const
std::shared_ptr< Analyzer::ColumnVar > col_var_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
size_t hash_entry_count_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Function Documentation

void JoinHashTable::checkHashJoinReplicationConstraint ( const int  table_id) const
private

Definition at line 776 of file JoinHashTable.cpp.

References CHECK(), executor_, g_cluster, get_shard_count(), qual_bin_oper_, and table_is_replicated().

Referenced by reify().

776  {
777  if (!g_cluster) {
778  return;
779  }
780  if (table_id >= 0) {
781  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
782  CHECK(inner_td);
783  size_t shard_count{0};
784  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
785  if (!shard_count && !table_is_replicated(inner_td)) {
786  throw TableMustBeReplicated(inner_td->tableName);
787  }
788  }
789 }
bool g_cluster
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
CHECK(cgen_state)
Executor * executor_
bool table_is_replicated(const TableDescriptor *td)
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx,
Executor *  executor 
)
static

Definition at line 1290 of file JoinHashTable.cpp.

References CHECK(), CHECK_LT, and get_arg_by_name().

Referenced by codegenHashTableLoad(), BaselineJoinHashTable::codegenMatchingSet(), codegenMatchingSet(), codegenSlot(), and BaselineJoinHashTable::hashPtr().

1291  {
1292  llvm::Value* hash_ptr = nullptr;
1293  const auto total_table_count =
1294  executor->plan_state_->join_info_.join_hash_tables_.size();
1295  CHECK_LT(table_idx, total_table_count);
1296  if (total_table_count > 1) {
1297  auto hash_tables_ptr =
1298  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1299  auto hash_pptr =
1300  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1301  hash_tables_ptr,
1302  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1303  : hash_tables_ptr;
1304  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1305  } else {
1306  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1307  }
1308  CHECK(hash_ptr);
1309  return hash_ptr;
1310 }
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:117
#define CHECK_LT(x, y)
Definition: Logger.h:200

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

llvm::Value * JoinHashTable::codegenHashTableLoad ( const size_t  table_idx)
private

Definition at line 1279 of file JoinHashTable.cpp.

References CHECK(), codegenHashTableLoad(), executor_, and get_arg_by_name().

1279  {
1280  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1281  if (hash_ptr->getType()->isIntegerTy(64)) {
1282  return hash_ptr;
1283  }
1284  CHECK(hash_ptr->getType()->isPointerTy());
1285  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1286  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1287  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1288 }
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:117
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)

+ Here is the call graph for this function:

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const CompilationOptions &  co,
const size_t  index 
)
override virtual

Implements JoinHashTableInterface.

Definition at line 1362 of file JoinHashTable.cpp.

References CHECK(), codegenHashTableLoad(), executor_, anonymous_namespace{JoinHashTable.cpp}::get_cols(), getComponentBufferSize(), getHashJoinArgs(), isBitwiseEq(), kDATE, qual_bin_oper_, and shardCount().

Referenced by BaselineJoinHashTable::codegenMatchingSet().

1363  {
1364  const auto cols = get_cols(
1365  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1366  auto key_col = cols.second;
1367  CHECK(key_col);
1368  auto val_col = cols.first;
1369  CHECK(val_col);
1370  auto pos_ptr = codegenHashTableLoad(index);
1371  CHECK(pos_ptr);
1372  const int shard_count = shardCount();
1373  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1374  const int64_t sub_buff_size = getComponentBufferSize();
1375  const auto& key_col_ti = key_col->get_type_info();
1376 
1377  auto bucketize = (key_col_ti.get_type() == kDATE);
1378  return codegenMatchingSet(hash_join_idx_args,
1379  shard_count,
1380  !key_col_ti.get_notnull(),
1381  isBitwiseEq(),
1382  sub_buff_size,
1383  executor_,
1384  bucketize);
1385 }
bool isBitwiseEq() const
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
CHECK(cgen_state)
size_t shardCount() const
Definition: sqltypes.h:56
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
size_t getComponentBufferSize() const noexcept
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

HashJoinMatchingSet JoinHashTable::codegenMatchingSet ( const std::vector< llvm::Value * > &  hash_join_idx_args_in,
const bool  is_sharded,
const bool  col_is_nullable,
const bool  is_bw_eq,
const int64_t  sub_buff_size,
Executor *  executor,
const bool  is_bucketized = false 
)
static

Definition at line 1387 of file JoinHashTable.cpp.

References CHECK().

1394  {
1395  using namespace std::string_literals;
1396 
1397  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1398 
1399  if (is_bw_eq) {
1400  fname += "_bitwise";
1401  }
1402  if (is_sharded) {
1403  fname += "_sharded";
1404  }
1405  if (!is_bw_eq && col_is_nullable) {
1406  fname += "_nullable";
1407  }
1408 
1409  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1410  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1411  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1412 
1413  auto pos_ptr = hash_join_idx_args_in[0];
1414  CHECK(pos_ptr);
1415 
1416  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1417  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1418  auto hash_join_idx_args = hash_join_idx_args_in;
1419  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1420  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1421 
1422  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1423  slot_valid_lv,
1424  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1425  executor->cgen_state_->llInt(int64_t(0)));
1426  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1427  executor->cgen_state_->ir_builder_.CreateAdd(
1428  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1429  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1430  auto rowid_ptr_i32 =
1431  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1432  return {rowid_ptr_i32, row_count_lv, slot_lv};
1433 }
CHECK(cgen_state)

+ Here is the call graph for this function:

llvm::Value * JoinHashTable::codegenSlot ( const CompilationOptions &  co,
const size_t  index 
)
override virtual

Implements JoinHashTableInterface.

Definition at line 1550 of file JoinHashTable.cpp.

References CHECK(), CHECK_EQ, CodeGenerator::codegen(), codegenHashTableLoad(), executor_, anonymous_namespace{JoinHashTable.cpp}::get_cols(), Analyzer::Expr::get_type_info(), getHashJoinArgs(), getHashType(), isBitwiseEq(), kDATE, JoinHashTableInterface::OneToOne, qual_bin_oper_, and shardCount().

1551  {
1552  using namespace std::string_literals;
1553 
1554  CHECK(getHashType() == JoinHashTableInterface::HashType::OneToOne);
1555  const auto cols = get_cols(
1556  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1557  auto key_col = cols.second;
1558  CHECK(key_col);
1559  auto val_col = cols.first;
1560  CHECK(val_col);
1561  CodeGenerator code_generator(executor_);
1562  const auto key_lvs = code_generator.codegen(key_col, true, co);
1563  CHECK_EQ(size_t(1), key_lvs.size());
1564  auto hash_ptr = codegenHashTableLoad(index);
1565  CHECK(hash_ptr);
1566  const int shard_count = shardCount();
1567  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1568 
1569  const auto& key_col_ti = key_col->get_type_info();
1570  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1571  : "hash_join_idx"s);
1572 
1573  if (isBitwiseEq()) {
1574  fname += "_bitwise";
1575  }
1576  if (shard_count) {
1577  fname += "_sharded";
1578  }
1579 
1580  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1581  fname += "_nullable";
1582  }
1583  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1584 }
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
CHECK(cgen_state)
size_t shardCount() const
Definition: sqltypes.h:56
Executor * executor_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
HashType getHashType() const noexceptoverride

+ Here is the call graph for this function:

size_t JoinHashTable::countBufferOff ( ) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1440 of file JoinHashTable.cpp.

References CHECK(), getComponentBufferSize(), hash_type_, and JoinHashTableInterface::OneToMany.

Referenced by decodeJoinHashBuffer(), and toString().

1440  {
1441  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1442  return getComponentBufferSize();
1443 }
HashType hash_type_
CHECK(cgen_state)
size_t getComponentBufferSize() const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::set< DecodedJoinHashBufferEntry > JoinHashTable::decodeJoinHashBuffer ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1524 of file JoinHashTable.cpp.

References copy_from_gpu(), countBufferOff(), decodeJoinHashBuffer(), executor_, getJoinHashBuffer(), getJoinHashBufferSize(), GPU, offsetBufferOff(), and payloadBufferOff().

1526  {
1527  auto buffer = getJoinHashBuffer(device_type, device_id);
1528  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1529 #ifdef HAVE_CUDA
1530  std::unique_ptr<int8_t[]> buffer_copy;
1531  if (device_type == ExecutorDeviceType::GPU) {
1532  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1533 
1534  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1535  buffer_copy.get(),
1536  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1537  buffer_size,
1538  device_id);
1539  }
1540  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1541 #else
1542  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1543 #endif // HAVE_CUDA
1544  auto ptr2 = ptr1 + offsetBufferOff();
1545  auto ptr3 = ptr1 + countBufferOff();
1546  auto ptr4 = ptr1 + payloadBufferOff();
1547  return ::decodeJoinHashBuffer(1, sizeof(int32_t), ptr1, ptr2, ptr3, ptr4, buffer_size);
1548 }
size_t payloadBufferOff() const noexceptoverride
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
size_t offsetBufferOff() const noexceptoverride
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
std::set< DecodedJoinHashBufferEntry > decodeJoinHashBuffer(size_t key_component_count, size_t key_component_width, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
Executor * executor_
size_t countBufferOff() const noexceptoverride

+ Here is the call graph for this function:

std::pair< const int8_t *, size_t > JoinHashTable::fetchFragments ( const Analyzer::ColumnVar *  hash_col,
const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragment_info,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner,
ThrustAllocator &  dev_buff_owner 
)
private

Definition at line 613 of file JoinHashTable.cpp.

References ThrustAllocator::allocate(), CHECK_NE, copy_to_gpu(), executor_, SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), Analyzer::Expr::get_type_info(), getAllColumnFragments(), getOneColumnFragment(), and Data_Namespace::GPU_LEVEL.

Referenced by reifyOneToManyForDevice(), and reifyOneToOneForDevice().

619  {
620  static std::mutex fragment_fetch_mutex;
621  const bool has_multi_frag = fragment_info.size() > 1;
622  const auto& catalog = *executor_->getCatalog();
623  auto& data_mgr = catalog.getDataMgr();
624  const auto& first_frag = fragment_info.front();
625  const int8_t* col_buff = nullptr;
626  size_t elem_count = 0;
627 
628  const size_t elem_width = hash_col->get_type_info().get_size();
629  if (has_multi_frag) {
630  std::tie(col_buff, elem_count) =
631  getAllColumnFragments(*hash_col, fragment_info, chunks_owner);
632  }
633 
634  {
635  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
636  if (has_multi_frag) {
637  if (effective_memory_level == Data_Namespace::GPU_LEVEL && col_buff) {
638  CHECK_NE(elem_count, size_t(0));
639  int8_t* dev_col_buff = nullptr;
640  dev_col_buff = dev_buff_owner.allocate(elem_count * elem_width);
641  copy_to_gpu(&data_mgr,
642  reinterpret_cast<CUdeviceptr>(dev_col_buff),
643  col_buff,
644  elem_count * elem_width,
645  device_id);
646  col_buff = dev_col_buff;
647  }
648  } else {
649  std::tie(col_buff, elem_count) = getOneColumnFragment(
650  *hash_col, first_frag, effective_memory_level, device_id, chunks_owner);
651  }
652  }
653  return {col_buff, elem_count};
654 }
int8_t * allocate(std::ptrdiff_t num_bytes)
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
std::pair< const int8_t *, size_t > getOneColumnFragment(const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
#define CHECK_NE(x, y)
Definition: Logger.h:199
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
Executor * executor_
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::freeHashBufferCpuMemory ( )
private

Definition at line 1659 of file JoinHashTable.cpp.

References cpu_hash_table_buff_.

Referenced by freeHashBufferMemory().

1659  {
1660  cpu_hash_table_buff_.reset();
1661 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_

+ Here is the caller graph for this function:

void JoinHashTable::freeHashBufferGpuMemory ( )
private

Definition at line 1638 of file JoinHashTable.cpp.

References CHECK(), executor_, and CudaAllocator::freeGpuAbstractBuffer().

Referenced by freeHashBufferMemory().

1638  {
1639 #ifdef HAVE_CUDA
1640  const auto& catalog = *executor_->getCatalog();
1641  auto& data_mgr = catalog.getDataMgr();
1642  for (auto& buf : gpu_hash_table_buff_) {
1643  if (buf) {
1644  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1645  buf = nullptr;
1646  }
1647  }
1648  for (auto& buf : gpu_hash_table_err_buff_) {
1649  if (buf) {
1650  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1651  buf = nullptr;
1652  }
1653  }
1654 #else
1655  CHECK(false);
1656 #endif // HAVE_CUDA
1657 }
CHECK(cgen_state)
Executor * executor_
static void freeGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, Data_Namespace::AbstractBuffer *ab)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::freeHashBufferMemory ( )
private

Definition at line 1631 of file JoinHashTable.cpp.

References freeHashBufferCpuMemory(), and freeHashBufferGpuMemory().

Referenced by reify().

1631  {
1632 #ifdef HAVE_CUDA
1633  freeHashBufferGpuMemory();
1634 #endif
1635  freeHashBufferCpuMemory();
1636 }
void freeHashBufferGpuMemory()
void freeHashBufferCpuMemory()

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkKey JoinHashTable::genHashTableKey ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const Analyzer::Expr *  outer_col_expr,
const Analyzer::ColumnVar *  inner_col 
) const
private

Definition at line 656 of file JoinHashTable.cpp.

References CHECK(), CHECK_EQ, executor_, Analyzer::ColumnVar::get_column_id(), Analyzer::ColumnVar::get_table_id(), Analyzer::Expr::get_type_info(), getInnerQueryInfo(), Fragmenter_Namespace::TableInfo::getNumTuples(), InputTableInfo::info, and kENCODING_DICT.

Referenced by reifyOneToManyForDevice(), and reifyOneToOneForDevice().

659  {
660  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
661  inner_col->get_table_id(),
662  inner_col->get_column_id()};
663  const auto& ti = inner_col->get_type_info();
664  if (ti.is_string()) {
665  CHECK_EQ(kENCODING_DICT, ti.get_compression());
666  size_t outer_elem_count = 0;
667  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
668  CHECK(outer_col);
669  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
670  for (auto& frag : outer_query_info.fragments) {
671  outer_elem_count = frag.getNumTuples();
672  }
673  hash_table_key.push_back(outer_elem_count);
674  }
675  if (fragments.size() < 2) {
676  hash_table_key.push_back(fragments.front().fragmentId);
677  }
678  return hash_table_key;
679 }
int get_table_id() const
Definition: Analyzer.h:194
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< int > ChunkKey
Definition: types.h:35
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
CHECK(cgen_state)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
Executor * executor_
int get_column_id() const
Definition: Analyzer.h:195
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< const int8_t *, size_t > JoinHashTable::getAllColumnFragments ( const Analyzer::ColumnVar &  hash_col,
const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)
private

Definition at line 479 of file JoinHashTable.cpp.

References RowSetMemoryOwner::addColBuffer(), column_cache_, executor_, ColumnFetcher::getAllColumnFragments(), linearized_multifrag_column_, linearized_multifrag_column_mutex_, linearized_multifrag_column_owner_, and shardCount().

Referenced by fetchFragments().

482  {
483  std::lock_guard<std::mutex> linearized_multifrag_column_lock(
484      linearized_multifrag_column_mutex_);
485  if (linearized_multifrag_column_.first) {
486    return linearized_multifrag_column_;
487  }
488  const int8_t* col_buff;
489  size_t total_elem_count;
490  std::tie(col_buff, total_elem_count) = ColumnFetcher::getAllColumnFragments(
491      executor_, hash_col, fragments, chunks_owner, column_cache_);
492  linearized_multifrag_column_owner_.addColBuffer(col_buff);
493  if (!shardCount()) {
494  linearized_multifrag_column_ = {col_buff, total_elem_count};
495  }
496  return {col_buff, total_elem_count};
497 }
std::pair< const int8_t *, size_t > linearized_multifrag_column_
ColumnCacheMap & column_cache_
std::mutex linearized_multifrag_column_mutex_
size_t shardCount() const
Executor * executor_
RowSetMemoryOwner linearized_multifrag_column_owner_
static std::pair< const int8_t *, size_t > getAllColumnFragments(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
void addColBuffer(const void *col_buffer)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t JoinHashTable::getComponentBufferSize ( ) const
private noexcept

Definition at line 1450 of file JoinHashTable.cpp.

References hash_entry_count_.

Referenced by codegenMatchingSet(), countBufferOff(), and payloadBufferOff().

1450  {
1451  return hash_entry_count_ * sizeof(int32_t);
1452 }
size_t hash_entry_count_

+ Here is the caller graph for this function:

std::vector< llvm::Value * > JoinHashTable::getHashJoinArgs ( llvm::Value *  hash_ptr,
const Analyzer::Expr *  key_col,
const int  shard_count,
const CompilationOptions &  co 
)
private

Definition at line 1312 of file JoinHashTable.cpp.

References CHECK_EQ, CodeGenerator::codegen(), col_range_, device_count_, executor_, anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_hash_entry_count(), get_logical_type_info(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_type(), Analyzer::Expr::get_type_info(), ExpressionRange::getIntMax(), ExpressionRange::getIntMin(), inline_fixed_encoding_null_val(), isBitwiseEq(), and kDATE.

Referenced by codegenMatchingSet(), and codegenSlot().

1315  {
1316  CodeGenerator code_generator(executor_);
1317  const auto key_lvs = code_generator.codegen(key_col, true, co);
1318  CHECK_EQ(size_t(1), key_lvs.size());
1319  auto const& key_col_ti = key_col->get_type_info();
1320  auto hash_entry_info =
1322 
1323  std::vector<llvm::Value*> hash_join_idx_args{
1324  hash_ptr,
1325  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1326  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1327  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1328  if (shard_count) {
1329  const auto expected_hash_entry_count =
1331  const auto entry_count_per_shard =
1332  (expected_hash_entry_count + shard_count - 1) / shard_count;
1333  hash_join_idx_args.push_back(
1334  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1335  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1336  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1337  }
1338  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1339  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1340  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1341  inline_fixed_encoding_null_val(key_col_logical_ti)));
1342  }
1343  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1344  if (isBitwiseEq()) {
1345  if (special_date_bucketization_case) {
1346  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1347  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1348  } else {
1349  hash_join_idx_args.push_back(
1350  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1351  }
1352  }
1353 
1354  if (special_date_bucketization_case) {
1355  hash_join_idx_args.emplace_back(
1356  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1357  }
1358 
1359  return hash_join_idx_args;
1360 }
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
const int device_count_
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:869
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Definition: sqltypes.h:56
Executor * executor_
ExpressionRange col_range_
int64_t getIntMax() const
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

HashType JoinHashTable::getHashType ( ) const
inlineoverridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 103 of file JoinHashTable.h.

References hash_type_.

Referenced by codegenSlot().

103 { return hash_type_; }
HashType hash_type_

+ Here is the caller graph for this function:

const InputTableInfo & JoinHashTable::getInnerQueryInfo ( const Analyzer::ColumnVar inner_col) const
private

Definition at line 1586 of file JoinHashTable.cpp.

References get_inner_query_info(), Analyzer::ColumnVar::get_table_id(), and query_infos_.

Referenced by genHashTableKey(), and reify().

1587  {
1588  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1589 }
int get_table_id() const
Definition: Analyzer.h:194
const std::vector< InputTableInfo > & query_infos_
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int JoinHashTable::getInnerTableId ( ) const
inlineoverridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 95 of file JoinHashTable.h.

References col_var_.

95  {
96  return col_var_.get()->get_table_id();
97  };
std::shared_ptr< Analyzer::ColumnVar > col_var_
int JoinHashTable::getInnerTableRteIdx ( ) const
inlineoverridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 99 of file JoinHashTable.h.

References col_var_.

99  {
100  return col_var_.get()->get_rte_idx();
101  };
std::shared_ptr< Analyzer::ColumnVar > col_var_
std::shared_ptr< JoinHashTable > JoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper qual_bin_oper,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const int  device_count,
ColumnCacheMap column_cache,
Executor executor 
)
static

Make hash table from an in-flight SQL query's parse tree etc.

Definition at line 304 of file JoinHashTable.cpp.

References CHECK(), CHECK_EQ, anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), getExpressionRange(), HashEntryInfo::getNormalizedHashEntryCount(), Data_Namespace::GPU_LEVEL, Invalid, IS_EQUIVALENCE, JoinHashTable(), kBW_EQ, and ExpressionRange::makeIntRange().

Referenced by Executor::buildHashTableForQualifier(), and getSyntheticInstance().

311  {
312  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
313  const auto cols =
314  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
315  const auto inner_col = cols.first;
316  CHECK(inner_col);
317  const auto& ti = inner_col->get_type_info();
318  auto col_range =
319  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
320  if (col_range.getType() == ExpressionRangeType::Invalid) {
321  throw HashJoinFail(
322  "Could not compute range for the expressions involved in the equijoin");
323  }
324  if (ti.is_string()) {
325  // The nullable info must be the same as the source column.
326  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
327  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
328  throw HashJoinFail(
329  "Could not compute range for the expressions involved in the equijoin");
330  }
331  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
332  // If the inner column expression range is empty, use the inner col range
333  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
334  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
335  col_range = source_col_range;
336  } else {
337  col_range = ExpressionRange::makeIntRange(
338  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
339  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
340  0,
341  source_col_range.hasNulls());
342  }
343  }
344  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
345  const auto max_hash_entry_count =
347  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
348  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
349 
350  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
351  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
352  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
353 
354  if (bucketized_entry_count > max_hash_entry_count) {
355  throw TooManyHashEntries();
356  }
357 
358  if (qual_bin_oper->get_optype() == kBW_EQ &&
359  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
360  throw HashJoinFail("Cannot translate null value for kBW_EQ");
361  }
362  auto join_hash_table =
363  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
364  inner_col,
365  query_infos,
366  memory_level,
367  preferred_hash_type,
368  col_range,
369  column_cache,
370  executor,
371  device_count));
372  try {
373  join_hash_table->reify(device_count);
374  } catch (const TableMustBeReplicated& e) {
375  // Throw a runtime error to abort the query
376  join_hash_table->freeHashBufferMemory();
377  throw std::runtime_error(e.what());
378  } catch (const HashJoinFail& e) {
379  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
380  // possible)
381  join_hash_table->freeHashBufferMemory();
382  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
383  "involved in equijoin | ") +
384  e.what());
385  } catch (const ColumnarConversionNotSupported& e) {
386  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
387  e.what());
388  } catch (const OutOfMemory& e) {
389  throw HashJoinFail(
390  std::string("Ran out of memory while building hash tables for equijoin | ") +
391  e.what());
392  } catch (const std::exception& e) {
393  throw std::runtime_error(
394  std::string("Fatal error while attempting to build hash tables for join: ") +
395  e.what());
396  }
397  return join_hash_table;
398 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
CHECK(cgen_state)
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
size_t getNormalizedHashEntryCount() const
Definition: sqldefs.h:31

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int64_t JoinHashTable::getJoinHashBuffer ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1454 of file JoinHashTable.cpp.

References CHECK(), CHECK_LT, CPU, and cpu_hash_table_buff_.

Referenced by decodeJoinHashBuffer(), and toString().

1455  {
1456  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1457  return 0;
1458  }
1459 #ifdef HAVE_CUDA
1460  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1461  if (device_type == ExecutorDeviceType::CPU) {
1462  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1463  } else {
1464  return gpu_hash_table_buff_[device_id]
1465  ? reinterpret_cast<CUdeviceptr>(
1466  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1467  : reinterpret_cast<CUdeviceptr>(nullptr);
1468  }
1469 #else
1470  CHECK(device_type == ExecutorDeviceType::CPU);
1471  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1472 #endif
1473 }
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
CHECK(cgen_state)
#define CHECK_LT(x, y)
Definition: Logger.h:200

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t JoinHashTable::getJoinHashBufferSize ( const ExecutorDeviceType  device_type,
const int  device_id 
) const
overridevirtualnoexcept

Implements JoinHashTableInterface.

Definition at line 1475 of file JoinHashTable.cpp.

References CHECK(), CHECK_LT, CPU, and cpu_hash_table_buff_.

Referenced by decodeJoinHashBuffer(), and toString().

1476  {
1477  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1478  return 0;
1479  }
1480 #ifdef HAVE_CUDA
1481  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1482  if (device_type == ExecutorDeviceType::CPU) {
1483  return cpu_hash_table_buff_->size() *
1484  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1485  } else {
1486  return gpu_hash_table_buff_[device_id]
1487  ? gpu_hash_table_buff_[device_id]->reservedSize()
1488  : 0;
1489  }
1490 #else
1491  CHECK(device_type == ExecutorDeviceType::CPU);
1492  return cpu_hash_table_buff_->size() *
1493  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1494 #endif
1495 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
CHECK(cgen_state)
#define CHECK_LT(x, y)
Definition: Logger.h:200

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< const int8_t *, size_t > JoinHashTable::getOneColumnFragment ( const Analyzer::ColumnVar hash_col,
const Fragmenter_Namespace::FragmentInfo fragment,
const Data_Namespace::MemoryLevel  effective_mem_lvl,
const int  device_id,
std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)
private

Definition at line 464 of file JoinHashTable.cpp.

References column_cache_, executor_, and ColumnFetcher::getOneColumnFragment().

Referenced by fetchFragments().

469  {
471  hash_col,
472  fragment,
473  effective_mem_lvl,
474  device_id,
475  chunks_owner,
476  column_cache_);
477 }
ColumnCacheMap & column_cache_
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Executor * executor_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< JoinHashTable > JoinHashTable::getSyntheticInstance ( std::string_view  table1,
std::string_view  column1,
std::string_view  table2,
std::string_view  column2,
const Data_Namespace::MemoryLevel  memory_level,
const HashType  preferred_hash_type,
const int  device_count,
ColumnCacheMap column_cache,
Executor executor 
)
static

Make hash table from named tables and columns (such as for testing).

Definition at line 401 of file JoinHashTable.cpp.

References CHECK(), getInstance(), kBOOLEAN, kEQ, and kONE.

410  {
411  auto catalog = executor->getCatalog();
412  CHECK(catalog);
413 
414  auto tmeta1 = catalog->getMetadataForTable(std::string(table1));
415  auto tmeta2 = catalog->getMetadataForTable(std::string(table2));
416 
417  CHECK(tmeta1);
418  CHECK(tmeta2);
419 
420  auto cmeta1 = catalog->getMetadataForColumn(tmeta1->tableId, std::string(column1));
421  auto cmeta2 = catalog->getMetadataForColumn(tmeta2->tableId, std::string(column2));
422 
423  CHECK(cmeta1);
424  CHECK(cmeta2);
425 
426  auto ti1 = cmeta1->columnType;
427  auto ti2 = cmeta2->columnType;
428 
429  auto a1 =
430  std::make_shared<Analyzer::ColumnVar>(ti1, tmeta1->tableId, cmeta1->columnId, 0);
431  auto a2 =
432  std::make_shared<Analyzer::ColumnVar>(ti2, tmeta2->tableId, cmeta2->columnId, 1);
433 
434  auto op = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kEQ, kONE, a1, a2);
435 
436  size_t number_of_join_tables{2};
437  std::vector<InputTableInfo> query_infos(number_of_join_tables);
438  query_infos[0].table_id = tmeta1->tableId;
439  query_infos[0].info = tmeta1->fragmenter->getFragmentsForQuery();
440  query_infos[1].table_id = tmeta2->tableId;
441  query_infos[1].info = tmeta2->fragmenter->getFragmentsForQuery();
442 
443  std::unordered_set<PhysicalInput> phys_inputs;
444  phys_inputs.emplace(PhysicalInput{cmeta1->columnId, cmeta1->tableId});
445  phys_inputs.emplace(PhysicalInput{cmeta2->columnId, cmeta2->tableId});
446 
447  std::unordered_set<int> phys_table_ids;
448  phys_table_ids.insert(cmeta1->tableId);
449  phys_table_ids.insert(cmeta2->tableId);
450 
451  executor->setupCaching(phys_inputs, phys_table_ids);
452 
453  auto hash_table = JoinHashTable::getInstance(op,
454  query_infos,
455  memory_level,
456  preferred_hash_type,
457  device_count,
458  column_cache,
459  executor);
460 
461  return hash_table;
462 }
Definition: sqldefs.h:30
CHECK(cgen_state)
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: sqldefs.h:69

+ Here is the call graph for this function:

void JoinHashTable::initHashTableForDevice ( const ChunkKey chunk_key,
const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 970 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK(), CHECK_EQ, CHECK_GT, col_range_, copy_from_gpu(), copy_to_gpu(), cpu_hash_table_buff_, cpu_hash_table_buff_mutex_, Data_Namespace::CPU_LEVEL, device_count_, executor_, fill_hash_join_buff_on_device_bucketized(), fill_hash_join_buff_on_device_sharded_bucketized(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), ExpressionRange::getIntMax(), ExpressionRange::getIntMin(), Data_Namespace::GPU_LEVEL, init_hash_join_buff_on_device(), initHashTableOnCpu(), initHashTableOnCpuFromCache(), inline_fixed_encoding_null_val(), isBitwiseEq(), memory_level_, putHashTableOnCpuToCache(), and shardCount().

Referenced by reifyOneToOneForDevice().

976  {
977  const auto inner_col = cols.first;
978  CHECK(inner_col);
979 
980  auto hash_entry_info = get_bucketized_hash_entry_info(
981  inner_col->get_type_info(), col_range_, isBitwiseEq());
982  if (!hash_entry_info) {
983  return;
984  }
985 
986 #ifdef HAVE_CUDA
987  const auto shard_count = shardCount();
988  const size_t entries_per_shard{
989  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
990  : 0};
991  // Even if we join on dictionary encoded strings, the memory on the GPU is still
992  // needed once the join hash table has been built on the CPU.
993  const auto catalog = executor_->getCatalog();
995  auto& data_mgr = catalog->getDataMgr();
996  if (shard_count) {
997  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
998  CHECK_GT(shards_per_device, 0u);
999  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1000  }
1001  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1002  &data_mgr,
1003  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
1004  device_id);
1005  }
1006 #else
1007  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1008 #endif
1009 
1010 #ifdef HAVE_CUDA
1011  const auto& ti = inner_col->get_type_info();
1012 #endif
1013  const int32_t hash_join_invalid_val{-1};
1014  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1015  CHECK(!chunk_key.empty());
1016  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1017  {
1018  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1020  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1021  }
1022  if (inner_col->get_table_id() > 0) {
1023  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1024  }
1025  // Transfer the hash table on the GPU if we've only built it on CPU
1026  // but the query runs on GPU (join on dictionary encoded columns).
1028 #ifdef HAVE_CUDA
1029  CHECK(ti.is_string());
1030  auto& data_mgr = catalog->getDataMgr();
1031  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1032 
1033  copy_to_gpu(
1034  &data_mgr,
1035  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1036  &(*cpu_hash_table_buff_)[0],
1037  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1038  device_id);
1039 #else
1040  CHECK(false);
1041 #endif
1042  }
1043  } else {
1044 #ifdef HAVE_CUDA
1045  int err{0};
1046  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1047  auto& data_mgr = catalog->getDataMgr();
1048  gpu_hash_table_err_buff_[device_id] =
1049  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
1050  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
1051  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
1052  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
1054  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1055  hash_entry_info.getNormalizedHashEntryCount(),
1056  hash_join_invalid_val,
1057  executor_->blockSize(),
1058  executor_->gridSize());
1059  if (chunk_key.empty()) {
1060  return;
1061  }
1062  JoinColumn join_column{col_buff, num_elements};
1063  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1067  isBitwiseEq(),
1068  col_range_.getIntMax() + 1,
1070  if (shard_count) {
1071  CHECK_GT(device_count_, 0);
1072  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1073  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1075  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1076  hash_join_invalid_val,
1077  reinterpret_cast<int*>(dev_err_buff),
1078  join_column,
1079  type_info,
1080  shard_info,
1081  executor_->blockSize(),
1082  executor_->gridSize(),
1083  hash_entry_info.bucket_normalization);
1084  }
1085  } else {
1087  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1088  hash_join_invalid_val,
1089  reinterpret_cast<int*>(dev_err_buff),
1090  join_column,
1091  type_info,
1092  executor_->blockSize(),
1093  executor_->gridSize(),
1094  hash_entry_info.bucket_normalization);
1095  }
1096  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1097 
1098  if (err) {
1099  throw NeedsOneToManyHash();
1100  }
1101 #else
1102  CHECK(false);
1103 #endif
1104  }
1105 }
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
const int device_count_
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
#define CHECK_GT(x, y)
Definition: Logger.h:202
CHECK(cgen_state)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
size_t shardCount() const
Executor * executor_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
void initHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
const Data_Namespace::MemoryLevel memory_level_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::initHashTableOnCpu ( const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 791 of file JoinHashTable.cpp.

References HashEntryInfo::bucket_normalization, CHECK(), CHECK_EQ, col_range_, cpu_hash_table_buff_, cpu_threads(), executor_, fill_hash_join_buff_bucketized(), get_join_column_type_kind(), ExpressionRange::getIntMax(), ExpressionRange::getIntMin(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), isBitwiseEq(), and kENCODING_DICT.

Referenced by initHashTableForDevice().

796  {
797  const auto inner_col = cols.first;
798  CHECK(inner_col);
799  const auto& ti = inner_col->get_type_info();
800  if (!cpu_hash_table_buff_) {
801  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
802  hash_entry_info.getNormalizedHashEntryCount());
803  const StringDictionaryProxy* sd_inner_proxy{nullptr};
804  const StringDictionaryProxy* sd_outer_proxy{nullptr};
805  if (ti.is_string()) {
806  CHECK_EQ(kENCODING_DICT, ti.get_compression());
807  sd_inner_proxy = executor_->getStringDictionaryProxy(
808  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
809  CHECK(sd_inner_proxy);
810  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
811  CHECK(outer_col);
812  sd_outer_proxy = executor_->getStringDictionaryProxy(
813  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
814  CHECK(sd_outer_proxy);
815  }
816  int thread_count = cpu_threads();
817  std::vector<std::thread> init_cpu_buff_threads;
818  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
819  init_cpu_buff_threads.emplace_back(
820  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
822  hash_entry_info.getNormalizedHashEntryCount(),
823  hash_join_invalid_val,
824  thread_idx,
825  thread_count);
826  });
827  }
828  for (auto& t : init_cpu_buff_threads) {
829  t.join();
830  }
831  init_cpu_buff_threads.clear();
832  int err{0};
833  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
834  init_cpu_buff_threads.emplace_back([this,
835  hash_join_invalid_val,
836  col_buff,
837  num_elements,
838  sd_inner_proxy,
839  sd_outer_proxy,
840  thread_idx,
841  thread_count,
842  &ti,
843  &err,
844  hash_entry_info] {
845  int partial_err =
847  hash_join_invalid_val,
848  {col_buff, num_elements},
849  {static_cast<size_t>(ti.get_size()),
853  isBitwiseEq(),
854  col_range_.getIntMax() + 1,
856  sd_inner_proxy,
857  sd_outer_proxy,
858  thread_idx,
859  thread_count,
860  hash_entry_info.bucket_normalization);
861  __sync_val_compare_and_swap(&err, 0, partial_err);
862  });
863  }
864  for (auto& t : init_cpu_buff_threads) {
865  t.join();
866  }
867  if (err) {
868  cpu_hash_table_buff_.reset();
869  // Too many hash entries, need to retry with a 1:many table
870  throw NeedsOneToManyHash();
871  }
872  } else {
873  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
874  // Too many hash entries, need to retry with a 1:many table
875  throw NeedsOneToManyHash();
876  }
877  }
878 }
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
CHECK(cgen_state)
int64_t bucket_normalization
Executor * executor_
ExpressionRange col_range_
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::initHashTableOnCpuFromCache ( const ChunkKey chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols 
)
private

Definition at line 1238 of file JoinHashTable.cpp.

References col_range_, cpu_hash_table_buff_, cpu_hash_table_buff_mutex_, join_hash_table_cache_, join_hash_table_cache_mutex_, and qual_bin_oper_.

Referenced by initHashTableForDevice(), and initOneToManyHashTable().

1241  {
1242  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1243  JoinHashTableCacheKey cache_key{col_range_,
1244  *cols.first,
1245  outer_col ? *outer_col : *cols.first,
1246  num_elements,
1247  chunk_key,
1248  qual_bin_oper_->get_optype()};
1249  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1250  for (const auto& kv : join_hash_table_cache_) {
1251  if (kv.first == cache_key) {
1252  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1253  cpu_hash_table_buff_ = kv.second;
1254  break;
1255  }
1256  }
1257 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_

+ Here is the caller graph for this function:

void JoinHashTable::initOneToManyHashTable ( const ChunkKey chunk_key,
const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols,
const Data_Namespace::MemoryLevel  effective_memory_level,
const int  device_id 
)
private

Definition at line 1107 of file JoinHashTable.cpp.

References CudaAllocator::allocGpuAbstractBuffer(), CHECK(), CHECK_EQ, CHECK_GT, col_range_, copy_to_gpu(), cpu_hash_table_buff_, cpu_hash_table_buff_mutex_, Data_Namespace::CPU_LEVEL, device_count_, executor_, fill_one_to_many_hash_table_on_device(), fill_one_to_many_hash_table_on_device_bucketized(), fill_one_to_many_hash_table_on_device_sharded(), anonymous_namespace{JoinHashTable.cpp}::get_bucketized_hash_entry_info(), get_join_column_type_kind(), get_shard_count(), ExpressionRange::getIntMax(), ExpressionRange::getIntMin(), Data_Namespace::GPU_LEVEL, hash_entry_count_, init_hash_join_buff_on_device(), initHashTableOnCpuFromCache(), initOneToManyHashTableOnCpu(), inline_fixed_encoding_null_val(), isBitwiseEq(), kDATE, memory_level_, putHashTableOnCpuToCache(), and qual_bin_oper_.

Referenced by reifyOneToManyForDevice().

// Builds the one-to-many join hash table for one device: sizes the table
// (with per-shard adjustment under CUDA), then either builds it on CPU (with
// cache lookup/insert) and copies it to the GPU when required, or builds it
// directly on the GPU.
// NOTE(review): this Doxygen listing elides lines that were hyperlinks
// (1142, 1154, 1163, 1181, 1189-1191, 1194, 1201, 1213, 1222); the elided
// call names can be read from the "References" list above — confirm against
// JoinHashTable.cpp.
1113  {
1114  auto const inner_col = cols.first;
1115  CHECK(inner_col);
1116 
// Bucketized sizing: used for DATE join columns; otherwise equivalent to the
// plain range-based entry count.
1117  auto hash_entry_info = get_bucketized_hash_entry_info(
1118  inner_col->get_type_info(), col_range_, isBitwiseEq());
1119 
1120 #ifdef HAVE_CUDA
1121  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1122  const size_t entries_per_shard =
1123  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1124  : 0);
1125  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1126  // needed once the join hash table has been built on the CPU.
1127  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
// Each device hosts a subset of the shards; round up so every shard fits.
1128  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1129  CHECK_GT(shards_per_device, 0u);
1130  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1131  }
1132 #else
1133  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1134 #endif
// Only device 0 publishes the entry count to the shared member.
1135  if (!device_id) {
1136  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1137  }
1138 
1139 #ifdef HAVE_CUDA
1140  const auto& ti = inner_col->get_type_info();
1141  auto& data_mgr = executor_->getCatalog()->getDataMgr();
// NOTE(review): line 1142 (presumably the GPU_LEVEL guard opening the scope
// closed on 1147) is elided from this listing.
// One-to-many layout: offset + count sections (2 * entry count) followed by
// the row-id payload (num_elements), all int32_t.
1143  const size_t total_count =
1144  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements;
1145  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1146  &data_mgr, total_count * sizeof(int32_t), device_id);
1147  }
1148 #endif
1149  const int32_t hash_join_invalid_val{-1};
1150  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
// Reuse an identical cached table when one was built before.
1151  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1152  {
1153  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
// Elided line 1154 is the call to initOneToManyHashTableOnCpu(, taking the
// arguments on the next line (see "References" above).
1155  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1156  }
// Only cache tables for persistent tables (positive table id).
1157  if (inner_col->get_table_id() > 0) {
1158  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1159  }
1160  // Transfer the hash table on the GPU if we've only built it on CPU
1161  // but the query runs on GPU (join on dictionary encoded columns).
1162  // Don't transfer the buffer if there was an error since we'll bail anyway.
// NOTE(review): line 1163 (presumably the memory_level_ == GPU_LEVEL guard
// closed on 1176) is elided.
1164 #ifdef HAVE_CUDA
1165  CHECK(ti.is_string());
1166  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1167  copy_to_gpu(
1168  &data_mgr,
1169  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1170  &(*cpu_hash_table_buff_)[0],
1171  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1172  device_id);
1173 #else
1174  CHECK(false);
1175 #endif
1176  }
1177  } else {
1178 #ifdef HAVE_CUDA
1179  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1180  data_mgr.getCudaMgr()->setContext(device_id);
// Elided line 1181 presumably calls init_hash_join_buff_on_device( with the
// arguments on 1182-1186 (see "References" above).
1182  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1183  hash_entry_info.getNormalizedHashEntryCount(),
1184  hash_join_invalid_val,
1185  executor_->blockSize(),
1186  executor_->gridSize());
1187  JoinColumn join_column{col_buff, num_elements};
// NOTE(review): lines 1189-1191 and 1194 (remaining JoinColumnTypeInfo
// initializer fields) are elided from this listing.
1188  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1192  isBitwiseEq(),
1193  col_range_.getIntMax() + 1,
// DATE join columns take the bucketized fill path.
1195  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1196 
1197  if (shard_count) {
1198  CHECK_GT(device_count_, 0);
// Shards are distributed round-robin: this device handles every
// device_count_-th shard starting at its own id.
1199  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1200  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
// Elided line 1201 presumably calls fill_one_to_many_hash_table_on_device_sharded(.
1202  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1203  hash_entry_info,
1204  hash_join_invalid_val,
1205  join_column,
1206  type_info,
1207  shard_info,
1208  executor_->blockSize(),
1209  executor_->gridSize());
1210  }
1211  } else {
1212  if (use_bucketization) {
// Elided line 1213 presumably calls fill_one_to_many_hash_table_on_device_bucketized(.
1214  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1215  hash_entry_info,
1216  hash_join_invalid_val,
1217  join_column,
1218  type_info,
1219  executor_->blockSize(),
1220  executor_->gridSize());
1221  } else {
// Elided line 1222 presumably calls fill_one_to_many_hash_table_on_device(.
1223  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1224  hash_entry_info,
1225  hash_join_invalid_val,
1226  join_column,
1227  type_info,
1228  executor_->blockSize(),
1229  executor_->gridSize());
1230  }
1231  }
1232 #else
1233  CHECK(false);
1234 #endif
1235  }
1236 }
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
const int device_count_
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info, const size_t block_size_x, const size_t grid_size_x)
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void initOneToManyHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
#define CHECK_GT(x, y)
Definition: Logger.h:202
CHECK(cgen_state)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
Definition: sqltypes.h:56
Executor * executor_
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
size_t hash_entry_count_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::initOneToManyHashTableOnCpu ( const int8_t *  col_buff,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols,
const HashEntryInfo  hash_entry_info,
const int32_t  hash_join_invalid_val 
)
private

Definition at line 880 of file JoinHashTable.cpp.

References CHECK(), CHECK_EQ, col_range_, cpu_hash_table_buff_, cpu_threads(), executor_, fill_one_to_many_hash_table(), fill_one_to_many_hash_table_bucketized(), get_join_column_type_kind(), ExpressionRange::getIntMax(), ExpressionRange::getIntMin(), HashEntryInfo::getNormalizedHashEntryCount(), init_hash_join_buff(), inline_fixed_encoding_null_val(), isBitwiseEq(), kDATE, and kENCODING_DICT.

Referenced by initOneToManyHashTable().

// Builds the one-to-many hash table in host memory: allocates the int32
// buffer (offsets + counts + payload), resolves string dictionary proxies for
// dictionary-encoded string joins, initializes the buffer in parallel, then
// fills it (bucketized for DATE columns).
// NOTE(review): this Doxygen listing elides hyperlink lines (911, 926,
// 931-933, 936, 941, 946-948, 951); the elided call names can be read from
// the "References" list above — confirm against JoinHashTable.cpp.
885  {
886  const auto inner_col = cols.first;
887  CHECK(inner_col);
888  const auto& ti = inner_col->get_type_info();
// A non-null buffer means a cached table was already adopted — nothing to do.
889  if (cpu_hash_table_buff_) {
890  return;
891  }
// Layout: 2 * entry count (offset and count sections) + num_elements payload.
892  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
893  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements);
894  const StringDictionaryProxy* sd_inner_proxy{nullptr};
895  const StringDictionaryProxy* sd_outer_proxy{nullptr};
// Dictionary-encoded string joins need proxies for both sides so inner ids
// can be translated to the outer dictionary during the fill.
896  if (ti.is_string()) {
897  CHECK_EQ(kENCODING_DICT, ti.get_compression());
898  sd_inner_proxy = executor_->getStringDictionaryProxy(
899  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
900  CHECK(sd_inner_proxy);
901  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
902  CHECK(outer_col);
903  sd_outer_proxy = executor_->getStringDictionaryProxy(
904  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
905  CHECK(sd_outer_proxy);
906  }
// Initialize the buffer with the invalid slot value, one async task per
// hardware thread.
907  int thread_count = cpu_threads();
908  std::vector<std::future<void>> init_threads;
909  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
910  init_threads.emplace_back(std::async(std::launch::async,
// Elided line 911 presumably names init_hash_join_buff as the callable,
// taking the arguments on 912-916.
912  &(*cpu_hash_table_buff_)[0],
913  hash_entry_info.getNormalizedHashEntryCount(),
914  hash_join_invalid_val,
915  thread_idx,
916  thread_count));
917  }
// Wait for all tasks first, then get() each so any stored exception is
// rethrown.
918  for (auto& child : init_threads) {
919  child.wait();
920  }
921  for (auto& child : init_threads) {
922  child.get();
923  }
924 
// DATE columns are filled with the bucketized variant.
925  if (ti.get_type() == kDATE) {
// Elided line 926 presumably calls fill_one_to_many_hash_table_bucketized(.
927  hash_entry_info,
928  hash_join_invalid_val,
929  {col_buff, num_elements},
// NOTE(review): lines 931-933 and 936 (remaining JoinColumnTypeInfo fields)
// are elided from this listing.
930  {static_cast<size_t>(ti.get_size()),
934  isBitwiseEq(),
935  col_range_.getIntMax() + 1,
937  sd_inner_proxy,
938  sd_outer_proxy,
939  thread_count);
940  } else {
// Elided line 941 presumably calls fill_one_to_many_hash_table(; lines
// 946-948 and 951 (remaining JoinColumnTypeInfo fields) are elided.
942  hash_entry_info,
943  hash_join_invalid_val,
944  {col_buff, num_elements},
945  {static_cast<size_t>(ti.get_size()),
949  isBitwiseEq(),
950  col_range_.getIntMax() + 1,
952  sd_inner_proxy,
953  sd_outer_proxy,
954  thread_count);
955  }
956 }
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:198
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
CHECK(cgen_state)
Definition: sqltypes.h:56
Executor * executor_
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
ExpressionRange col_range_
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool JoinHashTable::isBitwiseEq ( ) const
private

Definition at line 1627 of file JoinHashTable.cpp.

References kBW_EQ, and qual_bin_oper_.

Referenced by codegenMatchingSet(), codegenSlot(), getHashJoinArgs(), initHashTableForDevice(), initHashTableOnCpu(), initOneToManyHashTable(), and initOneToManyHashTableOnCpu().

// True when the join qualifier's operator is kBW_EQ (bitwise/NULL-safe
// equality); callers use this to adjust null handling during table builds.
1627  {
1628  return qual_bin_oper_->get_optype() == kBW_EQ;
1629 }
Definition: sqldefs.h:31
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_

+ Here is the caller graph for this function:

size_t JoinHashTable::offsetBufferOff ( ) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1435 of file JoinHashTable.cpp.

References CHECK(), hash_type_, and JoinHashTableInterface::OneToMany.

Referenced by decodeJoinHashBuffer(), and toString().

// Byte offset of the offsets section in a one-to-many layout: it is the
// first section, so the offset is always 0.
// NOTE(review): line 1436 (per the "References" above, presumably a CHECK
// that hash_type_ is OneToMany) is elided from this listing.
1435  {
1437  return 0;
1438 }
HashType hash_type_
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t JoinHashTable::payloadBufferOff ( ) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1445 of file JoinHashTable.cpp.

References CHECK(), getComponentBufferSize(), hash_type_, and JoinHashTableInterface::OneToMany.

Referenced by decodeJoinHashBuffer(), and toString().

// Byte offset of the payload section: it follows the offsets and counts
// sections, each getComponentBufferSize() bytes long.
// NOTE(review): line 1446 (per the "References" above, presumably a CHECK
// that hash_type_ is OneToMany) is elided from this listing.
1445  {
1447  return 2 * getComponentBufferSize();
1448 }
HashType hash_type_
CHECK(cgen_state)
size_t getComponentBufferSize() const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::putHashTableOnCpuToCache ( const ChunkKey chunk_key,
const size_t  num_elements,
const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &  cols 
)
private

Definition at line 1259 of file JoinHashTable.cpp.

References col_range_, cpu_hash_table_buff_, join_hash_table_cache_, join_hash_table_cache_mutex_, and qual_bin_oper_.

Referenced by initHashTableForDevice(), and initOneToManyHashTable().

// Inserts the current CPU hash table buffer into the static cache, keyed the
// same way initHashTableOnCpuFromCache looks it up; no-op if an entry with
// the same key already exists.
1262  {
1263  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
// Key must mirror the lookup key in initHashTableOnCpuFromCache exactly.
1264  JoinHashTableCacheKey cache_key{col_range_,
1265  *cols.first,
1266  outer_col ? *outer_col : *cols.first,
1267  num_elements,
1268  chunk_key,
1269  qual_bin_oper_->get_optype()};
1270  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
// De-duplicate: keep the first buffer cached for a given key.
1271  for (const auto& kv : join_hash_table_cache_) {
1272  if (kv.first == cache_key) {
1273  return;
1274  }
1275  }
// NOTE(review): cpu_hash_table_buff_ is read here without holding
// cpu_hash_table_buff_mutex_ (only the cache mutex is held) — confirm the
// callers guarantee the buffer is stable at this point.
1276  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1277 }
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
ExpressionRange col_range_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_

+ Here is the caller graph for this function:

void JoinHashTable::reify ( const int  device_count)
private

Definition at line 545 of file JoinHashTable.cpp.

References CHECK_LT, checkHashJoinReplicationConstraint(), executor_, freeHashBufferMemory(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), getInnerQueryInfo(), hash_type_, InputTableInfo::info, JoinHashTableInterface::OneToMany, JoinHashTableInterface::OneToOne, only_shards_for_device(), qual_bin_oper_, reifyOneToManyForDevice(), reifyOneToOneForDevice(), and shardCount().

// Builds the hash table on every device in parallel. First attempts the
// one-to-one layout; if a build thread throws NeedsOneToManyHash, all builds
// are redone with the one-to-many layout.
// NOTE(review): this Doxygen listing elides hyperlink lines 574-576, 589-590
// and 599; per the "References" above these presumably name
// &JoinHashTable::reifyOneToOneForDevice, the OneToMany fallback setup
// (hash_type_ / freeHashBufferMemory()), and
// &JoinHashTable::reifyOneToManyForDevice — confirm against the .cpp.
545  {
546  CHECK_LT(0, device_count);
547  const auto& catalog = *executor_->getCatalog();
548  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
549  const auto inner_col = cols.first;
550  checkHashJoinReplicationConstraint(inner_col->get_table_id());
551  const auto& query_info = getInnerQueryInfo(inner_col).info;
// Nothing to build for an empty inner table.
552  if (query_info.fragments.empty()) {
553  return;
554  }
// Row ids are stored as int32_t, so the inner side must fit in 32 bits.
555  if (query_info.getNumTuplesUpperBound() >
556  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
557  throw TooManyHashEntries();
558  }
559 #ifdef HAVE_CUDA
560  gpu_hash_table_buff_.resize(device_count);
561  gpu_hash_table_err_buff_.resize(device_count);
562 #endif // HAVE_CUDA
563  std::vector<std::future<void>> init_threads;
564  const int shard_count = shardCount();
565 
566  try {
// One async build per device; sharded tables get only their own shards'
// fragments.
567  for (int device_id = 0; device_id < device_count; ++device_id) {
568  const auto fragments =
569  shard_count
570  ? only_shards_for_device(query_info.fragments, device_id, device_count)
571  : query_info.fragments;
572  init_threads.push_back(
573  std::async(std::launch::async,
// Elided lines 574-576 presumably pass &JoinHashTable::reifyOneToOneForDevice
// as the callable.
577  this,
578  fragments,
579  device_id));
580  }
// Wait for all, then get() each so stored exceptions propagate here.
581  for (auto& init_thread : init_threads) {
582  init_thread.wait();
583  }
584  for (auto& init_thread : init_threads) {
585  init_thread.get();
586  }
587 
// Fallback: rebuild everything with the one-to-many layout.
588  } catch (const NeedsOneToManyHash& e) {
// Elided lines 589-590 presumably switch hash_type_ to OneToMany and free
// the partially built buffers.
591  init_threads.clear();
592  for (int device_id = 0; device_id < device_count; ++device_id) {
593  const auto fragments =
594  shard_count
595  ? only_shards_for_device(query_info.fragments, device_id, device_count)
596  : query_info.fragments;
597 
598  init_threads.push_back(std::async(std::launch::async,
// Elided line 599 presumably passes &JoinHashTable::reifyOneToManyForDevice.
600  this,
601  fragments,
602  device_id));
603  }
604  for (auto& init_thread : init_threads) {
605  init_thread.wait();
606  }
607  for (auto& init_thread : init_threads) {
608  init_thread.get();
609  }
610  }
611 }
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
void reifyOneToManyForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
void freeHashBufferMemory()
HashType hash_type_
void checkHashJoinReplicationConstraint(const int table_id) const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
void reifyOneToOneForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
#define CHECK_LT(x, y)
Definition: Logger.h:200
size_t shardCount() const
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const

+ Here is the call graph for this function:

void JoinHashTable::reifyOneToManyForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
private

Definition at line 729 of file JoinHashTable.cpp.

References CHECK(), Data_Namespace::CPU_LEVEL, executor_, fetchFragments(), genHashTableKey(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), initOneToManyHashTable(), memory_level_, needs_dictionary_translation(), and qual_bin_oper_.

Referenced by reify().

// Per-device driver for the one-to-many build: resolves the join columns,
// decides whether to build on CPU (dictionary translation needed), fetches
// the inner column fragments, and hands off to initOneToManyHashTable.
// NOTE(review): this listing elides hyperlink lines 740 and 751 (per the
// context, presumably a throw for virtual columns and the
// initOneToManyHashTable( call) — confirm against the .cpp.
731  {
732  const auto& catalog = *executor_->getCatalog();
733  auto& data_mgr = catalog.getDataMgr();
734  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
735  const auto inner_col = cols.first;
736  CHECK(inner_col);
737  const auto inner_cd = get_column_descriptor_maybe(
738  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
// Virtual columns cannot be joined on; elided line 740 presumably throws.
739  if (inner_cd && inner_cd->isVirtualCol) {
741  }
742  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
743  // Since we don't have the string dictionary payloads on the GPU, we'll build
744  // the join hash table on the CPU and transfer it to the GPU.
745  const auto effective_memory_level =
746  needs_dictionary_translation(inner_col, cols.second, executor_)
748  : memory_level_;
// Empty fragment list: still build an (empty, properly initialized) table.
749  if (fragments.empty()) {
750  ChunkKey empty_chunk;
// Elided line 751 presumably calls initOneToManyHashTable( with the
// arguments on the next line.
752  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
753  return;
754  }
755 
756  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
757  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
758  const int8_t* col_buff = nullptr;
759  size_t elem_count = 0;
760 
// Fetch (and if needed linearize / transfer) the inner column data.
761  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
762  fragments,
763  effective_memory_level,
764  device_id,
765  chunks_owner,
766  dev_buff_owner);
767 
768  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
769  col_buff,
770  elem_count,
771  cols,
772  effective_memory_level,
773  device_id);
774 }
std::vector< int > ChunkKey
Definition: types.h:35
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
CHECK(cgen_state)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:171
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
void initOneToManyHashTable(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
const Data_Namespace::MemoryLevel memory_level_
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void JoinHashTable::reifyOneToOneForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
private

Definition at line 681 of file JoinHashTable.cpp.

References CHECK(), Data_Namespace::CPU_LEVEL, executor_, fetchFragments(), genHashTableKey(), anonymous_namespace{JoinHashTable.cpp}::get_cols(), get_column_descriptor_maybe(), initHashTableForDevice(), memory_level_, needs_dictionary_translation(), and qual_bin_oper_.

Referenced by reify().

// Per-device driver for the one-to-one build; mirrors
// reifyOneToManyForDevice but hands off to initHashTableForDevice.
// NOTE(review): this listing elides hyperlink lines 692 and 705 (per the
// context, presumably a throw for virtual columns and the
// initHashTableForDevice( call) — confirm against the .cpp.
683  {
684  const auto& catalog = *executor_->getCatalog();
685  auto& data_mgr = catalog.getDataMgr();
686  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
687  const auto inner_col = cols.first;
688  CHECK(inner_col);
689  const auto inner_cd = get_column_descriptor_maybe(
690  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
// Virtual columns cannot be joined on; elided line 692 presumably throws.
691  if (inner_cd && inner_cd->isVirtualCol) {
693  }
694  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
695  // Since we don't have the string dictionary payloads on the GPU, we'll build
696  // the join hash table on the CPU and transfer it to the GPU.
697  const auto effective_memory_level =
698  needs_dictionary_translation(inner_col, cols.second, executor_)
700  : memory_level_;
701  if (fragments.empty()) {
702  // No data in this fragment. Still need to create a hash table and initialize it
703  // properly.
704  ChunkKey empty_chunk;
// Elided line 705 presumably calls initHashTableForDevice( with the
// arguments on the next line.
// NOTE(review): unlike reifyOneToManyForDevice, no `return;` is visible
// after this branch in the listing — confirm against the .cpp whether
// execution intentionally falls through.
706  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
707  }
708 
709  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
710  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
711  const int8_t* col_buff = nullptr;
712  size_t elem_count = 0;
713 
// Fetch (and if needed linearize / transfer) the inner column data.
714  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
715  fragments,
716  effective_memory_level,
717  device_id,
718  chunks_owner,
719  dev_buff_owner);
720 
721  initHashTableForDevice(genHashTableKey(fragments, cols.second, inner_col),
722  col_buff,
723  elem_count,
724  cols,
725  effective_memory_level,
726  device_id);
727 }
std::vector< int > ChunkKey
Definition: types.h:35
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
CHECK(cgen_state)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:171
void initHashTableForDevice(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t JoinHashTable::shardCount ( ) const
private

Definition at line 1621 of file JoinHashTable.cpp.

References executor_, get_shard_count(), Data_Namespace::GPU_LEVEL, memory_level_, and qual_bin_oper_.

Referenced by codegenMatchingSet(), codegenSlot(), getAllColumnFragments(), initHashTableForDevice(), and reify().

// Number of shards relevant to this join, or 0 when sharding does not apply.
// NOTE(review): lines 1622-1623 are elided from this listing; per the
// "References" above they presumably test memory_level_ against
// Data_Namespace::GPU_LEVEL and call get_shard_count(qual_bin_oper_.get(),
// executor_) — confirm against the .cpp.
1621  {
1624  : 0;
1625 }
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Executor * executor_
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
const Data_Namespace::MemoryLevel memory_level_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string JoinHashTable::toString ( const ExecutorDeviceType  device_type,
const int  device_id,
bool  raw = false 
) const
override virtual noexcept

Implements JoinHashTableInterface.

Definition at line 1497 of file JoinHashTable.cpp.

References copy_from_gpu(), countBufferOff(), decodeJoinHashBufferToString(), executor_, getJoinHashBuffer(), getJoinHashBufferSize(), GPU, offsetBufferOff(), and payloadBufferOff().

// Renders the hash table buffer for the given device as a string; for GPU
// tables the buffer is first copied to a host-side scratch buffer.
// NOTE(review): line 1520 is elided from this listing; per the "References"
// above it presumably is the return of decodeJoinHashBufferToString( with
// the arguments on 1521 — confirm against the .cpp.
1499  {
1500  auto buffer = getJoinHashBuffer(device_type, device_id);
1501  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1502 #ifdef HAVE_CUDA
1503  std::unique_ptr<int8_t[]> buffer_copy;
// GPU buffers are not host-addressable: snapshot them into host memory.
1504  if (device_type == ExecutorDeviceType::GPU) {
1505  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1506 
1507  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1508  buffer_copy.get(),
1509  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1510  buffer_size,
1511  device_id);
1512  }
1513  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1514 #else
1515  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1516 #endif // HAVE_CUDA
// Section pointers into the buffer: offsets, counts, payload.
1517  auto ptr2 = ptr1 + offsetBufferOff();
1518  auto ptr3 = ptr1 + countBufferOff();
1519  auto ptr4 = ptr1 + payloadBufferOff();
1521  1, sizeof(int32_t), ptr1, ptr2, ptr3, ptr4, buffer_size, raw);
1522 }
size_t payloadBufferOff() const noexceptoverride
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
std::string decodeJoinHashBufferToString(size_t key_component_count, size_t key_component_width, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw)
size_t offsetBufferOff() const noexceptoverride
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
Executor * executor_
size_t countBufferOff() const noexceptoverride

+ Here is the call graph for this function:

static auto JoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inline static

Definition at line 122 of file JoinHashTable.h.

References join_hash_table_cache_, and join_hash_table_cache_mutex_.

// Returns a closure that clears the static join hash table cache under its
// mutex; handed to callers that need to invalidate cached tables later.
122  {
123  return []() -> void {
124  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
125  join_hash_table_cache_.clear();
126  };
127  }
static std::mutex join_hash_table_cache_mutex_
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_

Member Data Documentation

std::shared_ptr<Analyzer::ColumnVar> JoinHashTable::col_var_
private

Definition at line 242 of file JoinHashTable.h.

Referenced by getInnerTableId(), and getInnerTableRteIdx().

ColumnCacheMap& JoinHashTable::column_cache_
private

Definition at line 255 of file JoinHashTable.h.

Referenced by getAllColumnFragments(), and getOneColumnFragment().

std::mutex JoinHashTable::cpu_hash_table_buff_mutex_
private
const int JoinHashTable::device_count_
private
size_t JoinHashTable::hash_entry_count_
private

Definition at line 246 of file JoinHashTable.h.

Referenced by getComponentBufferSize(), and initOneToManyHashTable().

HashType JoinHashTable::hash_type_
private
std::vector< std::pair< JoinHashTable::JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > JoinHashTable::join_hash_table_cache_
static private
std::mutex JoinHashTable::join_hash_table_cache_mutex_
staticprivate
std::pair<const int8_t*, size_t> JoinHashTable::linearized_multifrag_column_
private

Definition at line 257 of file JoinHashTable.h.

Referenced by getAllColumnFragments().

std::mutex JoinHashTable::linearized_multifrag_column_mutex_
private

Definition at line 258 of file JoinHashTable.h.

Referenced by getAllColumnFragments().

RowSetMemoryOwner JoinHashTable::linearized_multifrag_column_owner_
private

Definition at line 259 of file JoinHashTable.h.

Referenced by getAllColumnFragments().

const Data_Namespace::MemoryLevel JoinHashTable::memory_level_
private
const std::vector<InputTableInfo>& JoinHashTable::query_infos_
private

Definition at line 243 of file JoinHashTable.h.

Referenced by getInnerQueryInfo().


The documentation for this class was generated from the following files: