OmniSciDB  04ee39c94c
OverlapsJoinHashTable Class Reference

#include <OverlapsJoinHashTable.h>


Public Member Functions

 OverlapsJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const size_t entry_count, ColumnCacheMap &column_map, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
 ~OverlapsJoinHashTable () override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
- Public Member Functions inherited from BaselineJoinHashTable
int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) noexcept override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
JoinHashTableInterface::HashType getHashType () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
virtual ~BaselineJoinHashTable ()
 

Static Public Member Functions

static std::shared_ptr< OverlapsJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_map, Executor *executor)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
- Static Public Member Functions inherited from BaselineJoinHashTable
static std::shared_ptr< BaselineJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_map, Executor *executor)
 
static size_t getShardCountForCondition (const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 

Protected Member Functions

void reifyWithLayout (const int device_count, const JoinHashTableInterface::HashType layout) override
 
std::pair< size_t, size_t > calculateCounts (size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, const int device_count, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
 
size_t calculateHashTableSize (size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
 
ColumnsForDevice fetchColumnsForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id) override
 
std::pair< size_t, size_t > approximateTupleCount (const std::vector< ColumnsForDevice > &) const override
 
size_t getKeyComponentWidth () const override
 
size_t getKeyComponentCount () const override
 
int initHashTableOnCpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout) override
 
int initHashTableOnGpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id) override
 
llvm::Value * codegenKey (const CompilationOptions &) override
 
- Protected Member Functions inherited from BaselineJoinHashTable
 BaselineJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_map, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
std::pair< const int8_t *, size_t > getAllColumnFragments (const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
size_t shardCount () const
 
Data_Namespace::MemoryLevel getEffectiveMemoryLevel (const std::vector< InnerOuter > &inner_outer_pairs) const
 
CompositeKeyInfo getCompositeKeyInfo () const
 
void reify (const int device_count)
 
JoinColumn fetchColumn (const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
 
void reifyForDevice (const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
int initHashTableForDevice (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
llvm::Value * hashPtr (const size_t index)
 
void initHashTableOnCpuFromCache (const HashTableCacheKey &)
 
void putHashTableOnCpuToCache (const HashTableCacheKey &)
 
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache (const HashTableCacheKey &) const
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
const HashTableCacheValue * findHashTableOnCpuInCache (const HashTableCacheKey &)
 

Static Protected Attributes

static std::map< HashTableCacheKey, double > auto_tuner_cache_
 
static std::mutex auto_tuner_cache_mutex_
 
- Static Protected Attributes inherited from BaselineJoinHashTable
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
 
static std::mutex hash_table_cache_mutex_
 
static const int ERR_FAILED_TO_FETCH_COLUMN {-3}
 
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN {-4}
 

Private Member Functions

void computeBucketSizes (std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const std::vector< InnerOuter > &inner_outer_pairs)
 

Private Attributes

std::vector< double > bucket_sizes_for_dimension_
 
double overlaps_hashjoin_bucket_threshold_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType { HashType::OneToOne, HashType::OneToMany }
 
- Protected Types inherited from BaselineJoinHashTable
typedef std::pair< const int8_t *, size_t > LinearizedColumn
 
typedef std::pair< int, int > LinearizedColumnCacheKey
 
- Static Protected Member Functions inherited from BaselineJoinHashTable
static int getInnerTableId (const std::vector< InnerOuter > &inner_outer_pairs)
 
- Protected Attributes inherited from BaselineJoinHashTable
const std::shared_ptr< Analyzer::BinOper > condition_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
JoinHashTableInterface::HashType layout_
 
size_t entry_count_
 
size_t emitted_keys_count_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
std::map< LinearizedColumnCacheKey, LinearizedColumn > linearized_multifrag_columns_
 
std::mutex linearized_multifrag_column_mutex_
 
RowSetMemoryOwner linearized_multifrag_column_owner_
 
std::vector< InnerOuter > inner_outer_pairs_
 
const Catalog_Namespace::Catalog * catalog_
 

Detailed Description

Definition at line 22 of file OverlapsJoinHashTable.h.

Constructor & Destructor Documentation

◆ OverlapsJoinHashTable()

OverlapsJoinHashTable::OverlapsJoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const size_t  entry_count,
ColumnCacheMap &  column_map,
Executor *  executor,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
inline

Definition at line 24 of file OverlapsJoinHashTable.h.

31  : BaselineJoinHashTable(condition,
32  query_infos,
33  memory_level,
34  JoinHashTableInterface::HashType::OneToMany,
35  entry_count,
36  column_map,
37  executor,
38  inner_outer_pairs) {}
BaselineJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_map, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)

◆ ~OverlapsJoinHashTable()

OverlapsJoinHashTable::~OverlapsJoinHashTable ( )
inlineoverride

Definition at line 40 of file OverlapsJoinHashTable.h.

References getInstance().

40 {}

Member Function Documentation

◆ approximateTupleCount()

std::pair< size_t, size_t > OverlapsJoinHashTable::approximateTupleCount ( const std::vector< ColumnsForDevice > &  columns_per_device) const
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 250 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), approximate_distinct_tuples_on_device_overlaps(), approximate_distinct_tuples_overlaps(), Bitmap, CHECK, CHECK_EQ, CHECK_GT, BaselineJoinHashTable::condition_, copy_from_gpu(), copy_to_gpu(), CPU, Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getApproximateTupleCountFromCache(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getEffectiveMemoryLevel(), GPU, Data_Namespace::GPU_LEVEL, hll_size(), hll_unify(), BaselineJoinHashTable::inner_outer_pairs_, overlaps_hashjoin_bucket_threshold_, transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), UNREACHABLE, and VLOG.

Referenced by calculateCounts(), and yieldCacheInvalidator().

251  {
252  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
253  CountDistinctDescriptor count_distinct_desc{
255  0,
256  11,
257  true,
258  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
261  1};
262  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
263 
264  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
265  // Number of keys must match dimension of buckets
266  CHECK_EQ(columns_per_device.front().join_columns.size(),
267  columns_per_device.front().join_buckets.size());
268  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
269  const auto composite_key_info = getCompositeKeyInfo();
270  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
271  composite_key_info.cache_key_chunks,
272  condition_->get_optype(),
274  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
275  if (cached_count_info.first >= 0) {
276  VLOG(1) << "Using a cached tuple count: " << cached_count_info.first
277  << ", emitted keys count: " << cached_count_info.second;
278  return std::make_pair(cached_count_info.first, cached_count_info.second);
279  }
280  int thread_count = cpu_threads();
281  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
282  auto hll_result = &hll_buffer_all_cpus[0];
283 
284  std::vector<int32_t> num_keys_for_row;
285  // TODO(adb): support multi-column overlaps join
286  CHECK_EQ(columns_per_device.size(), 1u);
287  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
288 
290  num_keys_for_row,
291  count_distinct_desc.bitmap_sz_bits,
292  padded_size_bytes,
293  columns_per_device.front().join_columns,
294  columns_per_device.front().join_column_types,
295  columns_per_device.front().join_buckets,
296  thread_count);
297  for (int i = 1; i < thread_count; ++i) {
298  hll_unify(hll_result,
299  hll_result + i * padded_size_bytes,
300  1 << count_distinct_desc.bitmap_sz_bits);
301  }
302  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
303  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
304  }
305 #ifdef HAVE_CUDA
306  const int device_count = columns_per_device.size();
307  auto& data_mgr = executor_->getCatalog()->getDataMgr();
308  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count);
309  for (auto& host_hll_buffer : host_hll_buffers) {
310  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
311  }
312  std::vector<size_t> emitted_keys_count_device_threads(device_count, 0);
313  std::vector<std::future<void>> approximate_distinct_device_threads;
314  for (int device_id = 0; device_id < device_count; ++device_id) {
315  approximate_distinct_device_threads.emplace_back(std::async(
316  std::launch::async,
317  [device_id,
318  &columns_per_device,
319  &count_distinct_desc,
320  &data_mgr,
321  &host_hll_buffers,
322  &emitted_keys_count_device_threads,
323  this] {
324  ThrustAllocator allocator(&data_mgr, device_id);
325  auto device_hll_buffer =
326  allocator.allocateScopedBuffer(count_distinct_desc.bitmapPaddedSizeBytes());
327  data_mgr.getCudaMgr()->zeroDeviceMem(
328  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
329  const auto& columns_for_device = columns_per_device[device_id];
330  auto join_columns_gpu =
331  transfer_pod_vector_to_gpu(columns_for_device.join_columns, allocator);
332 
333  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
334  const auto& bucket_sizes_for_dimension =
335  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
336  auto bucket_sizes_gpu = allocator.allocateScopedBuffer(
337  bucket_sizes_for_dimension.size() * sizeof(double));
338  copy_to_gpu(&data_mgr,
339  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
340  bucket_sizes_for_dimension.data(),
341  bucket_sizes_for_dimension.size() * sizeof(double),
342  device_id);
343  const size_t row_counts_buffer_sz =
344  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
345  auto row_counts_buffer = allocator.allocateScopedBuffer(row_counts_buffer_sz);
346  data_mgr.getCudaMgr()->zeroDeviceMem(
347  row_counts_buffer, row_counts_buffer_sz, device_id);
348  const auto key_handler =
349  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
350  join_columns_gpu,
351  reinterpret_cast<double*>(bucket_sizes_gpu));
352  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
354  reinterpret_cast<uint8_t*>(device_hll_buffer),
355  count_distinct_desc.bitmap_sz_bits,
356  reinterpret_cast<int32_t*>(row_counts_buffer),
357  key_handler_gpu,
358  columns_for_device.join_columns[0].num_elems,
359  executor_->blockSize(),
360  executor_->gridSize());
361 
362  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
363  copy_from_gpu(&data_mgr,
364  &host_emitted_keys_count,
365  reinterpret_cast<CUdeviceptr>(
366  row_counts_buffer +
367  (columns_per_device.front().join_columns[0].num_elems - 1) *
368  sizeof(int32_t)),
369  sizeof(int32_t),
370  device_id);
371 
372  auto& host_hll_buffer = host_hll_buffers[device_id];
373  copy_from_gpu(&data_mgr,
374  &host_hll_buffer[0],
375  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
376  count_distinct_desc.bitmapPaddedSizeBytes(),
377  device_id);
378  }));
379  }
380  for (auto& child : approximate_distinct_device_threads) {
381  child.get();
382  }
383  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
384  auto& result_hll_buffer = host_hll_buffers.front();
385  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
386  for (int device_id = 1; device_id < device_count; ++device_id) {
387  auto& host_hll_buffer = host_hll_buffers[device_id];
388  hll_unify(hll_result,
389  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
390  1 << count_distinct_desc.bitmap_sz_bits);
391  }
392  size_t emitted_keys_count = 0;
393  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
394  emitted_keys_count += emitted_keys_count_device;
395  }
396  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
397  emitted_keys_count);
398 #else
399  UNREACHABLE();
400  return {0, 0};
401 #endif // HAVE_CUDA
402 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:109
#define UNREACHABLE()
Definition: Logger.h:231
CompositeKeyInfo getCompositeKeyInfo() const
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:90
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::vector< InnerOuter > inner_outer_pairs_
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
void approximate_distinct_tuples_overlaps(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const int thread_count)
#define CHECK(condition)
Definition: Logger.h:187
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
void approximate_distinct_tuples_on_device_overlaps(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
int cpu_threads()
Definition: thread_count.h:23
#define VLOG(n)
Definition: Logger.h:277
const std::shared_ptr< Analyzer::BinOper > condition_
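
The per-thread (CPU) or per-device (GPU) HyperLogLog registers are merged with hll_unify before hll_size turns them into a distinct-tuple estimate. As a conceptual illustration only (not the actual hll_unify implementation), HLL register buffers can be combined by taking the element-wise maximum, which is what allows the independent partial estimates above to be folded into one buffer:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Simplified stand-in for hll_unify: HLL registers merge by element-wise max,
// so per-thread or per-device estimates can be combined before sizing.
void merge_hll_registers(std::vector<uint8_t>& dst, const std::vector<uint8_t>& src) {
  for (size_t i = 0; i < dst.size() && i < src.size(); ++i) {
    dst[i] = std::max(dst[i], src[i]);
  }
}
```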

◆ calculateCounts()

std::pair< size_t, size_t > OverlapsJoinHashTable::calculateCounts ( size_t  shard_count,
const Fragmenter_Namespace::TableInfo &  query_info,
const int  device_count,
std::vector< BaselineJoinHashTable::ColumnsForDevice > &  columns_per_device 
)
protected

Definition at line 171 of file OverlapsJoinHashTable.cpp.

References approximateTupleCount(), fetchColumnsForDevice(), Fragmenter_Namespace::TableInfo::fragments, get_entries_per_device(), BaselineJoinHashTable::memory_level_, and only_shards_for_device().

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

175  {
176  for (int device_id = 0; device_id < device_count; ++device_id) {
177  const auto fragments =
178  shard_count
179  ? only_shards_for_device(query_info.fragments, device_id, device_count)
180  : query_info.fragments;
181  const auto columns_for_device = fetchColumnsForDevice(fragments, device_id);
182  columns_per_device.push_back(columns_for_device);
183  }
184 
185  size_t tuple_count;
186  size_t emitted_keys_count;
187  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
188  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
189 
190  return std::make_pair(
191  get_entries_per_device(entry_count, shard_count, device_count, memory_level_),
192  emitted_keys_count);
193 }
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
std::deque< FragmentInfo > fragments
Definition: Fragmenter.h:167
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id) override
const Data_Namespace::MemoryLevel memory_level_
std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const override
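
The sizing rule in the listing above doubles the approximate tuple count (clamped to at least one) to get the entry count, which get_entries_per_device then splits across devices. A tiny numeric illustration with made-up counts:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  // Hypothetical value standing in for the result of approximateTupleCount().
  const size_t tuple_count = 1'000'000;
  const size_t entry_count = 2 * std::max(tuple_count, size_t(1));  // 2,000,000
  std::cout << "hash entries before per-device split: " << entry_count << '\n';
}
```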

◆ calculateHashTableSize()

size_t OverlapsJoinHashTable::calculateHashTableSize ( size_t  number_of_dimensions,
size_t  emitted_keys_count,
size_t  entry_count 
) const
protected

Definition at line 195 of file OverlapsJoinHashTable.cpp.

References getKeyComponentWidth().

Referenced by initHashTableOnCpu(), reifyWithLayout(), and yieldCacheInvalidator().

197  {
198  const auto key_component_width = getKeyComponentWidth();
199  const auto key_component_count = number_of_dimensions;
200  const auto entry_size = key_component_count * key_component_width;
201  const auto keys_for_all_rows = emitted_keys_count;
202  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
203  const size_t hash_table_size =
204  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
205  return hash_table_size;
206 }
size_t getKeyComponentWidth() const override
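
The formula above models a one-to-many baseline table: a composite-key region of entry_count entries (each number_of_dimensions * key_component_width bytes) followed by count, offset, and payload buffers of int32_t. A worked example with made-up inputs:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical inputs, chosen only to illustrate the formula above.
  const size_t key_component_width = 8;   // getKeyComponentWidth() is fixed at 8
  const size_t number_of_dimensions = 2;  // e.g. an x/y bucket key for points
  const size_t entry_count = 2'000'000;
  const size_t emitted_keys_count = 5'000'000;

  const size_t entry_size = number_of_dimensions * key_component_width;  // 16 bytes
  const size_t one_to_many_hash_entries = 2 * entry_count + emitted_keys_count;
  const size_t hash_table_size =
      entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
  // 32,000,000 bytes of keys + 36,000,000 bytes of one-to-many buffers = 68,000,000
  std::cout << hash_table_size << " bytes\n";
}
```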

◆ codegenKey()

llvm::Value * OverlapsJoinHashTable::codegenKey ( const CompilationOptions &  co)
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 695 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CodeGenerator::castArrayPointer(), CHECK, CHECK_EQ, CodeGenerator::codegen(), BaselineJoinHashTable::executor_, logger::FATAL, get_int_type(), getKeyComponentCount(), getKeyComponentWidth(), BaselineJoinHashTable::inner_outer_pairs_, kENCODING_GEOINT, kPOINT, kTINYINT, LL_BUILDER, LL_CONTEXT, LL_FP, LL_INT, and LOG.

Referenced by yieldCacheInvalidator().

695  {
696  const auto key_component_width = getKeyComponentWidth();
697  CHECK(key_component_width == 4 || key_component_width == 8);
698  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
699  llvm::Value* key_buff_lv{nullptr};
700  switch (key_component_width) {
701  case 4:
702  key_buff_lv =
703  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
704  break;
705  case 8:
706  key_buff_lv =
707  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
708  break;
709  default:
710  CHECK(false);
711  }
712 
713  const auto& inner_outer_pair = inner_outer_pairs_[0];
714  const auto outer_col = inner_outer_pair.second;
715  const auto outer_col_ti = outer_col->get_type_info();
716 
717  if (outer_col_ti.is_geometry()) {
718  CodeGenerator code_generator(executor_);
719  // TODO(adb): for points we will use the coords array, but for other geometries we
720  // will need to use the bounding box. For now only support points.
721  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
722  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
723 
724  const auto col_lvs = code_generator.codegen(outer_col, true, co);
725  CHECK_EQ(col_lvs.size(), size_t(1));
726 
727  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
728  CHECK(outer_col_var);
729  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
730  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
731  CHECK(coords_cd);
732 
733  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
734  "array_buff",
735  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
736  {col_lvs.front(), code_generator.posArg(outer_col)});
737  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
738  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
739  const auto arr_ptr =
740  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
741 
742  for (size_t i = 0; i < 2; i++) {
743  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
744 
745  // Note that get_bucket_key_for_range_compressed will need to be specialized for
746  // future compression schemes
747  auto bucket_key =
748  outer_col_ti.get_compression() == kENCODING_GEOINT
749  ? executor_->cgen_state_->emitExternalCall(
750  "get_bucket_key_for_range_compressed",
752  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
753  : executor_->cgen_state_->emitExternalCall(
754  "get_bucket_key_for_range_double",
756  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
757  const auto col_lv = LL_BUILDER.CreateSExt(
758  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
759  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
760  }
761  } else {
762  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
763  }
764  return key_buff_lv;
765 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::vector< double > bucket_sizes_for_dimension_
#define LOG(tag)
Definition: Logger.h:182
#define LL_FP(v)
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
std::vector< InnerOuter > inner_outer_pairs_
#define LL_BUILDER
#define LL_INT(v)
size_t getKeyComponentCount() const override
#define CHECK(condition)
Definition: Logger.h:187
#define LL_CONTEXT
size_t getKeyComponentWidth() const override
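
For a point, the generated IR loads the coordinate array and maps each of the two dimensions to a bucket key using the per-dimension bucket size, then sign-extends it to the key component width. A plain C++ sketch of what the emitted code is computing; the exact rounding of get_bucket_key_for_range_double and the multiply-by-inverse convention are assumptions here, not the runtime function itself:

```cpp
#include <array>
#include <cmath>
#include <cstdint>

// Conceptual host-side equivalent of the 2-D overlaps key built by codegenKey.
// bucket_sizes_for_dimension_ is assumed to hold inverse bucket sizes (see
// computeBucketSizes), so a multiply maps a coordinate to a bucket index.
std::array<int64_t, 2> make_overlaps_key(const double x,
                                         const double y,
                                         const std::array<double, 2>& inv_bucket_sizes) {
  return {static_cast<int64_t>(std::floor(x * inv_bucket_sizes[0])),
          static_cast<int64_t>(std::floor(y * inv_bucket_sizes[1]))};
}
```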

◆ computeBucketSizes()

void OverlapsJoinHashTable::computeBucketSizes ( std::vector< double > &  bucket_sizes_for_dimension,
const JoinColumn &  join_column,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
private

Definition at line 767 of file OverlapsJoinHashTable.cpp.

References CHECK, CHECK_EQ, CHECK_GT, compute_bucket_sizes(), compute_bucket_sizes_on_device(), copy_from_gpu(), Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getEffectiveMemoryLevel(), overlaps_hashjoin_bucket_threshold_, to_string(), transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), and VLOG.

Referenced by fetchColumnsForDevice().

770  {
771  // No coalesced keys for overlaps joins yet
772  CHECK_EQ(inner_outer_pairs.size(), 1u);
773 
774  const auto col = inner_outer_pairs[0].first;
775  CHECK(col);
776  const auto col_ti = col->get_type_info();
777  CHECK(col_ti.is_array());
778 
779  // Compute the number of dimensions for this overlaps key
780  int num_dims{-1};
781  if (col_ti.is_fixlen_array()) {
782  num_dims = col_ti.get_size() / col_ti.get_elem_type().get_size();
783  num_dims /= 2;
784  } else {
785  CHECK(col_ti.is_varlen_array());
786  num_dims = 2;
787  // TODO(adb): how can we pick the number of dims in the varlen case? e.g.
788  // backwards compatibility with existing bounds cols or generic range joins
789  }
790  CHECK_GT(num_dims, 0);
791  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
792 
793  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
795 
796  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
797  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
798  const int thread_count = cpu_threads();
799  compute_bucket_sizes(local_bucket_sizes,
800  join_column,
802  thread_count);
803  }
804 #ifdef HAVE_CUDA
805  else {
806  // Note that we compute the bucket sizes using only a single GPU
807  const int device_id = 0;
808  auto& data_mgr = executor_->getCatalog()->getDataMgr();
809  ThrustAllocator allocator(&data_mgr, device_id);
810  auto device_bucket_sizes_gpu =
811  transfer_pod_vector_to_gpu(local_bucket_sizes, allocator);
812  auto join_columns_gpu = transfer_object_to_gpu(join_column, allocator);
813 
814  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
815  join_columns_gpu,
817  executor_->blockSize(),
818  executor_->gridSize());
819  copy_from_gpu(&data_mgr,
820  local_bucket_sizes.data(),
821  reinterpret_cast<CUdeviceptr>(device_bucket_sizes_gpu),
822  local_bucket_sizes.size() * sizeof(double),
823  device_id);
824  }
825 #endif
826 
827  size_t ctr = 0;
828  for (auto& bucket_sz : local_bucket_sizes) {
829  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
830  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
831  }
832 
833  return;
834 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
unsigned long long CUdeviceptr
Definition: nocuda.h:27
void compute_bucket_sizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const double bucket_size_threshold, const int thread_count)
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::string to_string(char const *&&v)
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
#define CHECK(condition)
Definition: Logger.h:187
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
void compute_bucket_sizes_on_device(double *bucket_sizes_buffer, const JoinColumn *join_column_for_key, const double bucket_sz_threshold, const size_t block_size_x, const size_t grid_size_x)
int cpu_threads()
Definition: thread_count.h:23
#define VLOG(n)
Definition: Logger.h:277
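
Note that the loop at the end of the listing stores reciprocals (1.0 / bucket_sz) into bucket_sizes_for_dimension, so downstream code can bucket a coordinate with a multiply instead of a divide. A minimal CPU-only sketch of just that inversion step, separate from the compute_bucket_sizes kernels:

```cpp
#include <vector>

// Sketch: given raw per-dimension bucket extents (as compute_bucket_sizes or
// compute_bucket_sizes_on_device would fill them in), produce the inverse
// sizes appended to bucket_sizes_for_dimension in the listing above.
std::vector<double> invert_bucket_sizes(const std::vector<double>& raw_sizes) {
  std::vector<double> inverse;
  inverse.reserve(raw_sizes.size());
  for (const double sz : raw_sizes) {
    inverse.push_back(1.0 / sz);
  }
  return inverse;
}
```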

◆ countBufferOff()

size_t OverlapsJoinHashTable::countBufferOff ( ) const
inlineoverridevirtualnoexcept

Reimplemented from BaselineJoinHashTable.

Definition at line 50 of file OverlapsJoinHashTable.h.

References logger::FATAL, and LOG.

50  {
51  LOG(FATAL) << "Not supported for this layout";
52  return 0;
53  }
#define LOG(tag)
Definition: Logger.h:182

◆ fetchColumnsForDevice()

BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 208 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK, JoinColumn::col_buff, computeBucketSizes(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::fetchColumn(), get_column_descriptor_maybe(), get_join_column_type_kind(), BaselineJoinHashTable::getEffectiveMemoryLevel(), BaselineJoinHashTable::inner_outer_pairs_, BaselineJoinHashTable::isBitwiseEq(), and kDOUBLE.

Referenced by calculateCounts(), and yieldCacheInvalidator().

210  {
211  const auto& catalog = *executor_->getCatalog();
212  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
213 
214  std::vector<JoinColumn> join_columns;
215  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
216  std::vector<JoinColumnTypeInfo> join_column_types;
217  std::vector<JoinBucketInfo> join_bucket_info;
218  for (const auto& inner_outer_pair : inner_outer_pairs_) {
219  const auto inner_col = inner_outer_pair.first;
220  const auto inner_cd = get_column_descriptor_maybe(
221  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
222  if (inner_cd && inner_cd->isVirtualCol) {
224  }
225  const auto join_column_info = fetchColumn(
226  inner_col, effective_memory_level, fragments, chunks_owner, device_id);
227  join_columns.emplace_back(
228  JoinColumn{join_column_info.col_buff, join_column_info.num_elems});
229  const auto& ti = inner_col->get_type_info();
230  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
231  0,
232  inline_int_null_value<int64_t>(),
233  isBitwiseEq(),
234  0,
236  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
237 
238  if (bucket_sizes_for_dimension_.empty()) {
240  bucket_sizes_for_dimension_, join_columns.back(), inner_outer_pairs_);
241  }
242  const auto elem_ti = ti.get_elem_type();
243  CHECK(elem_ti.is_fp());
244  join_bucket_info.emplace_back(
245  JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
246  }
247  return {join_columns, join_column_types, chunks_owner, join_bucket_info};
248 }
std::vector< double > bucket_sizes_for_dimension_
const int8_t * col_buff
std::vector< InnerOuter > inner_outer_pairs_
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:168
JoinColumn fetchColumn(const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:187
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
void computeBucketSizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const std::vector< InnerOuter > &inner_outer_pairs)

◆ getInstance()

std::shared_ptr< OverlapsJoinHashTable > OverlapsJoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_count,
ColumnCacheMap &  column_map,
Executor *  executor
)
static

Definition at line 30 of file OverlapsJoinHashTable.cpp.

References logger::FATAL, get_entries_per_device(), get_inner_query_info(), BaselineJoinHashTable::getInnerTableId(), Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound(), BaselineJoinHashTable::getShardCountForCondition(), Data_Namespace::GPU_LEVEL, InputTableInfo::info, LOG, and normalize_column_pairs().

Referenced by Executor::buildHashTableForQualifier(), TEST(), and ~OverlapsJoinHashTable().

36  {
37  auto inner_outer_pairs = normalize_column_pairs(
38  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
39  const auto& query_info =
40  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
41  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
42  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
43  throw TooManyHashEntries();
44  }
45  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
47  condition.get(), executor, inner_outer_pairs)
48  : 0;
49  const auto entries_per_device =
50  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
51  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
52  query_infos,
53  memory_level,
54  entries_per_device,
55  column_map,
56  executor,
57  inner_outer_pairs);
58  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
59  try {
60  join_hash_table->reify(device_count);
61  } catch (const HashJoinFail& e) {
62  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
63  "involved in equijoin | ") +
64  e.what());
65  } catch (const ColumnarConversionNotSupported& e) {
66  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
67  e.what());
68  } catch (const std::exception& e) {
69  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
70  << e.what();
71  }
72  return join_hash_table;
73 }
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
#define LOG(tag)
Definition: Logger.h:182
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
int getInnerTableId() const noexcept override
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
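
A minimal sketch of how a caller might use this factory, mirroring the call site in Executor::buildHashTableForQualifier noted above. The helper function and its inputs are hypothetical and assumed to be prepared by the caller:

```cpp
#include <OverlapsJoinHashTable.h>

// Hypothetical call site; only the getInstance() call reflects the API above.
std::shared_ptr<OverlapsJoinHashTable> build_overlaps_table(
    const std::shared_ptr<Analyzer::BinOper>& overlaps_condition,
    const std::vector<InputTableInfo>& query_infos,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const int device_count) {
  // GPU_LEVEL allows per-device sharding; CPU_LEVEL builds a single table.
  // HashJoinFail is rethrown by getInstance() with extra context, as shown above.
  return OverlapsJoinHashTable::getInstance(overlaps_condition,
                                            query_infos,
                                            Data_Namespace::GPU_LEVEL,
                                            device_count,
                                            column_cache,
                                            executor);
}
```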

◆ getKeyComponentCount()

size_t OverlapsJoinHashTable::getKeyComponentCount ( ) const
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 408 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_.

Referenced by codegenKey(), and yieldCacheInvalidator().

408  {
409  return bucket_sizes_for_dimension_.size();
410 }
std::vector< double > bucket_sizes_for_dimension_

◆ getKeyComponentWidth()

size_t OverlapsJoinHashTable::getKeyComponentWidth ( ) const
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 404 of file OverlapsJoinHashTable.cpp.

Referenced by calculateHashTableSize(), codegenKey(), initHashTableOnCpu(), and yieldCacheInvalidator().

404  {
405  return 8;
406 }

◆ initHashTableOnCpu()

int OverlapsJoinHashTable::initHashTableOnCpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout 
)
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 412 of file OverlapsJoinHashTable.cpp.

References calculateHashTableSize(), CHECK, BaselineJoinHashTable::condition_, BaselineJoinHashTable::cpu_hash_table_buff_, cpu_threads(), BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, fill_one_to_many_baseline_hash_table_32(), fill_one_to_many_baseline_hash_table_64(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), getKeyComponentWidth(), init_baseline_hash_join_buff_32(), init_baseline_hash_join_buff_64(), init_hash_join_buff(), BaselineJoinHashTable::initHashTableOnCpuFromCache(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_32(), overlaps_fill_baseline_hash_join_buff_64(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::putHashTableOnCpuToCache(), and VLOG.

Referenced by yieldCacheInvalidator().

416  {
417  const auto composite_key_info = getCompositeKeyInfo();
418  CHECK(!join_columns.empty());
419  CHECK(!join_bucket_info.empty());
420  HashTableCacheKey cache_key{join_columns.front().num_elems,
421  composite_key_info.cache_key_chunks,
422  condition_->get_optype(),
424  initHashTableOnCpuFromCache(cache_key);
425  if (cpu_hash_table_buff_) {
426  return 0;
427  }
429  const auto key_component_width = getKeyComponentWidth();
430  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
431  const auto entry_size = key_component_count * key_component_width;
432  const auto keys_for_all_rows = emitted_keys_count_;
433  const size_t one_to_many_hash_entries = 2 * entry_count_ + keys_for_all_rows;
434  const size_t hash_table_size =
435  calculateHashTableSize(join_bucket_info[0].bucket_sizes_for_dimension.size(),
437  entry_count_);
438 
439  VLOG(1) << "Initializing CPU Overlaps Join Hash Table with " << entry_count_
440  << " hash entries and " << one_to_many_hash_entries
441  << " entries in the one to many buffer";
442  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
443 
444  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
445  int thread_count = cpu_threads();
446  std::vector<std::future<void>> init_cpu_buff_threads;
447  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
448  init_cpu_buff_threads.emplace_back(std::async(
449  std::launch::async,
450  [this, key_component_count, key_component_width, thread_idx, thread_count] {
451  switch (key_component_width) {
452  case 4:
454  entry_count_,
455  key_component_count,
456  false,
457  -1,
458  thread_idx,
459  thread_count);
460  break;
461  case 8:
463  entry_count_,
464  key_component_count,
465  false,
466  -1,
467  thread_idx,
468  thread_count);
469  break;
470  default:
471  CHECK(false);
472  }
473  }));
474  }
475  for (auto& child : init_cpu_buff_threads) {
476  child.get();
477  }
478  std::vector<std::future<int>> fill_cpu_buff_threads;
479  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
480  fill_cpu_buff_threads.emplace_back(std::async(
481  std::launch::async,
482  [this,
483  &join_columns,
484  &join_bucket_info,
485  key_component_count,
486  key_component_width,
487  thread_idx,
488  thread_count] {
489  switch (key_component_width) {
490  case 4: {
491  const auto key_handler = OverlapsKeyHandler(
492  key_component_count,
493  &join_columns[0],
494  join_bucket_info[0].bucket_sizes_for_dimension.data());
496  entry_count_,
497  -1,
498  key_component_count,
499  false,
500  &key_handler,
501  join_columns[0].num_elems,
502  thread_idx,
503  thread_count);
504  }
505  case 8: {
506  const auto key_handler = OverlapsKeyHandler(
507  key_component_count,
508  &join_columns[0],
509  join_bucket_info[0].bucket_sizes_for_dimension.data());
511  entry_count_,
512  -1,
513  key_component_count,
514  false,
515  &key_handler,
516  join_columns[0].num_elems,
517  thread_idx,
518  thread_count);
519  }
520  default:
521  CHECK(false);
522  }
523  return -1;
524  }));
525  }
526  int err = 0;
527  for (auto& child : fill_cpu_buff_threads) {
528  int partial_err = child.get();
529  if (partial_err) {
530  err = partial_err;
531  }
532  }
533  if (err) {
534  cpu_hash_table_buff_.reset();
535  return err;
536  }
537  auto one_to_many_buff =
538  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size);
539  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
540  switch (key_component_width) {
541  case 4: {
542  const auto composite_key_dict =
543  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
545  composite_key_dict,
546  entry_count_,
547  -1,
548  key_component_count,
549  join_columns,
550  join_column_types,
551  join_bucket_info,
552  composite_key_info.sd_inner_proxy_per_key,
553  composite_key_info.sd_outer_proxy_per_key,
554  thread_count);
555  break;
556  }
557  case 8: {
558  const auto composite_key_dict =
559  reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
561  composite_key_dict,
562  entry_count_,
563  -1,
564  key_component_count,
565  join_columns,
566  join_column_types,
567  join_bucket_info,
568  composite_key_info.sd_inner_proxy_per_key,
569  composite_key_info.sd_outer_proxy_per_key,
570  thread_count);
571  break;
572  }
573  default:
574  CHECK(false);
575  }
576  if (!err && getInnerTableId() > 0) {
577  putHashTableOnCpuToCache(cache_key);
578  }
579  return err;
580 }
void fill_one_to_many_baseline_hash_table_32(int32_t *buff, const int32_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void *> &sd_inner_proxy_per_key, const std::vector< const void *> &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
int overlaps_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void putHashTableOnCpuToCache(const HashTableCacheKey &)
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void init_baseline_hash_join_buff_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void *> &sd_inner_proxy_per_key, const std::vector< const void *> &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
CompositeKeyInfo getCompositeKeyInfo() const
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
int overlaps_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int getInnerTableId() const noexcept override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
#define CHECK(condition)
Definition: Logger.h:187
int cpu_threads()
Definition: thread_count.h:23
#define VLOG(n)
Definition: Logger.h:277
const std::shared_ptr< Analyzer::BinOper > condition_
void init_baseline_hash_join_buff_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
size_t getKeyComponentWidth() const override
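
The CPU buffer initialized above has a simple layout: a composite-key dictionary of entry_count entries, immediately followed by the one-to-many int32_t region that init_hash_join_buff and fill_one_to_many_baseline_hash_table_* populate. A small sketch of the offset arithmetic used at line 538 of the listing:

```cpp
#include <cstddef>

// Byte offset of the one-to-many region inside cpu_hash_table_buff_,
// matching &(*cpu_hash_table_buff_)[0] + entry_count * entry_size above.
size_t one_to_many_offset_bytes(const size_t entry_count,
                                const size_t key_component_count,
                                const size_t key_component_width) {
  const size_t entry_size = key_component_count * key_component_width;
  return entry_count * entry_size;
}
```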

◆ initHashTableOnGpu()

int OverlapsJoinHashTable::initHashTableOnGpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout,
const size_t  key_component_width,
const size_t  key_component_count,
const int  device_id 
)
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 582 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), CHECK, CHECK_EQ, copy_from_gpu(), copy_to_gpu(), BaselineJoinHashTable::entry_count_, BaselineJoinHashTable::executor_, init_baseline_hash_join_buff_on_device_32(), init_baseline_hash_join_buff_on_device_64(), init_hash_join_buff_on_device(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_on_device_64(), overlaps_fill_one_to_many_baseline_hash_table_on_device_64(), transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), and UNREACHABLE.

Referenced by yieldCacheInvalidator().

589  {
590  int err = 0;
591  // TODO(adb): 4 byte keys
592  CHECK_EQ(key_component_width, size_t(8));
594 #ifdef HAVE_CUDA
595  const auto catalog = executor_->getCatalog();
596  auto& data_mgr = catalog->getDataMgr();
597  ThrustAllocator allocator(&data_mgr, device_id);
598  auto dev_err_buff =
599  reinterpret_cast<CUdeviceptr>(allocator.allocateScopedBuffer(sizeof(int)));
600  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
601  switch (key_component_width) {
602  case 4:
604  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
605  entry_count_,
606  key_component_count,
607  false,
608  -1,
609  executor_->blockSize(),
610  executor_->gridSize());
611  break;
612  case 8:
614  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
615  entry_count_,
616  key_component_count,
617  false,
618  -1,
619  executor_->blockSize(),
620  executor_->gridSize());
621  break;
622  default:
623  CHECK(false);
624  }
625  auto join_columns_gpu = transfer_pod_vector_to_gpu(join_columns, allocator);
626  auto hash_buff =
627  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
628  CHECK_EQ(join_columns.size(), 1u);
629  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
630  auto bucket_sizes_gpu =
631  transfer_pod_vector_to_gpu(bucket_sizes_for_dimension, allocator);
632  const auto key_handler = OverlapsKeyHandler(
633  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
634  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
635  switch (key_component_width) {
636  case 8: {
638  hash_buff,
639  entry_count_,
640  -1,
641  key_component_count,
642  false,
643  reinterpret_cast<int*>(dev_err_buff),
644  key_handler_gpu,
645  join_columns.front().num_elems,
646  executor_->blockSize(),
647  executor_->gridSize());
648  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
649  break;
650  }
651  default:
652  UNREACHABLE();
653  }
654  if (err) {
655  return err;
656  }
657  const auto entry_size = key_component_count * key_component_width;
658  auto one_to_many_buff = reinterpret_cast<int32_t*>(
659  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
660  switch (key_component_width) {
661  case 8: {
662  const auto composite_key_dict =
663  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
664  init_hash_join_buff_on_device(one_to_many_buff,
665  entry_count_,
666  -1,
667  executor_->blockSize(),
668  executor_->gridSize());
670  one_to_many_buff,
671  composite_key_dict,
672  entry_count_,
673  -1,
674  key_handler_gpu,
675  join_columns.front().num_elems,
676  executor_->blockSize(),
677  executor_->gridSize());
678  break;
679  }
680  default:
681  UNREACHABLE();
682  }
683 #else
684  UNREACHABLE();
685 #endif
686  return err;
687 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define UNREACHABLE()
Definition: Logger.h:231
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
#define CHECK(condition)
Definition: Logger.h:187
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)

◆ payloadBufferOff()

size_t OverlapsJoinHashTable::payloadBufferOff ( ) const
inlineoverridevirtualnoexcept

Reimplemented from BaselineJoinHashTable.

Definition at line 55 of file OverlapsJoinHashTable.h.

References logger::FATAL, and LOG.

55  {
56  LOG(FATAL) << "Not supported for this layout";
57  return 0;
58  }
#define LOG(tag)
Definition: Logger.h:182

◆ reifyWithLayout()

void OverlapsJoinHashTable::reifyWithLayout ( const int  device_count,
const JoinHashTableInterface::HashType  layout 
)
overrideprotectedvirtual

Reimplemented from BaselineJoinHashTable.

Definition at line 75 of file OverlapsJoinHashTable.cpp.

References auto_tuner_cache_, auto_tuner_cache_mutex_, bucket_sizes_for_dimension_, calculateCounts(), calculateHashTableSize(), CHECK, BaselineJoinHashTable::condition_, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, g_overlaps_max_table_size_bytes, get_inner_query_info(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), InputTableInfo::info, BaselineJoinHashTable::layout_, JoinHashTableInterface::OneToMany, only_shards_for_device(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::query_infos_, BaselineJoinHashTable::reifyForDevice(), BaselineJoinHashTable::shardCount(), and VLOG.

Referenced by yieldCacheInvalidator().

77  {
79  layout_ = layout;
80  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
81  if (query_info.fragments.empty()) {
82  return;
83  }
84  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
85  const auto shard_count = shardCount();
86 
87  // Prepare to calculate the size of the hash table.
89  calculateCounts(shard_count,
90  query_info,
91  device_count,
92  columns_per_device); // called only to populate columns_per_device
93  const auto composite_key_info = getCompositeKeyInfo();
94  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
95  composite_key_info.cache_key_chunks,
96  condition_->get_optype()};
97  columns_per_device.clear();
99 
100  // Auto-tuner: Pre-calculate some possible hash table sizes.
101  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
102  auto atc = auto_tuner_cache_.find(cache_key);
103  if (atc != auto_tuner_cache_.end()) {
105  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
107  } else {
108  VLOG(1) << "Auto tuning for the overlaps hash table size:";
109  const double min_threshold{0.00001};
110  const double max_threshold{0.1};
111  double good_threshold{max_threshold};
112  for (double threshold = max_threshold; threshold >= min_threshold;
113  threshold /= 10.0) {
115  size_t entry_count;
116  size_t emitted_keys_count;
117  std::tie(entry_count, emitted_keys_count) =
118  calculateCounts(shard_count, query_info, device_count, columns_per_device);
119  size_t hash_table_size = calculateHashTableSize(
120  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
121  columns_per_device.clear();
123  VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
124  << " giving: entry count " << entry_count << " hash table size "
125  << hash_table_size;
126  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
127  good_threshold = overlaps_hashjoin_bucket_threshold_;
128  } else {
129  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
130  break;
131  }
132  }
133  overlaps_hashjoin_bucket_threshold_ = good_threshold;
135  }
136 
137  // Calculate the final size of the hash table.
138  VLOG(1) << "Accepted bin threshold of " << std::fixed
140  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
141  // from entries_per_device.
142  std::tie(entry_count_, emitted_keys_count_) =
143  calculateCounts(shard_count, query_info, device_count, columns_per_device);
144  size_t hash_table_size = calculateHashTableSize(
146  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
147  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
148  << entry_count_ << " hash table size " << hash_table_size;
149 
150  std::vector<std::future<void>> init_threads;
151  for (int device_id = 0; device_id < device_count; ++device_id) {
152  const auto fragments =
153  shard_count
154  ? only_shards_for_device(query_info.fragments, device_id, device_count)
155  : query_info.fragments;
156  init_threads.push_back(std::async(std::launch::async,
158  this,
159  columns_per_device[device_id],
160  layout,
161  device_id));
162  }
163  for (auto& init_thread : init_threads) {
164  init_thread.wait();
165  }
166  for (auto& init_thread : init_threads) {
167  init_thread.get();
168  }
169 }
std::vector< double > bucket_sizes_for_dimension_
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
static std::mutex auto_tuner_cache_mutex_
std::pair< size_t, size_t > calculateCounts(size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, const int device_count, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
JoinHashTableInterface::HashType layout_
CompositeKeyInfo getCompositeKeyInfo() const
static std::map< HashTableCacheKey, double > auto_tuner_cache_
const std::vector< InputTableInfo > & query_infos_
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
int getInnerTableId() const noexcept override
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:87
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
#define CHECK(condition)
Definition: Logger.h:187
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
#define VLOG(n)
Definition: Logger.h:277
const std::shared_ptr< Analyzer::BinOper > condition_
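
The auto-tuner in the listing sweeps the bucket threshold downward from 0.1 to 0.00001 by factors of ten, keeping the last threshold whose estimated table size stays under g_overlaps_max_table_size_bytes, and caches the winner in auto_tuner_cache_. A standalone sketch of that sweep with the size estimate stubbed out (the estimator parameter stands in for calculateCounts() plus calculateHashTableSize(); it is not part of the class API):

```cpp
#include <cstddef>
#include <functional>

// Sketch of the threshold sweep above. max_table_size stands in for
// g_overlaps_max_table_size_bytes; estimate_table_size is a hypothetical hook.
double tune_bucket_threshold(const std::function<size_t(double)>& estimate_table_size,
                             const size_t max_table_size) {
  const double min_threshold{0.00001};
  const double max_threshold{0.1};
  double good_threshold{max_threshold};
  for (double threshold = max_threshold; threshold >= min_threshold;
       threshold /= 10.0) {
    if (estimate_table_size(threshold) <= max_table_size) {
      good_threshold = threshold;  // keep shrinking while the table still fits
    } else {
      break;  // stop once the estimate exceeds the cap, as in the loop above
    }
  }
  return good_threshold;
}
```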

◆ yieldCacheInvalidator()

static auto OverlapsJoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inlinestatic

Definition at line 60 of file OverlapsJoinHashTable.h.

References approximateTupleCount(), auto_tuner_cache_, auto_tuner_cache_mutex_, calculateCounts(), calculateHashTableSize(), codegenKey(), fetchColumnsForDevice(), getKeyComponentCount(), getKeyComponentWidth(), initHashTableOnCpu(), initHashTableOnGpu(), and reifyWithLayout().

60  {
61  return []() -> void {
62  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
63  auto_tuner_cache_.clear();
64  };
65  }
static std::mutex auto_tuner_cache_mutex_
static std::map< HashTableCacheKey, double > auto_tuner_cache_
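
A minimal usage sketch: the returned closure can be stored alongside other cache invalidators and invoked whenever cached overlaps hash table sizes must be dropped. The registry shown here is hypothetical; only the yieldCacheInvalidator() call reflects the API above:

```cpp
#include <OverlapsJoinHashTable.h>
#include <functional>
#include <vector>

// Hypothetical registry of invalidation callbacks.
std::vector<std::function<void()>> invalidators;

void register_overlaps_invalidator() {
  invalidators.push_back(OverlapsJoinHashTable::yieldCacheInvalidator());
}

void invalidate_all_caches() {
  for (auto& invalidate : invalidators) {
    invalidate();  // clears auto_tuner_cache_ under auto_tuner_cache_mutex_
  }
}
```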

Member Data Documentation

◆ auto_tuner_cache_

std::map< OverlapsJoinHashTable::HashTableCacheKey, double > OverlapsJoinHashTable::auto_tuner_cache_
staticprotected

Definition at line 107 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

◆ auto_tuner_cache_mutex_

std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_
staticprotected

Definition at line 108 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

◆ bucket_sizes_for_dimension_

std::vector<double> OverlapsJoinHashTable::bucket_sizes_for_dimension_
private

◆ overlaps_hashjoin_bucket_threshold_

double OverlapsJoinHashTable::overlaps_hashjoin_bucket_threshold_
private

The documentation for this class was generated from the following files:

OverlapsJoinHashTable.h
OverlapsJoinHashTable.cpp