OmniSciDB  06b3bd477c
OverlapsJoinHashTable Class Reference

#include <OverlapsJoinHashTable.h>


Public Member Functions

 OverlapsJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, JoinHashTableInterface::HashType hash_layout_type, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count)
 
 ~OverlapsJoinHashTable () override
 
- Public Member Functions inherited from BaselineJoinHashTable
int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
size_t getJoinHashBufferSize (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
std::string toString (const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
 
std::set< DecodedJoinHashBufferEntry > toSet (const ExecutorDeviceType device_type, const int device_id) const override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
JoinHashTableInterface::HashType getHashType () const noexcept override
 
Data_Namespace::MemoryLevel getMemoryLevel () const noexcept override
 
int getDeviceCount () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~BaselineJoinHashTable ()
 
- Public Member Functions inherited from JoinHashTableInterface
virtual std::string toStringFlat64 (const ExecutorDeviceType device_type, const int device_id) const
 
virtual std::string toStringFlat32 (const ExecutorDeviceType device_type, const int device_id) const
 
JoinColumn fetchJoinColumn (const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
 

Static Public Member Functions

static std::shared_ptr< OverlapsJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
- Static Public Member Functions inherited from BaselineJoinHashTable
static std::shared_ptr< BaselineJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static size_t getShardCountForCondition (const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
static const std::shared_ptr< std::vector< int8_t > > & getCachedHashTable (size_t idx)
 
static size_t getEntryCntCachedHashTable (size_t idx)
 
static uint64_t getNumberOfCachedHashTables ()
 
- Static Public Member Functions inherited from JoinHashTableInterface
static std::string getHashTypeString (HashType ht) noexcept
 
static DecodedJoinHashBufferSet toSet (size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
 Decode hash table into a std::set for easy inspection and validation. More...
 
static std::string toString (const std::string &type, const std::string &layout_type, size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw=false)
 Decode hash table into a human-readable string. More...
 
static std::shared_ptr< JoinHashTableInterface > getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc. More...
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing). More...
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing). More...
 

Protected Member Functions

void reifyWithLayout (const JoinHashTableInterface::HashType layout) override
 
std::pair< size_t, size_t > calculateCounts (size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
 
size_t calculateHashTableSize (size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
 
ColumnsForDevice fetchColumnsForDevice (const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner) override
 
std::vector< JoinBucketInfo > computeBucketInfo (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const int device_id)
 
std::pair< size_t, size_t > approximateTupleCount (const std::vector< ColumnsForDevice > &) const override
 
size_t getKeyComponentWidth () const override
 
size_t getKeyComponentCount () const override
 
int initHashTableOnCpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout) override
 
int initHashTableOnGpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
- Protected Member Functions inherited from BaselineJoinHashTable
 BaselineJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count)
 
size_t shardCount () const
 
Data_Namespace::MemoryLevel getEffectiveMemoryLevel (const std::vector< InnerOuter > &inner_outer_pairs) const
 
CompositeKeyInfo getCompositeKeyInfo () const
 
void reify ()
 
void reifyForDevice (const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id, const logger::ThreadId parent_thread_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
int initHashTableForDevice (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
llvm::Value * hashPtr (const size_t index)
 
void initHashTableOnCpuFromCache (const HashTableCacheKey &)
 
void putHashTableOnCpuToCache (const HashTableCacheKey &)
 
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache (const HashTableCacheKey &) const
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
bool layoutRequiresAdditionalBuffers (JoinHashTableInterface::HashType layout) const noexcept override
 
const HashTableCacheValue * findHashTableOnCpuInCache (const HashTableCacheKey &)
 

Static Protected Attributes

static std::map< HashTableCacheKey, double > auto_tuner_cache_
 
static std::mutex auto_tuner_cache_mutex_
 
- Static Protected Attributes inherited from BaselineJoinHashTable
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
 
static std::mutex hash_table_cache_mutex_
 
static const int ERR_FAILED_TO_FETCH_COLUMN {-3}
 
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN {-4}
 

Private Member Functions

void computeBucketSizes (std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const JoinColumnTypeInfo &join_column_type, const std::vector< InnerOuter > &inner_outer_pairs)
 
llvm::Value * codegenKey (const CompilationOptions &) override
 
std::vector< llvm::Value * > codegenManyKey (const CompilationOptions &)
 

Private Attributes

std::vector< double > bucket_sizes_for_dimension_
 
double overlaps_hashjoin_bucket_threshold_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType : int { HashType::OneToOne, HashType::OneToMany, HashType::ManyToMany }
 
- Static Protected Member Functions inherited from BaselineJoinHashTable
static int getInnerTableId (const std::vector< InnerOuter > &inner_outer_pairs)
 
- Protected Attributes inherited from BaselineJoinHashTable
const std::shared_ptr< Analyzer::BinOper > condition_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
JoinHashTableInterface::HashType layout_
 
size_t entry_count_
 
size_t emitted_keys_count_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
std::vector< InnerOuter > inner_outer_pairs_
 
const Catalog_Namespace::Catalog * catalog_
 
const int device_count_
 

Detailed Description

Definition at line 22 of file OverlapsJoinHashTable.h.

Constructor & Destructor Documentation

OverlapsJoinHashTable::OverlapsJoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
JoinHashTableInterface::HashType  hash_layout_type,
const size_t  entry_count,
ColumnCacheMap &  column_cache,
Executor *  executor,
const std::vector< InnerOuter > &  inner_outer_pairs,
const int  device_count 
)
inline

Definition at line 24 of file OverlapsJoinHashTable.h.

    : BaselineJoinHashTable(condition,
                            query_infos,
                            memory_level,
                            hash_layout_type,
                            entry_count,
                            column_cache,
                            executor,
                            inner_outer_pairs,
                            device_count) {}
OverlapsJoinHashTable::~OverlapsJoinHashTable ( )
inline override

Definition at line 43 of file OverlapsJoinHashTable.h.

{}

Member Function Documentation

std::pair< size_t, size_t > OverlapsJoinHashTable::approximateTupleCount ( const std::vector< ColumnsForDevice > &  columns_per_device) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 343 of file OverlapsJoinHashTable.cpp.

References CudaAllocator::alloc(), approximate_distinct_tuples_on_device_overlaps(), approximate_distinct_tuples_overlaps(), Bitmap, CHECK(), CHECK_EQ, CHECK_GT, BaselineJoinHashTable::condition_, copy_from_gpu(), copy_to_gpu(), CPU, Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::device_count_, BaselineJoinHashTable::executor_, BaselineJoinHashTable::getApproximateTupleCountFromCache(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getEffectiveMemoryLevel(), GPU, Data_Namespace::GPU_LEVEL, hll_size(), hll_unify(), BaselineJoinHashTable::inner_outer_pairs_, overlaps_hashjoin_bucket_threshold_, transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), UNREACHABLE, and VLOG.

Referenced by calculateCounts().

{
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
  CountDistinctDescriptor count_distinct_desc{
      CountDistinctImplType::Bitmap,
      0,
      11,
      true,
      effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? ExecutorDeviceType::GPU
          : ExecutorDeviceType::CPU,
      1};
  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();

  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
  // Number of keys must match dimension of buckets
  CHECK_EQ(columns_per_device.front().join_columns.size(),
           columns_per_device.front().join_buckets.size());
  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    const auto composite_key_info = getCompositeKeyInfo();
    HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
                                composite_key_info.cache_key_chunks,
                                condition_->get_optype(),
                                overlaps_hashjoin_bucket_threshold_};
    const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
    if (cached_count_info.first >= 0) {
      VLOG(1) << "Using a cached tuple count: " << cached_count_info.first
              << ", emitted keys count: " << cached_count_info.second;
      return std::make_pair(cached_count_info.first, cached_count_info.second);
    }
    int thread_count = cpu_threads();
    std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
    auto hll_result = &hll_buffer_all_cpus[0];

    std::vector<int32_t> num_keys_for_row;
    // TODO(adb): support multi-column overlaps join
    CHECK_EQ(columns_per_device.size(), 1u);
    num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);

    approximate_distinct_tuples_overlaps(hll_result,
                                         num_keys_for_row,
                                         count_distinct_desc.bitmap_sz_bits,
                                         padded_size_bytes,
                                         columns_per_device.front().join_columns,
                                         columns_per_device.front().join_column_types,
                                         columns_per_device.front().join_buckets,
                                         thread_count);
    for (int i = 1; i < thread_count; ++i) {
      hll_unify(hll_result,
                hll_result + i * padded_size_bytes,
                1 << count_distinct_desc.bitmap_sz_bits);
    }
    return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                          num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
  }
#ifdef HAVE_CUDA
  auto& data_mgr = executor_->getCatalog()->getDataMgr();
  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
  for (auto& host_hll_buffer : host_hll_buffers) {
    host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
  }
  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
  std::vector<std::future<void>> approximate_distinct_device_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    approximate_distinct_device_threads.emplace_back(std::async(
        std::launch::async,
        [device_id,
         &columns_per_device,
         &count_distinct_desc,
         &data_mgr,
         &host_hll_buffers,
         &emitted_keys_count_device_threads,
         this] {
          CudaAllocator allocator(&data_mgr, device_id);
          auto device_hll_buffer =
              allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
          data_mgr.getCudaMgr()->zeroDeviceMem(
              device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
          const auto& columns_for_device = columns_per_device[device_id];
          auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
              columns_for_device.join_columns, allocator);

          CHECK_GT(columns_for_device.join_buckets.size(), 0u);
          const auto& bucket_sizes_for_dimension =
              columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
          auto bucket_sizes_gpu =
              allocator.alloc(bucket_sizes_for_dimension.size() * sizeof(double));
          copy_to_gpu(&data_mgr,
                      reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
                      bucket_sizes_for_dimension.data(),
                      bucket_sizes_for_dimension.size() * sizeof(double),
                      device_id);
          const size_t row_counts_buffer_sz =
              columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
          auto row_counts_buffer = allocator.alloc(row_counts_buffer_sz);
          data_mgr.getCudaMgr()->zeroDeviceMem(
              row_counts_buffer, row_counts_buffer_sz, device_id);
          const auto key_handler =
              OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
                                 join_columns_gpu,
                                 reinterpret_cast<double*>(bucket_sizes_gpu));
          const auto key_handler_gpu =
              transfer_flat_object_to_gpu(key_handler, allocator);
          approximate_distinct_tuples_on_device_overlaps(
              reinterpret_cast<uint8_t*>(device_hll_buffer),
              count_distinct_desc.bitmap_sz_bits,
              reinterpret_cast<int32_t*>(row_counts_buffer),
              key_handler_gpu,
              columns_for_device.join_columns[0].num_elems,
              executor_->blockSize(),
              executor_->gridSize());

          auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
          copy_from_gpu(&data_mgr,
                        &host_emitted_keys_count,
                        reinterpret_cast<CUdeviceptr>(
                            row_counts_buffer +
                            (columns_per_device.front().join_columns[0].num_elems - 1) *
                                sizeof(int32_t)),
                        sizeof(int32_t),
                        device_id);

          auto& host_hll_buffer = host_hll_buffers[device_id];
          copy_from_gpu(&data_mgr,
                        &host_hll_buffer[0],
                        reinterpret_cast<CUdeviceptr>(device_hll_buffer),
                        count_distinct_desc.bitmapPaddedSizeBytes(),
                        device_id);
        }));
  }
  for (auto& child : approximate_distinct_device_threads) {
    child.get();
  }
  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
  auto& result_hll_buffer = host_hll_buffers.front();
  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
  for (int device_id = 1; device_id < device_count_; ++device_id) {
    auto& host_hll_buffer = host_hll_buffers[device_id];
    hll_unify(hll_result,
              reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
              1 << count_distinct_desc.bitmap_sz_bits);
  }
  size_t emitted_keys_count = 0;
  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
    emitted_keys_count += emitted_keys_count_device;
  }
  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                        emitted_keys_count);
#else
  UNREACHABLE();
  return {0, 0};
#endif  // HAVE_CUDA
}

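The per-thread (CPU) and per-device (GPU) HLL buffers above are combined with hll_unify, which relies on the HyperLogLog property that two register arrays merge by element-wise maximum. A minimal standalone sketch of that merge idea (toy code for illustration; toy_hll_unify is a hypothetical name, not the actual hll_unify implementation):

#include <algorithm>
#include <cstdint>
#include <vector>

// Toy merge of two HLL register arrays of equal size m = 1 << bitmap_sz_bits.
// After merging, a single cardinality estimate can be computed from lhs, which
// is what hll_size does for the unified buffer above.
void toy_hll_unify(std::vector<uint8_t>& lhs, const std::vector<uint8_t>& rhs) {
  for (size_t i = 0; i < lhs.size(); ++i) {
    lhs[i] = std::max(lhs[i], rhs[i]);
  }
}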

std::pair< size_t, size_t > OverlapsJoinHashTable::calculateCounts ( size_t  shard_count,
const Fragmenter_Namespace::TableInfo &  query_info,
std::vector< BaselineJoinHashTable::ColumnsForDevice > &  columns_per_device 
)
protected

Definition at line 241 of file OverlapsJoinHashTable.cpp.

References approximateTupleCount(), CHECK_EQ, computeBucketInfo(), BaselineJoinHashTable::device_count_, get_entries_per_device(), and BaselineJoinHashTable::memory_level_.

Referenced by reifyWithLayout().

{
  // re-compute bucket counts per device based on global bucket size
  CHECK_EQ(columns_per_device.size(), size_t(device_count_));
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    auto& columns_for_device = columns_per_device[device_id];
    columns_for_device.join_buckets = computeBucketInfo(
        columns_for_device.join_columns, columns_for_device.join_column_types, device_id);
  }
  size_t tuple_count;
  size_t emitted_keys_count;
  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
  const auto entry_count = 2 * std::max(tuple_count, size_t(1));

  return std::make_pair(
      get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
      emitted_keys_count);
}

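As a worked example: if approximateTupleCount estimates 1,000 distinct tuples, entry_count becomes 2 * max(1000, 1) = 2,000 slots, which get_entries_per_device then apportions across the configured shards and devices.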

size_t OverlapsJoinHashTable::calculateHashTableSize ( size_t  number_of_dimensions,
size_t  emitted_keys_count,
size_t  entry_count 
) const
protected

Definition at line 262 of file OverlapsJoinHashTable.cpp.

References getKeyComponentWidth().

Referenced by initHashTableOnCpu(), and reifyWithLayout().

{
  const auto key_component_width = getKeyComponentWidth();
  const auto key_component_count = number_of_dimensions;
  const auto entry_size = key_component_count * key_component_width;
  const auto keys_for_all_rows = emitted_keys_count;
  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
  const size_t hash_table_size =
      entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
  return hash_table_size;
}

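As a worked example of the formula above, assume 2 dimensions with the default 8-byte key components, entry_count = 2,000 and emitted_keys_count = 5,000:

  entry_size               = 2 * 8                = 16 bytes
  one_to_many_hash_entries = 2 * 2000 + 5000      = 9,000 entries
  hash_table_size          = 16 * 2000 + 9000 * 4 = 68,000 bytes

The first term is the composite key dictionary; the second covers the one-to-many offset, count, and payload buffers, each entry an int32_t.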

llvm::Value * OverlapsJoinHashTable::codegenKey ( const CompilationOptions &  co)
override private virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 796 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), CHECK_EQ, CodeGenerator::codegen(), BaselineJoinHashTable::executor_, get_int_type(), getKeyComponentCount(), getKeyComponentWidth(), BaselineJoinHashTable::inner_outer_pairs_, kENCODING_GEOINT, kPOINT, kTINYINT, LL_BUILDER, LL_CONTEXT, LL_FP, LL_INT, and CodeGenerator::posArg().

{
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
  llvm::Value* key_buff_lv{nullptr};
  switch (key_component_width) {
    case 4:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
      break;
    case 8:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
      break;
    default:
      CHECK(false);
  }

  const auto& inner_outer_pair = inner_outer_pairs_[0];
  const auto outer_col = inner_outer_pair.second;
  const auto outer_col_ti = outer_col->get_type_info();

  if (outer_col_ti.is_geometry()) {
    CodeGenerator code_generator(executor_);
    // TODO(adb): for points we will use the coords array, but for other geometries we
    // will need to use the bounding box. For now only support points.
    CHECK_EQ(outer_col_ti.get_type(), kPOINT);
    CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));

    const auto col_lvs = code_generator.codegen(outer_col, true, co);
    CHECK_EQ(col_lvs.size(), size_t(1));

    const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
    CHECK(outer_col_var);
    const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
        outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
    CHECK(coords_cd);

    const auto array_ptr = executor_->cgen_state_->emitExternalCall(
        "array_buff",
        llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
        {col_lvs.front(), code_generator.posArg(outer_col)});
    CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
        << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
    const auto arr_ptr =
        code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());

    for (size_t i = 0; i < 2; i++) {
      const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));

      // Note that get_bucket_key_for_range_compressed will need to be specialized for
      // future compression schemes
      auto bucket_key =
          outer_col_ti.get_compression() == kENCODING_GEOINT
              ? executor_->cgen_state_->emitExternalCall(
                    "get_bucket_key_for_range_compressed",
                    get_int_type(64, LL_CONTEXT),
                    {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
              : executor_->cgen_state_->emitExternalCall(
                    "get_bucket_key_for_range_double",
                    get_int_type(64, LL_CONTEXT),
                    {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
      const auto col_lv = LL_BUILDER.CreateSExt(
          bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
      LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
    }
  } else {
    LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
  }
  return key_buff_lv;
}

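In plain C++, the per-dimension bucket key the generated code produces is roughly the following (a sketch under the assumption, stated in computeBucketSizes, that bucket_sizes_for_dimension_ stores inverse bucket sizes; toy_bucket_key is a hypothetical name, not the runtime function):

#include <cmath>
#include <cstdint>

// Sketch: a coordinate maps to its bucket index by multiplying with the stored
// inverse bucket size and truncating toward negative infinity, mirroring the
// per-dimension work done by the get_bucket_key_for_range_* calls above.
int64_t toy_bucket_key(const double coord, const double inverse_bucket_size) {
  return static_cast<int64_t>(std::floor(coord * inverse_bucket_size));
}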

std::vector< llvm::Value * > OverlapsJoinHashTable::codegenManyKey ( const CompilationOptions &  co)
private

Definition at line 868 of file OverlapsJoinHashTable.cpp.

References CHECK(), CHECK_EQ, CodeGenerator::codegen(), BaselineJoinHashTable::executor_, getKeyComponentWidth(), BaselineJoinHashTable::inner_outer_pairs_, BaselineJoinHashTable::layout_, JoinHashTableInterface::ManyToMany, CodeGenerator::posArg(), and VLOG.

Referenced by codegenMatchingSet().

{
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  CHECK(layout_ == JoinHashTableInterface::HashType::ManyToMany);

  VLOG(1) << "Performing codegen for ManyToMany";
  const auto& inner_outer_pair = inner_outer_pairs_[0];
  const auto outer_col = inner_outer_pair.second;

  CodeGenerator code_generator(executor_);
  const auto col_lvs = code_generator.codegen(outer_col, true, co);
  CHECK_EQ(col_lvs.size(), size_t(1));

  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
  CHECK(outer_col_var);
  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
      outer_col_var->get_table_id(), outer_col_var->get_column_id());
  CHECK(coords_cd);

  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
      "array_buff",
      llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
      {col_lvs.front(), code_generator.posArg(outer_col)});

  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
  // const auto arr_ptr =
  //     code_generator.castArrayPointer(array_ptr,
  //                                     coords_cd->columnType.get_elem_type());
  array_ptr->setName("array_ptr");

  auto num_keys_lv =
      executor_->cgen_state_->emitExternalCall("get_num_buckets_for_bounds",
                                               get_int_type(32, LL_CONTEXT),
                                               {array_ptr,
                                                LL_INT(0),
                                                LL_FP(bucket_sizes_for_dimension_[0]),
                                                LL_FP(bucket_sizes_for_dimension_[1])});
  num_keys_lv->setName("num_keys_lv");

  return {num_keys_lv, array_ptr};
}


HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet ( const CompilationOptions &  co,
const size_t  index 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 911 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), JoinHashTable::codegenHashTableLoad(), codegenManyKey(), BaselineJoinHashTable::codegenMatchingSet(), BaselineJoinHashTable::entry_count_, BaselineJoinHashTable::executor_, get_int_array_type(), BaselineJoinHashTable::getHashType(), getKeyComponentCount(), getKeyComponentWidth(), LL_BUILDER, LL_CONTEXT, LL_FP, LL_INT, JoinHashTableInterface::ManyToMany, BaselineJoinHashTable::offsetBufferOff(), UNREACHABLE, and VLOG.

{
  if (getHashType() == JoinHashTableInterface::HashType::ManyToMany) {
    VLOG(1) << "Building codegenMatchingSet for ManyToMany";
    const auto key_component_width = getKeyComponentWidth();
    CHECK(key_component_width == 4 || key_component_width == 8);
    auto many_to_many_args = codegenManyKey(co);
    auto hash_ptr = JoinHashTable::codegenHashTableLoad(index, executor_);
    const auto composite_dict_ptr_type =
        llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
    const auto composite_key_dict =
        hash_ptr->getType()->isPointerTy()
            ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
            : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
    const auto key_component_count = getKeyComponentCount();

    auto one_to_many_ptr = hash_ptr;

    if (one_to_many_ptr->getType()->isPointerTy()) {
      one_to_many_ptr =
          LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
    } else {
      CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
    }

    const auto composite_key_dict_size = offsetBufferOff();
    one_to_many_ptr =
        LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));

    // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
    // this is likely the maximum value we can do that is safe to use across
    // all supported GPU architectures.
    const int max_array_size = 200;
    const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
    const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
    out_arr_lv->setName("out_arr");

    const auto casted_out_arr_lv =
        LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());

    const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));

    auto rowid_ptr_i32 =
        LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));

    const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
        "get_candidate_rows",
        llvm::Type::getInt64Ty(LL_CONTEXT),
        {
            rowid_ptr_i32,
            LL_INT(max_array_size),
            many_to_many_args[1],
            LL_INT(0),
            LL_FP(bucket_sizes_for_dimension_[0]),
            LL_FP(bucket_sizes_for_dimension_[1]),
            many_to_many_args[0],
            LL_INT(key_component_count),            // key_component_count
            composite_key_dict,                     // ptr to hash table
            LL_INT(entry_count_),                   // entry_count
            LL_INT(composite_key_dict_size),        // offset_buffer_ptr_offset
            LL_INT(entry_count_ * sizeof(int32_t))  // sub_buff_size
        });

    const auto slot_lv = LL_INT(int64_t(0));

    return {rowid_ptr_i32, candidate_count_lv, slot_lv};
  } else {
    VLOG(1) << "Building codegenMatchingSet for Baseline";
    return BaselineJoinHashTable::codegenMatchingSet(co, index);
  }
  UNREACHABLE();
  return HashJoinMatchingSet{};
}


std::vector< JoinBucketInfo > OverlapsJoinHashTable::computeBucketInfo ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const int  device_id 
)
protected

Definition at line 317 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), CHECK_EQ, computeBucketSizes(), BaselineJoinHashTable::inner_outer_pairs_, and kDOUBLE.

Referenced by calculateCounts(), and fetchColumnsForDevice().

{
  std::vector<JoinBucketInfo> join_bucket_info;
  CHECK_EQ(inner_outer_pairs_.size(), join_columns.size());
  CHECK_EQ(join_columns.size(), join_column_types.size());
  for (size_t i = 0; i < join_columns.size(); i++) {
    const auto& inner_outer_pair = inner_outer_pairs_[i];
    const auto inner_col = inner_outer_pair.first;
    const auto& ti = inner_col->get_type_info();
    const auto elem_ti = ti.get_elem_type();
    CHECK(elem_ti.is_fp());

    if (bucket_sizes_for_dimension_.empty()) {
      computeBucketSizes(bucket_sizes_for_dimension_,
                         join_columns[i],
                         join_column_types[i],
                         inner_outer_pairs_);
    }
    join_bucket_info.emplace_back(
        JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
  }
  return join_bucket_info;
}


void OverlapsJoinHashTable::computeBucketSizes ( std::vector< double > &  bucket_sizes_for_dimension,
const JoinColumn join_column,
const JoinColumnTypeInfo join_column_type,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
private

Definition at line 986 of file OverlapsJoinHashTable.cpp.

References CHECK(), CHECK_EQ, compute_bucket_sizes(), compute_bucket_sizes_on_device(), CudaAllocator::copyFromDevice(), Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getEffectiveMemoryLevel(), overlaps_hashjoin_bucket_threshold_, to_string(), transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), and VLOG.

Referenced by computeBucketInfo().

{
  // No coalesced keys for overlaps joins yet
  CHECK_EQ(inner_outer_pairs.size(), 1u);

  const auto col = inner_outer_pairs[0].first;
  CHECK(col);
  const auto col_ti = col->get_type_info();
  CHECK(col_ti.is_array());

  // TODO: Compute the number of dimensions for this overlaps key
  const int num_dims = 2;
  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());

  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
          << to_string(overlaps_hashjoin_bucket_threshold_);

  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    const int thread_count = cpu_threads();
    compute_bucket_sizes(local_bucket_sizes,
                         join_column,
                         join_column_type,
                         overlaps_hashjoin_bucket_threshold_,
                         thread_count);
  }
#ifdef HAVE_CUDA
  else {
    // Note that we compute the bucket sizes using only a single GPU
    const int device_id = 0;
    auto& data_mgr = executor_->getCatalog()->getDataMgr();
    CudaAllocator allocator(&data_mgr, device_id);
    auto device_bucket_sizes_gpu =
        transfer_vector_of_flat_objects_to_gpu(local_bucket_sizes, allocator);
    auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
    auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);

    compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
                                   join_column_gpu,
                                   join_column_type_gpu,
                                   overlaps_hashjoin_bucket_threshold_,
                                   executor_->blockSize(),
                                   executor_->gridSize());
    allocator.copyFromDevice(reinterpret_cast<int8_t*>(local_bucket_sizes.data()),
                             reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
                             local_bucket_sizes.size() * sizeof(double));
  }
#endif

  size_t ctr = 0;
  for (auto& bucket_sz : local_bucket_sizes) {
    VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
    bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
  }

  return;
}

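Note the inversion in the final loop: if the computed bucket size for a dimension is 0.25, the stored value is 1.0 / 0.25 = 4.0, which lets the generated key code derive a bucket index with a multiply rather than a divide.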

BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice ( const std::vector< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id,
DeviceAllocator *  dev_buff_owner 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 275 of file OverlapsJoinHashTable.cpp.

References CHECK(), BaselineJoinHashTable::column_cache_, computeBucketInfo(), BaselineJoinHashTable::executor_, JoinHashTableInterface::fetchJoinColumn(), get_column_descriptor_maybe(), get_join_column_type_kind(), BaselineJoinHashTable::getEffectiveMemoryLevel(), BaselineJoinHashTable::inner_outer_pairs_, and BaselineJoinHashTable::isBitwiseEq().

Referenced by reifyWithLayout().

{
  const auto& catalog = *executor_->getCatalog();
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<std::shared_ptr<void>> malloc_owner;
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(
        inner_col->get_column_id(), inner_col->get_table_id(), catalog);
    if (inner_cd && inner_cd->isVirtualCol) {
      throw FailedToJoinOnVirtualColumn();
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
                                                      0,
                                                      0,
                                                      inline_int_null_value<int64_t>(),
                                                      isBitwiseEq(),
                                                      0,
                                                      get_join_column_type_kind(ti)});
    CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
  }
  // compute initial bucket info
  auto join_bucket_info = computeBucketInfo(join_columns, join_column_types, device_id);
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}


std::shared_ptr< OverlapsJoinHashTable > OverlapsJoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Make hash table from an in-flight SQL query's parse tree etc.

Definition at line 31 of file OverlapsJoinHashTable.cpp.

References CHECK_EQ, logger::FATAL, get_entries_per_device(), get_inner_query_info(), JoinHashTableInterface::getHashTypeString(), BaselineJoinHashTable::getInnerTableId(), Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound(), BaselineJoinHashTable::getShardCountForCondition(), Data_Namespace::GPU_LEVEL, InputTableInfo::info, LOG, JoinHashTableInterface::ManyToMany, normalize_column_pairs(), JoinHashTableInterface::OneToMany, VLOG, and VLOGGING.

Referenced by JoinHashTableInterface::getInstance().

{
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  auto inner_outer_pairs = normalize_column_pairs(
      condition.get(), *executor->getCatalog(), executor->getTemporaryTables());

  const auto getHashTableType = [](const std::shared_ptr<Analyzer::BinOper> condition,
                                   const std::vector<InnerOuter>& inner_outer_pairs)
      -> JoinHashTableInterface::HashType {
    auto layout = JoinHashTableInterface::HashType::OneToMany;
    if (condition->is_overlaps_oper()) {
      CHECK_EQ(inner_outer_pairs.size(), size_t(1));
      if (inner_outer_pairs[0].first->get_type_info().is_array() &&
          inner_outer_pairs[0].second->get_type_info().is_array()) {
        layout = JoinHashTableInterface::HashType::ManyToMany;
      }
    }
    return layout;
  };

  auto layout = getHashTableType(condition, inner_outer_pairs);

  if (VLOGGING(1)) {
    VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
            << " for qual: " << condition->toString();
    ts1 = std::chrono::steady_clock::now();
  }

  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();

  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";

  const auto& query_info =
      get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }
  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
                               ? BaselineJoinHashTable::getShardCountForCondition(
                                     condition.get(), executor, inner_outer_pairs)
                               : 0;
  const auto entries_per_device =
      get_entries_per_device(total_entries, shard_count, device_count, memory_level);
  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
                                                                 query_infos,
                                                                 memory_level,
                                                                 layout,
                                                                 entries_per_device,
                                                                 column_cache,
                                                                 executor,
                                                                 inner_outer_pairs,
                                                                 device_count);
  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
  try {
    join_hash_table->reify();
  } catch (const HashJoinFail& e) {
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const std::exception& e) {
    LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
               << e.what();
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}

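A hedged usage sketch (assumes condition, query_infos, column_cache, and executor are already available from the surrounding query compilation state; the values here are illustrative, not prescriptive):

// Sketch only: build a CPU-resident overlaps join hash table for a geo qual.
auto overlaps_table = OverlapsJoinHashTable::getInstance(condition,
                                                         query_infos,
                                                         Data_Namespace::CPU_LEVEL,
                                                         /*device_count=*/1,
                                                         column_cache,
                                                         executor);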

size_t OverlapsJoinHashTable::getKeyComponentCount ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 501 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_.

Referenced by codegenKey(), and codegenMatchingSet().

{
  return bucket_sizes_for_dimension_.size();
}


size_t OverlapsJoinHashTable::getKeyComponentWidth ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 497 of file OverlapsJoinHashTable.cpp.

Referenced by calculateHashTableSize(), codegenKey(), codegenManyKey(), codegenMatchingSet(), and initHashTableOnCpu().

{
  return 8;
}


int OverlapsJoinHashTable::initHashTableOnCpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 505 of file OverlapsJoinHashTable.cpp.

References calculateHashTableSize(), CHECK(), BaselineJoinHashTable::condition_, BaselineJoinHashTable::cpu_hash_table_buff_, cpu_threads(), DEBUG_TIMER, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, fill_one_to_many_baseline_hash_table_32(), fill_one_to_many_baseline_hash_table_64(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), getKeyComponentWidth(), init_baseline_hash_join_buff_32(), init_baseline_hash_join_buff_64(), init_hash_join_buff(), BaselineJoinHashTable::initHashTableOnCpuFromCache(), BaselineJoinHashTable::layout_, BaselineJoinHashTable::layoutRequiresAdditionalBuffers(), JoinHashTableInterface::ManyToMany, JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_32(), overlaps_fill_baseline_hash_join_buff_64(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::putHashTableOnCpuToCache(), and VLOG.

{
  auto timer = DEBUG_TIMER(__func__);
  const auto composite_key_info = getCompositeKeyInfo();
  CHECK(!join_columns.empty());
  CHECK(!join_bucket_info.empty());
  HashTableCacheKey cache_key{join_columns.front().num_elems,
                              composite_key_info.cache_key_chunks,
                              condition_->get_optype(),
                              overlaps_hashjoin_bucket_threshold_};
  initHashTableOnCpuFromCache(cache_key);
  if (cpu_hash_table_buff_) {
    // See if a hash table of a different layout was returned.
    // If it was OneToMany, we can reuse it on ManyToMany.
    if (layout != layout_) {
      if (layout == JoinHashTableInterface::HashType::ManyToMany) {
        layout_ = JoinHashTableInterface::HashType::ManyToMany;
      }
    }
    return 0;
  }

  const auto key_component_width = getKeyComponentWidth();
  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
  const auto entry_size = key_component_count * key_component_width;
  const auto keys_for_all_rows = emitted_keys_count_;
  const size_t one_to_many_hash_entries = 2 * entry_count_ + keys_for_all_rows;
  const size_t hash_table_size =
      calculateHashTableSize(join_bucket_info[0].bucket_sizes_for_dimension.size(),
                             emitted_keys_count_,
                             entry_count_);

  VLOG(1) << "Initializing CPU Overlaps Join Hash Table with " << entry_count_
          << " hash entries and " << one_to_many_hash_entries
          << " entries in the one to many buffer";
  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";

  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
  int thread_count = cpu_threads();
  std::vector<std::future<void>> init_cpu_buff_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    init_cpu_buff_threads.emplace_back(std::async(
        std::launch::async,
        [this, key_component_count, key_component_width, thread_idx, thread_count] {
          switch (key_component_width) {
            case 4:
              init_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
                                              entry_count_,
                                              key_component_count,
                                              false,
                                              -1,
                                              thread_idx,
                                              thread_count);
              break;
            case 8:
              init_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
                                              entry_count_,
                                              key_component_count,
                                              false,
                                              -1,
                                              thread_idx,
                                              thread_count);
              break;
            default:
              CHECK(false);
          }
        }));
  }
  for (auto& child : init_cpu_buff_threads) {
    child.get();
  }
  std::vector<std::future<int>> fill_cpu_buff_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    fill_cpu_buff_threads.emplace_back(std::async(
        std::launch::async,
        [this,
         &join_columns,
         &join_bucket_info,
         key_component_count,
         key_component_width,
         thread_idx,
         thread_count] {
          switch (key_component_width) {
            case 4: {
              const auto key_handler = OverlapsKeyHandler(
                  key_component_count,
                  &join_columns[0],
                  join_bucket_info[0].bucket_sizes_for_dimension.data());
              return overlaps_fill_baseline_hash_join_buff_32(
                  &(*cpu_hash_table_buff_)[0],
                  entry_count_,
                  -1,
                  key_component_count,
                  false,
                  &key_handler,
                  join_columns[0].num_elems,
                  thread_idx,
                  thread_count);
            }
            case 8: {
              const auto key_handler = OverlapsKeyHandler(
                  key_component_count,
                  &join_columns[0],
                  join_bucket_info[0].bucket_sizes_for_dimension.data());
              return overlaps_fill_baseline_hash_join_buff_64(
                  &(*cpu_hash_table_buff_)[0],
                  entry_count_,
                  -1,
                  key_component_count,
                  false,
                  &key_handler,
                  join_columns[0].num_elems,
                  thread_idx,
                  thread_count);
            }
            default:
              CHECK(false);
          }
          return -1;
        }));
  }
  int err = 0;
  for (auto& child : fill_cpu_buff_threads) {
    int partial_err = child.get();
    if (partial_err) {
      err = partial_err;
    }
  }
  if (err) {
    cpu_hash_table_buff_.reset();
    return err;
  }
  auto one_to_many_buff =
      reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size);
  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
  switch (key_component_width) {
    case 4: {
      const auto composite_key_dict =
          reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
      fill_one_to_many_baseline_hash_table_32(one_to_many_buff,
                                              composite_key_dict,
                                              entry_count_,
                                              -1,
                                              key_component_count,
                                              join_columns,
                                              join_column_types,
                                              join_bucket_info,
                                              composite_key_info.sd_inner_proxy_per_key,
                                              composite_key_info.sd_outer_proxy_per_key,
                                              thread_count);
      break;
    }
    case 8: {
      const auto composite_key_dict =
          reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
      fill_one_to_many_baseline_hash_table_64(one_to_many_buff,
                                              composite_key_dict,
                                              entry_count_,
                                              -1,
                                              key_component_count,
                                              join_columns,
                                              join_column_types,
                                              join_bucket_info,
                                              composite_key_info.sd_inner_proxy_per_key,
                                              composite_key_info.sd_outer_proxy_per_key,
                                              thread_count);
      break;
    }
    default:
      CHECK(false);
  }
  if (!err && getInnerTableId() > 0) {
    putHashTableOnCpuToCache(cache_key);
  }
  return err;
}

+ Here is the call graph for this function:

int OverlapsJoinHashTable::initHashTableOnGpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout,
const size_t  key_component_width,
const size_t  key_component_count,
const int  device_id 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 684 of file OverlapsJoinHashTable.cpp.

References CudaAllocator::alloc(), CHECK(), CHECK_EQ, copy_from_gpu(), copy_to_gpu(), DEBUG_TIMER, BaselineJoinHashTable::entry_count_, BaselineJoinHashTable::executor_, init_baseline_hash_join_buff_on_device_32(), init_baseline_hash_join_buff_on_device_64(), init_hash_join_buff_on_device(), BaselineJoinHashTable::layoutRequiresAdditionalBuffers(), overlaps_fill_baseline_hash_join_buff_on_device_64(), overlaps_fill_one_to_many_baseline_hash_table_on_device_64(), transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), and UNREACHABLE.

691  {
692  auto timer = DEBUG_TIMER(__func__);
693  int err = 0;
694  // TODO(adb): 4 byte keys
695  CHECK_EQ(key_component_width, size_t(8));
696  CHECK(layoutRequiresAdditionalBuffers(layout));
697 #ifdef HAVE_CUDA
698  const auto catalog = executor_->getCatalog();
699  auto& data_mgr = catalog->getDataMgr();
700  CudaAllocator allocator(&data_mgr, device_id);
701  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(allocator.alloc(sizeof(int)));
702  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
703  switch (key_component_width) {
704  case 4:
705  init_baseline_hash_join_buff_on_device_32(
706  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
707  entry_count_,
708  key_component_count,
709  false,
710  -1,
711  executor_->blockSize(),
712  executor_->gridSize());
713  break;
714  case 8:
715  init_baseline_hash_join_buff_on_device_64(
716  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
717  entry_count_,
718  key_component_count,
719  false,
720  -1,
721  executor_->blockSize(),
722  executor_->gridSize());
723  break;
724  default:
725  CHECK(false);
726  }
727  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
728  auto hash_buff = gpu_hash_table_buff_[device_id]->getMemoryPtr();
729  CHECK_EQ(join_columns.size(), 1u);
730  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
731  auto bucket_sizes_gpu =
732  transfer_vector_of_flat_objects_to_gpu(bucket_sizes_for_dimension, allocator);
733  const auto key_handler = OverlapsKeyHandler(
734  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
735  const auto key_handler_gpu = transfer_flat_object_to_gpu(key_handler, allocator);
736  switch (key_component_width) {
737  case 8: {
738  overlaps_fill_baseline_hash_join_buff_on_device_64(
739  hash_buff,
740  entry_count_,
741  -1,
742  key_component_count,
743  false,
744  reinterpret_cast<int*>(dev_err_buff),
745  key_handler_gpu,
746  join_columns.front().num_elems,
747  executor_->blockSize(),
748  executor_->gridSize());
749  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
750  break;
751  }
752  default:
753  UNREACHABLE();
754  }
755  if (err) {
756  return err;
757  }
758  const auto entry_size = key_component_count * key_component_width;
759  auto one_to_many_buff = reinterpret_cast<int32_t*>(
760  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
761  switch (key_component_width) {
762  case 8: {
763  const auto composite_key_dict =
764  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
765  init_hash_join_buff_on_device(one_to_many_buff,
766  entry_count_,
767  -1,
768  executor_->blockSize(),
769  executor_->gridSize());
770  overlaps_fill_one_to_many_baseline_hash_table_on_device_64(
771  one_to_many_buff,
772  composite_key_dict,
773  entry_count_,
774  -1,
775  key_handler_gpu,
776  join_columns.front().num_elems,
777  executor_->blockSize(),
778  executor_->gridSize());
779  break;
780  }
781  default:
782  UNREACHABLE();
783  }
784 #else
785  UNREACHABLE();
786 #endif
787  return err;
788 }

+ Here is the call graph for this function:

void OverlapsJoinHashTable::reifyWithLayout ( const JoinHashTableInterface::HashType  layout)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 114 of file OverlapsJoinHashTable.cpp.

References auto_tuner_cache_, auto_tuner_cache_mutex_, bucket_sizes_for_dimension_, calculateCounts(), calculateHashTableSize(), BaselineJoinHashTable::catalog_, CHECK(), CHECK_GE, BaselineJoinHashTable::condition_, DEBUG_TIMER, BaselineJoinHashTable::device_count_, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, fetchColumnsForDevice(), g_overlaps_max_table_size_bytes, get_inner_query_info(), BaselineJoinHashTable::getCompositeKeyInfo(), Catalog_Namespace::Catalog::getDataMgr(), JoinHashTableInterface::getHashTypeString(), BaselineJoinHashTable::getInnerTableId(), Data_Namespace::GPU_LEVEL, InputTableInfo::info, BaselineJoinHashTable::layout_, BaselineJoinHashTable::layoutRequiresAdditionalBuffers(), BaselineJoinHashTable::memory_level_, only_shards_for_device(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::query_infos_, BaselineJoinHashTable::reifyForDevice(), BaselineJoinHashTable::shardCount(), logger::thread_id(), and VLOG.

115  {
116  auto timer = DEBUG_TIMER(__func__);
117  CHECK(layoutRequiresAdditionalBuffers(layout));
118  layout_ = layout;
119  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
120  VLOG(1) << "Reify with layout " << getHashTypeString(layout_)
121  << " for table_id: " << getInnerTableId();
122  if (query_info.fragments.empty()) {
123  return;
124  }
125  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
126  auto& data_mgr = catalog_->getDataMgr();
127  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
128  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
129  for (int device_id = 0; device_id < device_count_; ++device_id) {
130  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
131  }
132  }
133  const auto shard_count = shardCount();
134  for (int device_id = 0; device_id < device_count_; ++device_id) {
135  const auto fragments =
136  shard_count
137  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
138  : query_info.fragments;
139  const auto columns_for_device =
140  fetchColumnsForDevice(fragments,
141  device_id,
142  memory_level_ == Data_Namespace::GPU_LEVEL
143  ? dev_buff_owners[device_id].get()
144  : nullptr);
145  columns_per_device.push_back(columns_for_device);
146  }
147 
148  // Prepare to calculate the size of the hash table.
149  const auto composite_key_info = getCompositeKeyInfo();
150  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
151  composite_key_info.cache_key_chunks,
152  condition_->get_optype()};
154 
155  auto cache_key_contains_intermediate_table = [](const auto cache_key) {
156  for (auto key : cache_key.chunk_keys) {
157  CHECK_GE(key.size(), size_t(2));
158  if (key[1] < 0) {
159  return true;
160  }
161  }
162  return false;
163  };
164 
165  // Auto-tuner: Pre-calculate some possible hash table sizes.
166  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
167  auto atc = auto_tuner_cache_.find(cache_key);
168  if (atc != auto_tuner_cache_.end()) {
169  overlaps_hashjoin_bucket_threshold_ = atc->second;
170  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
171  << overlaps_hashjoin_bucket_threshold_;
172  } else {
173  VLOG(1) << "Auto tuning for the overlaps hash table size:";
174  // TODO(jclay): Currently, joining on large poly sets
175  // will lead to lengthy construction times (and large hash tables)
176  // tune this to account for the characteristics of the data being joined.
177  const double min_threshold{1e-5};
178  const double max_threshold{1};
179  double good_threshold{max_threshold};
180  for (double threshold = max_threshold; threshold >= min_threshold;
181  threshold /= 10.0) {
182  overlaps_hashjoin_bucket_threshold_ = threshold;
183  size_t entry_count;
184  size_t emitted_keys_count;
185  std::tie(entry_count, emitted_keys_count) =
186  calculateCounts(shard_count, query_info, columns_per_device);
187  size_t hash_table_size = calculateHashTableSize(
188  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
190  VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
191  << " giving: entry count " << entry_count << " hash table size "
192  << hash_table_size;
193  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
194  good_threshold = overlaps_hashjoin_bucket_threshold_;
195  } else {
196  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
197  break;
198  }
199  }
200  overlaps_hashjoin_bucket_threshold_ = good_threshold;
201  if (!cache_key_contains_intermediate_table(cache_key)) {
202  auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
203  }
204  }
205 
206  // Calculate the final size of the hash table.
207  VLOG(1) << "Accepted bin threshold of " << std::fixed
208  << overlaps_hashjoin_bucket_threshold_;
209  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
210  // from entries_per_device.
211  std::tie(entry_count_, emitted_keys_count_) =
212  calculateCounts(shard_count, query_info, columns_per_device);
213  size_t hash_table_size = calculateHashTableSize(
214  bucket_sizes_for_dimension_.size(), emitted_keys_count_, entry_count_);
215  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
216  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
217  << entry_count_ << " hash table size " << hash_table_size;
218 
219  std::vector<std::future<void>> init_threads;
220  for (int device_id = 0; device_id < device_count_; ++device_id) {
221  const auto fragments =
222  shard_count
223  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
224  : query_info.fragments;
225  init_threads.push_back(std::async(std::launch::async,
226  &BaselineJoinHashTable::reifyForDevice,
227  this,
228  columns_per_device[device_id],
229  layout,
230  device_id,
231  logger::thread_id()));
232  }
233  for (auto& init_thread : init_threads) {
234  init_thread.wait();
235  }
236  for (auto& init_thread : init_threads) {
237  init_thread.get();
238  }
239 }

+ Here is the call graph for this function:

static auto OverlapsJoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inline static

Definition at line 54 of file OverlapsJoinHashTable.h.

References auto_tuner_cache_, auto_tuner_cache_mutex_, and VLOG.

54  {
55  VLOG(1) << "Invalidate " << auto_tuner_cache_.size() << " cached overlaps hash tables.";
56  return []() -> void {
57  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
58  auto_tuner_cache_.clear();
59  };
60  }

Member Data Documentation

std::map< OverlapsJoinHashTable::HashTableCacheKey, double > OverlapsJoinHashTable::auto_tuner_cache_
static protected

Definition at line 107 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_
static protected

Definition at line 108 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::vector<double> OverlapsJoinHashTable::bucket_sizes_for_dimension_
private
double OverlapsJoinHashTable::overlaps_hashjoin_bucket_threshold_
private

The documentation for this class was generated from the following files: