OmniSciDB  0fdbebe030
OverlapsJoinHashTable Class Reference

#include <OverlapsJoinHashTable.h>

Inheritance diagram for OverlapsJoinHashTable:
Collaboration diagram for OverlapsJoinHashTable:

Public Member Functions

 OverlapsJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count)
 
 ~OverlapsJoinHashTable () override
 
- Public Member Functions inherited from BaselineJoinHashTable
int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
size_t getJoinHashBufferSize (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
std::string toString (const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
 
std::set< DecodedJoinHashBufferEntry > toSet (const ExecutorDeviceType device_type, const int device_id) const override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
JoinHashTableInterface::HashType getHashType () const noexcept override
 
Data_Namespace::MemoryLevel getMemoryLevel () const noexcept override
 
int getDeviceCount () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~BaselineJoinHashTable ()
 
- Public Member Functions inherited from JoinHashTableInterface
virtual std::string toStringFlat64 (const ExecutorDeviceType device_type, const int device_id) const
 
virtual std::string toStringFlat32 (const ExecutorDeviceType device_type, const int device_id) const
 
JoinColumn fetchJoinColumn (const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
 

Static Public Member Functions

static std::shared_ptr< OverlapsJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc.
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
- Static Public Member Functions inherited from BaselineJoinHashTable
static std::shared_ptr< BaselineJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc.
 
static size_t getShardCountForCondition (const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
- Static Public Member Functions inherited from JoinHashTableInterface
static DecodedJoinHashBufferSet toSet (size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
 Decode hash table into a std::set for easy inspection and validation.
 
static std::string toString (const std::string &type, size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw=false)
 Decode hash table into a human-readable string.
 
static std::shared_ptr< JoinHashTableInterface > getInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc.
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing).
 
static std::shared_ptr< JoinHashTableInterface > getSyntheticInstance (const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing).
 

Protected Member Functions

void reifyWithLayout (const JoinHashTableInterface::HashType layout) override
 
std::pair< size_t, size_t > calculateCounts (size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
 
size_t calculateHashTableSize (size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
 
ColumnsForDevice fetchColumnsForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, ThrustAllocator &dev_buff_owner) override
 
std::pair< size_t, size_t > approximateTupleCount (const std::vector< ColumnsForDevice > &) const override
 
size_t getKeyComponentWidth () const override
 
size_t getKeyComponentCount () const override
 
int initHashTableOnCpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout) override
 
int initHashTableOnGpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id) override
 
llvm::Value * codegenKey (const CompilationOptions &) override
 
- Protected Member Functions inherited from BaselineJoinHashTable
 BaselineJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count)
 
size_t shardCount () const
 
Data_Namespace::MemoryLevel getEffectiveMemoryLevel (const std::vector< InnerOuter > &inner_outer_pairs) const
 
CompositeKeyInfo getCompositeKeyInfo () const
 
void reify ()
 
void reifyForDevice (const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id, const logger::ThreadId parent_thread_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
int initHashTableForDevice (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
llvm::Value * hashPtr (const size_t index)
 
void initHashTableOnCpuFromCache (const HashTableCacheKey &)
 
void putHashTableOnCpuToCache (const HashTableCacheKey &)
 
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache (const HashTableCacheKey &) const
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
const HashTableCacheValue * findHashTableOnCpuInCache (const HashTableCacheKey &)
 

Static Protected Attributes

static std::map< HashTableCacheKey, double > auto_tuner_cache_
 
static std::mutex auto_tuner_cache_mutex_
 
- Static Protected Attributes inherited from BaselineJoinHashTable
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
 
static std::mutex hash_table_cache_mutex_
 
static const int ERR_FAILED_TO_FETCH_COLUMN {-3}
 
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN {-4}
 

Private Member Functions

void computeBucketSizes (std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const JoinColumnTypeInfo &join_column_type, const std::vector< InnerOuter > &inner_outer_pairs)
 

Private Attributes

std::vector< double > bucket_sizes_for_dimension_
 
double overlaps_hashjoin_bucket_threshold_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType { HashType::OneToOne, HashType::OneToMany }
 
- Protected Types inherited from JoinHashTableInterface
using LinearizedColumn = std::pair< const int8_t *, size_t >
 
using LinearizedColumnCacheKey = std::pair< int, int >
 
- Static Protected Member Functions inherited from BaselineJoinHashTable
static int getInnerTableId (const std::vector< InnerOuter > &inner_outer_pairs)
 
- Protected Attributes inherited from BaselineJoinHashTable
const std::shared_ptr< Analyzer::BinOper > condition_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
JoinHashTableInterface::HashType layout_
 
size_t entry_count_
 
size_t emitted_keys_count_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
std::vector< InnerOuter > inner_outer_pairs_
 
const Catalog_Namespace::Catalog * catalog_
 
const int device_count_
 
- Protected Attributes inherited from JoinHashTableInterface
std::map< LinearizedColumnCacheKey, LinearizedColumn > linearized_multifrag_columns_
 
std::mutex linearized_multifrag_column_mutex_
 
RowSetMemoryOwner linearized_multifrag_column_owner_
 

Detailed Description

Definition at line 22 of file OverlapsJoinHashTable.h.

Constructor & Destructor Documentation

OverlapsJoinHashTable::OverlapsJoinHashTable ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const size_t  entry_count,
ColumnCacheMap &  column_cache,
Executor *  executor,
const std::vector< InnerOuter > &  inner_outer_pairs,
const int  device_count 
)
inline

Definition at line 24 of file OverlapsJoinHashTable.h.

32  : BaselineJoinHashTable(condition,
33                          query_infos,
34                          memory_level,
35                          JoinHashTableInterface::HashType::OneToMany,
36                          entry_count,
37                          column_cache,
38                          executor,
39                          inner_outer_pairs,
40                          device_count) {}
OverlapsJoinHashTable::~OverlapsJoinHashTable ( )
inline override

Definition at line 42 of file OverlapsJoinHashTable.h.

42 {}

Member Function Documentation

std::pair< size_t, size_t > OverlapsJoinHashTable::approximateTupleCount ( const std::vector< ColumnsForDevice > &  columns_per_device) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 279 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), approximate_distinct_tuples_on_device_overlaps(), approximate_distinct_tuples_overlaps(), Bitmap, CHECK(), CHECK_EQ, CHECK_GT, BaselineJoinHashTable::condition_, copy_from_gpu(), copy_to_gpu(), CPU, Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::device_count_, BaselineJoinHashTable::executor_, BaselineJoinHashTable::getApproximateTupleCountFromCache(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getEffectiveMemoryLevel(), GPU, Data_Namespace::GPU_LEVEL, hll_size(), hll_unify(), BaselineJoinHashTable::inner_outer_pairs_, overlaps_hashjoin_bucket_threshold_, transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), UNREACHABLE, and VLOG.

Referenced by calculateCounts().

280  {
281  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
282  CountDistinctDescriptor count_distinct_desc{
283      CountDistinctImplType::Bitmap,
284      0,
285      11,
286      true,
287      effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
288          ? ExecutorDeviceType::GPU
289          : ExecutorDeviceType::CPU,
290      1};
291  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
292 
293  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
294  // Number of keys must match dimension of buckets
295  CHECK_EQ(columns_per_device.front().join_columns.size(),
296  columns_per_device.front().join_buckets.size());
297  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
298  const auto composite_key_info = getCompositeKeyInfo();
299  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
300                              composite_key_info.cache_key_chunks,
301                              condition_->get_optype(),
302                              overlaps_hashjoin_bucket_threshold_};
303  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
304  if (cached_count_info.first >= 0) {
305  VLOG(1) << "Using a cached tuple count: " << cached_count_info.first
306  << ", emitted keys count: " << cached_count_info.second;
307  return std::make_pair(cached_count_info.first, cached_count_info.second);
308  }
309  int thread_count = cpu_threads();
310  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
311  auto hll_result = &hll_buffer_all_cpus[0];
312 
313  std::vector<int32_t> num_keys_for_row;
314  // TODO(adb): support multi-column overlaps join
315  CHECK_EQ(columns_per_device.size(), 1u);
316  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
317 
318  approximate_distinct_tuples_overlaps(hll_result,
319                                       num_keys_for_row,
320  count_distinct_desc.bitmap_sz_bits,
321  padded_size_bytes,
322  columns_per_device.front().join_columns,
323  columns_per_device.front().join_column_types,
324  columns_per_device.front().join_buckets,
325  thread_count);
326  for (int i = 1; i < thread_count; ++i) {
327  hll_unify(hll_result,
328  hll_result + i * padded_size_bytes,
329  1 << count_distinct_desc.bitmap_sz_bits);
330  }
331  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
332  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
333  }
334 #ifdef HAVE_CUDA
335  auto& data_mgr = executor_->getCatalog()->getDataMgr();
336  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
337  for (auto& host_hll_buffer : host_hll_buffers) {
338  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
339  }
340  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
341  std::vector<std::future<void>> approximate_distinct_device_threads;
342  for (int device_id = 0; device_id < device_count_; ++device_id) {
343  approximate_distinct_device_threads.emplace_back(std::async(
344  std::launch::async,
345  [device_id,
346  &columns_per_device,
347  &count_distinct_desc,
348  &data_mgr,
349  &host_hll_buffers,
350  &emitted_keys_count_device_threads,
351  this] {
352  ThrustAllocator allocator(&data_mgr, device_id);
353  auto device_hll_buffer =
354  allocator.allocateScopedBuffer(count_distinct_desc.bitmapPaddedSizeBytes());
355  data_mgr.getCudaMgr()->zeroDeviceMem(
356  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
357  const auto& columns_for_device = columns_per_device[device_id];
358  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
359  columns_for_device.join_columns, allocator);
360 
361  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
362  const auto& bucket_sizes_for_dimension =
363  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
364  auto bucket_sizes_gpu = allocator.allocateScopedBuffer(
365  bucket_sizes_for_dimension.size() * sizeof(double));
366  copy_to_gpu(&data_mgr,
367  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
368  bucket_sizes_for_dimension.data(),
369  bucket_sizes_for_dimension.size() * sizeof(double),
370  device_id);
371  const size_t row_counts_buffer_sz =
372  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
373  auto row_counts_buffer = allocator.allocateScopedBuffer(row_counts_buffer_sz);
374  data_mgr.getCudaMgr()->zeroDeviceMem(
375  row_counts_buffer, row_counts_buffer_sz, device_id);
376  const auto key_handler =
377  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
378  join_columns_gpu,
379  reinterpret_cast<double*>(bucket_sizes_gpu));
380  const auto key_handler_gpu =
381      transfer_flat_object_to_gpu(key_handler, allocator);
382  approximate_distinct_tuples_on_device_overlaps(
383      reinterpret_cast<uint8_t*>(device_hll_buffer),
384  count_distinct_desc.bitmap_sz_bits,
385  reinterpret_cast<int32_t*>(row_counts_buffer),
386  key_handler_gpu,
387  columns_for_device.join_columns[0].num_elems,
388  executor_->blockSize(),
389  executor_->gridSize());
390 
391  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
392  copy_from_gpu(&data_mgr,
393  &host_emitted_keys_count,
394  reinterpret_cast<CUdeviceptr>(
395  row_counts_buffer +
396  (columns_per_device.front().join_columns[0].num_elems - 1) *
397  sizeof(int32_t)),
398  sizeof(int32_t),
399  device_id);
400 
401  auto& host_hll_buffer = host_hll_buffers[device_id];
402  copy_from_gpu(&data_mgr,
403  &host_hll_buffer[0],
404  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
405  count_distinct_desc.bitmapPaddedSizeBytes(),
406  device_id);
407  }));
408  }
409  for (auto& child : approximate_distinct_device_threads) {
410  child.get();
411  }
412  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
413  auto& result_hll_buffer = host_hll_buffers.front();
414  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
415  for (int device_id = 1; device_id < device_count_; ++device_id) {
416  auto& host_hll_buffer = host_hll_buffers[device_id];
417  hll_unify(hll_result,
418  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
419  1 << count_distinct_desc.bitmap_sz_bits);
420  }
421  size_t emitted_keys_count = 0;
422  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
423  emitted_keys_count += emitted_keys_count_device;
424  }
425  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
426  emitted_keys_count);
427 #else
428  UNREACHABLE();
429  return {0, 0};
430 #endif // HAVE_CUDA
431 }

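The estimator above builds one HyperLogLog sketch per CPU thread (or per GPU device) and folds them together with hll_unify before reading the estimate off with hll_size. For HLL sketches, that union amounts to an element-wise maximum over the register arrays. A minimal standalone sketch of the merge step, assuming plain byte registers rather than OmniSciDB's actual hll_unify/hll_size internals:

  #include <algorithm>
  #include <cstddef>
  #include <cstdint>
  #include <vector>

  // Union of two HLL sketches with m registers each: keep the larger register.
  void unify_hll(std::vector<uint8_t>& lhs, const std::vector<uint8_t>& rhs) {
    for (size_t i = 0; i < lhs.size(); ++i) {
      lhs[i] = std::max(lhs[i], rhs[i]);
    }
  }

  int main() {
    const size_t m = 1 << 11;  // matches the 11 bitmap bits used above
    std::vector<uint8_t> thread0(m, 0), thread1(m, 0);
    // ... each worker fills its own sketch from its slice of the join column ...
    unify_hll(thread0, thread1);  // thread0 now estimates the combined cardinality
    return 0;
  }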

std::pair< size_t, size_t > OverlapsJoinHashTable::calculateCounts ( size_t  shard_count,
const Fragmenter_Namespace::TableInfo &  query_info,
std::vector< BaselineJoinHashTable::ColumnsForDevice > &  columns_per_device 
)
protected

Definition at line 184 of file OverlapsJoinHashTable.cpp.

References approximateTupleCount(), BaselineJoinHashTable::catalog_, BaselineJoinHashTable::device_count_, fetchColumnsForDevice(), Fragmenter_Namespace::TableInfo::fragments, get_entries_per_device(), Catalog_Namespace::Catalog::getDataMgr(), BaselineJoinHashTable::memory_level_, and only_shards_for_device().

Referenced by reifyWithLayout().

187  {
188  auto& data_mgr = catalog_->getDataMgr();
189  auto dev_buff_owners =
190  std::make_unique<std::unique_ptr<ThrustAllocator>[]>(device_count_);
191  for (int device_id = 0; device_id < device_count_; ++device_id) {
192  dev_buff_owners[device_id] = std::make_unique<ThrustAllocator>(&data_mgr, device_id);
193  }
194  for (int device_id = 0; device_id < device_count_; ++device_id) {
195  const auto fragments =
196  shard_count
197  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
198  : query_info.fragments;
199  const auto columns_for_device =
200  fetchColumnsForDevice(fragments, device_id, *dev_buff_owners[device_id]);
201  columns_per_device.push_back(columns_for_device);
202  }
203 
204  size_t tuple_count;
205  size_t emitted_keys_count;
206  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
207  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
208 
209  return std::make_pair(
210  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
211  emitted_keys_count);
212 }

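The sizing policy is compact: provision twice the estimated distinct-key count (floored at one) and let get_entries_per_device split that across shards and devices. A hedged paraphrase of the arithmetic; entries_per_shard below is an assumed simplification, not the library function:

  #include <algorithm>
  #include <cstddef>

  size_t entry_count_for(size_t approx_tuple_count) {
    return 2 * std::max(approx_tuple_count, size_t(1));  // doubling leaves headroom
  }

  size_t entries_per_shard(size_t total_entries, size_t shard_count) {
    return (total_entries + shard_count - 1) / shard_count;  // assumed round-up split
  }
  // e.g. an HLL estimate of 1'000'000 keys => 2'000'000 entries overall.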

size_t OverlapsJoinHashTable::calculateHashTableSize ( size_t  number_of_dimensions,
size_t  emitted_keys_count,
size_t  entry_count 
) const
protected

Definition at line 214 of file OverlapsJoinHashTable.cpp.

References getKeyComponentWidth().

Referenced by initHashTableOnCpu(), and reifyWithLayout().

216  {
217  const auto key_component_width = getKeyComponentWidth();
218  const auto key_component_count = number_of_dimensions;
219  const auto entry_size = key_component_count * key_component_width;
220  const auto keys_for_all_rows = emitted_keys_count;
221  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
222  const size_t hash_table_size =
223  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
224  return hash_table_size;
225 }

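In other words, the buffer holds entry_count fixed-width composite keys followed by the one-to-many structure: two int32 values per hash entry plus one int32 per emitted key, consistent with the offset/count/payload accessors inherited from BaselineJoinHashTable. A worked example with assumed, illustrative numbers:

  #include <cstddef>
  #include <cstdint>

  // Mirrors calculateHashTableSize() line for line.
  size_t overlaps_table_bytes(size_t dims, size_t emitted_keys, size_t entries) {
    const size_t key_component_width = 8;  // getKeyComponentWidth() is fixed at 8
    const size_t entry_size = dims * key_component_width;
    const size_t one_to_many_entries = 2 * entries + emitted_keys;
    return entry_size * entries + one_to_many_entries * sizeof(int32_t);
  }
  // 2 dims, 2M entries, 5M emitted keys:
  //   16 B * 2'000'000 + (4'000'000 + 5'000'000) * 4 B = 32 MB + 36 MB = 68 MB.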

llvm::Value * OverlapsJoinHashTable::codegenKey ( const CompilationOptions &  co)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 726 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), CHECK_EQ, CodeGenerator::codegen(), BaselineJoinHashTable::executor_, get_int_type(), getKeyComponentCount(), getKeyComponentWidth(), BaselineJoinHashTable::inner_outer_pairs_, kENCODING_GEOINT, kPOINT, kTINYINT, LL_BUILDER, LL_CONTEXT, LL_FP, LL_INT, and CodeGenerator::posArg().

726  {
727  const auto key_component_width = getKeyComponentWidth();
728  CHECK(key_component_width == 4 || key_component_width == 8);
729  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
730  llvm::Value* key_buff_lv{nullptr};
731  switch (key_component_width) {
732  case 4:
733  key_buff_lv =
734  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
735  break;
736  case 8:
737  key_buff_lv =
738  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
739  break;
740  default:
741  CHECK(false);
742  }
743 
744  const auto& inner_outer_pair = inner_outer_pairs_[0];
745  const auto outer_col = inner_outer_pair.second;
746  const auto outer_col_ti = outer_col->get_type_info();
747 
748  if (outer_col_ti.is_geometry()) {
749  CodeGenerator code_generator(executor_);
750  // TODO(adb): for points we will use the coords array, but for other geometries we
751  // will need to use the bounding box. For now only support points.
752  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
753  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
754 
755  const auto col_lvs = code_generator.codegen(outer_col, true, co);
756  CHECK_EQ(col_lvs.size(), size_t(1));
757 
758  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
759  CHECK(outer_col_var);
760  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
761  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
762  CHECK(coords_cd);
763 
764  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
765  "array_buff",
766  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
767  {col_lvs.front(), code_generator.posArg(outer_col)});
768  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
769  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
770  const auto arr_ptr =
771  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
772 
773  for (size_t i = 0; i < 2; i++) {
774  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
775 
776  // Note that get_bucket_key_for_range_compressed will need to be specialized for
777  // future compression schemes
778  auto bucket_key =
779      outer_col_ti.get_compression() == kENCODING_GEOINT
780          ? executor_->cgen_state_->emitExternalCall(
781                "get_bucket_key_for_range_compressed",
782                get_int_type(64, LL_CONTEXT),
783                {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
784          : executor_->cgen_state_->emitExternalCall(
785                "get_bucket_key_for_range_double",
786                get_int_type(64, LL_CONTEXT),
787                {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
788  const auto col_lv = LL_BUILDER.CreateSExt(
789  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
790  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
791  }
792  } else {
793  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
794  }
795  return key_buff_lv;
796 }

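Per dimension, the generated code fetches the point's coordinate and asks the runtime for its bucket key, passing the value stored in bucket_sizes_for_dimension_ (which computeBucketSizes() below stores as an inverse bucket size). A scalar model of the emitted computation, under the assumption that get_bucket_key_for_range_double(arr, i, b) behaves like floor(arr[i] * b):

  #include <cmath>
  #include <cstddef>
  #include <cstdint>

  // Illustrative model of the key built for a 2D point; not the JIT itself.
  void compute_overlaps_key(const double coords[2],
                            const double inverse_bucket_sizes[2],
                            int64_t key[2]) {
    for (size_t i = 0; i < 2; ++i) {
      key[i] = static_cast<int64_t>(std::floor(coords[i] * inverse_bucket_sizes[i]));
    }
  }
  // A point at (10.0, 3.5) with inverse bucket sizes (2.0, 2.0), i.e. buckets of
  // width 0.5, lands in the composite bucket (20, 7).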

void OverlapsJoinHashTable::computeBucketSizes ( std::vector< double > &  bucket_sizes_for_dimension,
const JoinColumn join_column,
const JoinColumnTypeInfo join_column_type,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
private

Definition at line 798 of file OverlapsJoinHashTable.cpp.

References CHECK(), CHECK_EQ, CHECK_GT, compute_bucket_sizes(), compute_bucket_sizes_on_device(), copy_from_gpu(), Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getEffectiveMemoryLevel(), overlaps_hashjoin_bucket_threshold_, to_string(), transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), and VLOG.

Referenced by fetchColumnsForDevice().

802  {
803  // No coalesced keys for overlaps joins yet
804  CHECK_EQ(inner_outer_pairs.size(), 1u);
805 
806  const auto col = inner_outer_pairs[0].first;
807  CHECK(col);
808  const auto col_ti = col->get_type_info();
809  CHECK(col_ti.is_array());
810 
811  // Compute the number of dimensions for this overlaps key
812  int num_dims{-1};
813  if (col_ti.is_fixlen_array()) {
814  num_dims = col_ti.get_size() / col_ti.get_elem_type().get_size();
815  num_dims /= 2;
816  } else {
817  CHECK(col_ti.is_varlen_array());
818  num_dims = 2;
819  // TODO(adb): how can we pick the number of dims in the varlen case? e.g.
820  // backwards compatibility with existing bounds cols or generic range joins
821  }
822  CHECK_GT(num_dims, 0);
823  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
824 
825  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
826          << std::to_string(overlaps_hashjoin_bucket_threshold_);
827 
828  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
829  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
830  const int thread_count = cpu_threads();
831  compute_bucket_sizes(local_bucket_sizes,
832                       join_column,
833                       join_column_type,
834                       overlaps_hashjoin_bucket_threshold_,
835                       thread_count);
836  }
837 #ifdef HAVE_CUDA
838  else {
839  // Note that we compute the bucket sizes using only a single GPU
840  const int device_id = 0;
841  auto& data_mgr = executor_->getCatalog()->getDataMgr();
842  ThrustAllocator allocator(&data_mgr, device_id);
843  auto device_bucket_sizes_gpu =
844  transfer_vector_of_flat_objects_to_gpu(local_bucket_sizes, allocator);
845  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
846  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
847 
848  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
849                                 join_column_gpu,
850                                 join_column_type_gpu,
851                                 overlaps_hashjoin_bucket_threshold_,
852                                 executor_->blockSize(),
853                                 executor_->gridSize());
854  copy_from_gpu(&data_mgr,
855  local_bucket_sizes.data(),
856  reinterpret_cast<CUdeviceptr>(device_bucket_sizes_gpu),
857  local_bucket_sizes.size() * sizeof(double),
858  device_id);
859  }
860 #endif
861 
862  size_t ctr = 0;
863  for (auto& bucket_sz : local_bucket_sizes) {
864  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
865  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
866  }
867 
868  return;
869 }

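Two details worth calling out: the dimension count for a fixed-length bounds array divides by two because each dimension stores a min/max pair, and the computed sizes are pushed back inverted (1.0 / bucket_sz) so the codegen path can multiply per row instead of dividing. A small worked example of the dimension arithmetic:

  #include <cstddef>

  // num_dims for a fixed-length bounds array: bytes / element bytes / 2,
  // e.g. a 2D bounding box stored as 4 doubles (xmin, ymin, xmax, ymax):
  size_t overlaps_dims(size_t array_bytes, size_t elem_bytes) {
    return array_bytes / elem_bytes / 2;  // 32 / 8 / 2 == 2
  }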

BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id,
ThrustAllocator &  dev_buff_owner 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 227 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), BaselineJoinHashTable::column_cache_, computeBucketSizes(), BaselineJoinHashTable::executor_, JoinHashTableInterface::fetchJoinColumn(), get_column_descriptor_maybe(), get_join_column_type_kind(), BaselineJoinHashTable::getEffectiveMemoryLevel(), BaselineJoinHashTable::inner_outer_pairs_, BaselineJoinHashTable::isBitwiseEq(), and kDOUBLE.

Referenced by calculateCounts().

230  {
231  const auto& catalog = *executor_->getCatalog();
232  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
233 
234  std::vector<JoinColumn> join_columns;
235  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
236  std::vector<JoinColumnTypeInfo> join_column_types;
237  std::vector<JoinBucketInfo> join_bucket_info;
238  std::vector<std::shared_ptr<void>> malloc_owner;
239  for (const auto& inner_outer_pair : inner_outer_pairs_) {
240  const auto inner_col = inner_outer_pair.first;
241  const auto inner_cd = get_column_descriptor_maybe(
242  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
243  if (inner_cd && inner_cd->isVirtualCol) {
244    throw FailedToJoinOnVirtualColumn();
245  }
246  join_columns.emplace_back(fetchJoinColumn(inner_col,
247  fragments,
248  effective_memory_level,
249  device_id,
250  chunks_owner,
251  dev_buff_owner,
252  malloc_owner,
253  executor_,
254  &column_cache_));
255  const auto& ti = inner_col->get_type_info();
256  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
257                                                    0,
258                                                    0,
259                                                    inline_int_null_value<int64_t>(),
260                                                    isBitwiseEq(),
261                                                    0,
262                                                    get_join_column_type_kind(ti)});
263  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
264 
265  if (bucket_sizes_for_dimension_.empty()) {
266    computeBucketSizes(bucket_sizes_for_dimension_,
267                       join_columns.back(),
268                       join_column_types.back(),
269                       inner_outer_pairs_);
270  }
271  const auto elem_ti = ti.get_elem_type();
272  CHECK(elem_ti.is_fp());
273  join_bucket_info.emplace_back(
274  JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
275  }
276  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
277 }


std::shared_ptr< OverlapsJoinHashTable > OverlapsJoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper >  condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Make hash table from an in-flight SQL query's parse tree etc.

Definition at line 31 of file OverlapsJoinHashTable.cpp.

References logger::FATAL, get_entries_per_device(), get_inner_query_info(), BaselineJoinHashTable::getInnerTableId(), Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound(), BaselineJoinHashTable::getShardCountForCondition(), Data_Namespace::GPU_LEVEL, InputTableInfo::info, LOG, normalize_column_pairs(), VLOG, and VLOGGING.

Referenced by JoinHashTableInterface::getInstance().

37  {
38  decltype(std::chrono::steady_clock::now()) ts1, ts2;
39  if (VLOGGING(1)) {
40  VLOG(1) << "Building geo hash table OneToMany for qual: " << condition->toString();
41  ts1 = std::chrono::steady_clock::now();
42  }
43  auto inner_outer_pairs = normalize_column_pairs(
44  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
45  const auto& query_info =
46  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
47  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
48  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
49  throw TooManyHashEntries();
50  }
51  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
52      ? BaselineJoinHashTable::getShardCountForCondition(
53            condition.get(), executor, inner_outer_pairs)
54      : 0;
55  const auto entries_per_device =
56  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
57  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
58  query_infos,
59  memory_level,
60  entries_per_device,
61  column_cache,
62  executor,
63  inner_outer_pairs,
64  device_count);
65  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
66  try {
67  join_hash_table->reify();
68  } catch (const HashJoinFail& e) {
69  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
70  "involved in equijoin | ") +
71  e.what());
72  } catch (const ColumnarConversionNotSupported& e) {
73  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
74  e.what());
75  } catch (const std::exception& e) {
76  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
77  << e.what();
78  }
79  if (VLOGGING(1)) {
80  ts2 = std::chrono::steady_clock::now();
81  VLOG(1) << "Built geo hash table OneToMany in "
82  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
83  << " ms";
84  }
85  return join_hash_table;
86 }

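Note the overflow guard near the top: the table is provisioned at twice the inner table's tuple-count upper bound and must stay addressable with 32-bit offsets. A standalone restatement of that check; the exception type below is a stand-in for the TooManyHashEntries thrown above:

  #include <cstddef>
  #include <cstdint>
  #include <limits>
  #include <stdexcept>

  size_t checked_total_entries(size_t num_tuples_upper_bound) {
    const size_t total_entries = 2 * num_tuples_upper_bound;
    if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
      throw std::runtime_error("TooManyHashEntries");  // stand-in exception type
    }
    return total_entries;
  }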

size_t OverlapsJoinHashTable::getKeyComponentCount ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 437 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_.

Referenced by codegenKey().

437  {
438  return bucket_sizes_for_dimension_.size();
439 }


size_t OverlapsJoinHashTable::getKeyComponentWidth ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 433 of file OverlapsJoinHashTable.cpp.

Referenced by calculateHashTableSize(), codegenKey(), and initHashTableOnCpu().

433  {
434  return 8;
435 }


int OverlapsJoinHashTable::initHashTableOnCpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 441 of file OverlapsJoinHashTable.cpp.

References calculateHashTableSize(), CHECK(), BaselineJoinHashTable::condition_, BaselineJoinHashTable::cpu_hash_table_buff_, cpu_threads(), DEBUG_TIMER, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, fill_one_to_many_baseline_hash_table_32(), fill_one_to_many_baseline_hash_table_64(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), getKeyComponentWidth(), init_baseline_hash_join_buff_32(), init_baseline_hash_join_buff_64(), init_hash_join_buff(), BaselineJoinHashTable::initHashTableOnCpuFromCache(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_32(), overlaps_fill_baseline_hash_join_buff_64(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::putHashTableOnCpuToCache(), and VLOG.

445  {
446  auto timer = DEBUG_TIMER(__func__);
447  const auto composite_key_info = getCompositeKeyInfo();
448  CHECK(!join_columns.empty());
449  CHECK(!join_bucket_info.empty());
450  HashTableCacheKey cache_key{join_columns.front().num_elems,
451                              composite_key_info.cache_key_chunks,
452                              condition_->get_optype(),
453                              overlaps_hashjoin_bucket_threshold_};
454  initHashTableOnCpuFromCache(cache_key);
455  if (cpu_hash_table_buff_) {
456  return 0;
457  }
458  CHECK(layout == JoinHashTableInterface::HashType::OneToMany);
459  const auto key_component_width = getKeyComponentWidth();
460  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
461  const auto entry_size = key_component_count * key_component_width;
462  const auto keys_for_all_rows = emitted_keys_count_;
463  const size_t one_to_many_hash_entries = 2 * entry_count_ + keys_for_all_rows;
464  const size_t hash_table_size =
465      calculateHashTableSize(join_bucket_info[0].bucket_sizes_for_dimension.size(),
466                             emitted_keys_count_,
467                             entry_count_);
468 
469  VLOG(1) << "Initializing CPU Overlaps Join Hash Table with " << entry_count_
470  << " hash entries and " << one_to_many_hash_entries
471  << " entries in the one to many buffer";
472  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
473 
474  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
475  int thread_count = cpu_threads();
476  std::vector<std::future<void>> init_cpu_buff_threads;
477  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
478  init_cpu_buff_threads.emplace_back(std::async(
479  std::launch::async,
480  [this, key_component_count, key_component_width, thread_idx, thread_count] {
481  switch (key_component_width) {
482  case 4:
483    init_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
484                                    entry_count_,
485  key_component_count,
486  false,
487  -1,
488  thread_idx,
489  thread_count);
490  break;
491  case 8:
492    init_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
493                                    entry_count_,
494  key_component_count,
495  false,
496  -1,
497  thread_idx,
498  thread_count);
499  break;
500  default:
501  CHECK(false);
502  }
503  }));
504  }
505  for (auto& child : init_cpu_buff_threads) {
506  child.get();
507  }
508  std::vector<std::future<int>> fill_cpu_buff_threads;
509  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
510  fill_cpu_buff_threads.emplace_back(std::async(
511  std::launch::async,
512  [this,
513  &join_columns,
514  &join_bucket_info,
515  key_component_count,
516  key_component_width,
517  thread_idx,
518  thread_count] {
519  switch (key_component_width) {
520  case 4: {
521  const auto key_handler = OverlapsKeyHandler(
522  key_component_count,
523  &join_columns[0],
524      join_bucket_info[0].bucket_sizes_for_dimension.data());
525  return overlaps_fill_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
526                                                  entry_count_,
527  -1,
528  key_component_count,
529  false,
530  &key_handler,
531  join_columns[0].num_elems,
532  thread_idx,
533  thread_count);
534  }
535  case 8: {
536  const auto key_handler = OverlapsKeyHandler(
537  key_component_count,
538  &join_columns[0],
539      join_bucket_info[0].bucket_sizes_for_dimension.data());
540  return overlaps_fill_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
541                                                  entry_count_,
542  -1,
543  key_component_count,
544  false,
545  &key_handler,
546  join_columns[0].num_elems,
547  thread_idx,
548  thread_count);
549  }
550  default:
551  CHECK(false);
552  }
553  return -1;
554  }));
555  }
556  int err = 0;
557  for (auto& child : fill_cpu_buff_threads) {
558  int partial_err = child.get();
559  if (partial_err) {
560  err = partial_err;
561  }
562  }
563  if (err) {
564  cpu_hash_table_buff_.reset();
565  return err;
566  }
567  auto one_to_many_buff =
568  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size);
569  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
570  switch (key_component_width) {
571  case 4: {
572  const auto composite_key_dict =
573      reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
574  fill_one_to_many_baseline_hash_table_32(one_to_many_buff,
575                                          composite_key_dict,
576  entry_count_,
577  -1,
578  key_component_count,
579  join_columns,
580  join_column_types,
581  join_bucket_info,
582  composite_key_info.sd_inner_proxy_per_key,
583  composite_key_info.sd_outer_proxy_per_key,
584  thread_count);
585  break;
586  }
587  case 8: {
588  const auto composite_key_dict =
589  reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
591  composite_key_dict,
592  entry_count_,
593  -1,
594  key_component_count,
595  join_columns,
596  join_column_types,
597  join_bucket_info,
598  composite_key_info.sd_inner_proxy_per_key,
599  composite_key_info.sd_outer_proxy_per_key,
600  thread_count);
601  break;
602  }
603  default:
604  CHECK(false);
605  }
606  if (!err && getInnerTableId() > 0) {
607  putHashTableOnCpuToCache(cache_key);
608  }
609  return err;
610 }

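The whole table lives in one contiguous allocation: the composite-key dictionary occupies the first entry_count_ * entry_size bytes, and the int32 one-to-many region follows it, which is exactly how one_to_many_buff is derived above. A compilable sketch of that layout with assumed sizes; the offsets/counts/payload ordering is inferred from the inherited offsetBufferOff/countBufferOff/payloadBufferOff accessors:

  #include <cstddef>
  #include <cstdint>
  #include <vector>

  int main() {
    const size_t entry_count = 1000, dims = 2, key_width = 8, emitted_keys = 2500;
    const size_t entry_size = dims * key_width;
    std::vector<int8_t> buff(entry_size * entry_count +
                             (2 * entry_count + emitted_keys) * sizeof(int32_t));
    int8_t* keys = buff.data();  // composite-key dictionary
    auto* one_to_many =          // offsets, counts, then payload (assumed order)
        reinterpret_cast<int32_t*>(keys + entry_count * entry_size);
    (void)one_to_many;
    return 0;
  }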

int OverlapsJoinHashTable::initHashTableOnGpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout,
const size_t  key_component_width,
const size_t  key_component_count,
const int  device_id 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 612 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), CHECK(), CHECK_EQ, copy_from_gpu(), copy_to_gpu(), DEBUG_TIMER, BaselineJoinHashTable::entry_count_, BaselineJoinHashTable::executor_, init_baseline_hash_join_buff_on_device_32(), init_baseline_hash_join_buff_on_device_64(), init_hash_join_buff_on_device(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_on_device_64(), overlaps_fill_one_to_many_baseline_hash_table_on_device_64(), transfer_flat_object_to_gpu(), transfer_vector_of_flat_objects_to_gpu(), and UNREACHABLE.

619  {
620  auto timer = DEBUG_TIMER(__func__);
621  int err = 0;
622  // TODO(adb): 4 byte keys
623  CHECK_EQ(key_component_width, size_t(8));
624  CHECK(layout == JoinHashTableInterface::HashType::OneToMany);
625 #ifdef HAVE_CUDA
626  const auto catalog = executor_->getCatalog();
627  auto& data_mgr = catalog->getDataMgr();
628  ThrustAllocator allocator(&data_mgr, device_id);
629  auto dev_err_buff =
630  reinterpret_cast<CUdeviceptr>(allocator.allocateScopedBuffer(sizeof(int)));
631  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
632  switch (key_component_width) {
633  case 4:
634    init_baseline_hash_join_buff_on_device_32(
635        reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
636  entry_count_,
637  key_component_count,
638  false,
639  -1,
640  executor_->blockSize(),
641  executor_->gridSize());
642  break;
643  case 8:
644    init_baseline_hash_join_buff_on_device_64(
645        reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
646  entry_count_,
647  key_component_count,
648  false,
649  -1,
650  executor_->blockSize(),
651  executor_->gridSize());
652  break;
653  default:
654  CHECK(false);
655  }
656  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
657  auto hash_buff =
658  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
659  CHECK_EQ(join_columns.size(), 1u);
660  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
661  auto bucket_sizes_gpu =
662  transfer_vector_of_flat_objects_to_gpu(bucket_sizes_for_dimension, allocator);
663  const auto key_handler = OverlapsKeyHandler(
664  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
665  const auto key_handler_gpu = transfer_flat_object_to_gpu(key_handler, allocator);
666  switch (key_component_width) {
667  case 8: {
668    overlaps_fill_baseline_hash_join_buff_on_device_64(
669        hash_buff,
670  entry_count_,
671  -1,
672  key_component_count,
673  false,
674  reinterpret_cast<int*>(dev_err_buff),
675  key_handler_gpu,
676  join_columns.front().num_elems,
677  executor_->blockSize(),
678  executor_->gridSize());
679  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
680  break;
681  }
682  default:
683  UNREACHABLE();
684  }
685  if (err) {
686  return err;
687  }
688  const auto entry_size = key_component_count * key_component_width;
689  auto one_to_many_buff = reinterpret_cast<int32_t*>(
690  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
691  switch (key_component_width) {
692  case 8: {
693  const auto composite_key_dict =
694  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
695  init_hash_join_buff_on_device(one_to_many_buff,
696  entry_count_,
697  -1,
698  executor_->blockSize(),
699                                  executor_->gridSize());
700    overlaps_fill_one_to_many_baseline_hash_table_on_device_64(
701        one_to_many_buff,
702  composite_key_dict,
703  entry_count_,
704  -1,
705  key_handler_gpu,
706  join_columns.front().num_elems,
707  executor_->blockSize(),
708  executor_->gridSize());
709  break;
710  }
711  default:
712  UNREACHABLE();
713  }
714 #else
715  UNREACHABLE();
716 #endif
717  return err;
718 }


void OverlapsJoinHashTable::reifyWithLayout ( const JoinHashTableInterface::HashType  layout)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 88 of file OverlapsJoinHashTable.cpp.

References auto_tuner_cache_, auto_tuner_cache_mutex_, bucket_sizes_for_dimension_, calculateCounts(), calculateHashTableSize(), CHECK(), BaselineJoinHashTable::condition_, DEBUG_TIMER, BaselineJoinHashTable::device_count_, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, g_overlaps_max_table_size_bytes, get_inner_query_info(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), InputTableInfo::info, BaselineJoinHashTable::layout_, JoinHashTableInterface::OneToMany, only_shards_for_device(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::query_infos_, BaselineJoinHashTable::reifyForDevice(), BaselineJoinHashTable::shardCount(), logger::thread_id(), and VLOG.

89  {
90  auto timer = DEBUG_TIMER(__func__);
91  CHECK(layout == JoinHashTableInterface::HashType::OneToMany);
92  layout_ = layout;
93  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
94  if (query_info.fragments.empty()) {
95  return;
96  }
97  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
98  const auto shard_count = shardCount();
99 
100  // Prepare to calculate the size of the hash table.
101  std::tie(entry_count_, emitted_keys_count_) =
102      calculateCounts(shard_count,
103                      query_info,
104                      columns_per_device);  // called only to populate columns_per_device
105  const auto composite_key_info = getCompositeKeyInfo();
106  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
107  composite_key_info.cache_key_chunks,
108  condition_->get_optype()};
109  columns_per_device.clear();
110  bucket_sizes_for_dimension_.clear();
111 
112  // Auto-tuner: Pre-calculate some possible hash table sizes.
113  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
114  auto atc = auto_tuner_cache_.find(cache_key);
115  if (atc != auto_tuner_cache_.end()) {
116    overlaps_hashjoin_bucket_threshold_ = atc->second;
117    VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
118            << overlaps_hashjoin_bucket_threshold_;
119  } else {
120  VLOG(1) << "Auto tuning for the overlaps hash table size:";
121  const double min_threshold{0.00001};
122  const double max_threshold{0.1};
123  double good_threshold{max_threshold};
124  for (double threshold = max_threshold; threshold >= min_threshold;
125       threshold /= 10.0) {
126    overlaps_hashjoin_bucket_threshold_ = threshold;
127    size_t entry_count;
128  size_t emitted_keys_count;
129  std::tie(entry_count, emitted_keys_count) =
130  calculateCounts(shard_count, query_info, columns_per_device);
131  size_t hash_table_size = calculateHashTableSize(
132  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
133    columns_per_device.clear();
134    bucket_sizes_for_dimension_.clear();
135    VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
136  << " giving: entry count " << entry_count << " hash table size "
137  << hash_table_size;
138  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
139  good_threshold = overlaps_hashjoin_bucket_threshold_;
140  } else {
141  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
142  break;
143  }
144  }
145    overlaps_hashjoin_bucket_threshold_ = good_threshold;
146    auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
147  }
148 
149  // Calculate the final size of the hash table.
150  VLOG(1) << "Accepted bin threshold of " << std::fixed
151          << overlaps_hashjoin_bucket_threshold_;
152  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
153  // from entries_per_device.
154  std::tie(entry_count_, emitted_keys_count_) =
155  calculateCounts(shard_count, query_info, columns_per_device);
156  size_t hash_table_size = calculateHashTableSize(
157      bucket_sizes_for_dimension_.size(), emitted_keys_count_, entry_count_);
158  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
159  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
160  << entry_count_ << " hash table size " << hash_table_size;
161 
162  std::vector<std::future<void>> init_threads;
163  for (int device_id = 0; device_id < device_count_; ++device_id) {
164  const auto fragments =
165  shard_count
166  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
167  : query_info.fragments;
168  init_threads.push_back(std::async(std::launch::async,
169                                    &BaselineJoinHashTable::reifyForDevice,
170                                    this,
171  columns_per_device[device_id],
172  layout,
173  device_id,
174  logger::thread_id()));
175  }
176  for (auto& init_thread : init_threads) {
177  init_thread.wait();
178  }
179  for (auto& init_thread : init_threads) {
180  init_thread.get();
181  }
182 }

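The auto-tuner walks the bucket threshold down by factors of ten, keeping the largest threshold whose projected table still fits g_overlaps_max_table_size_bytes, and stops at the first overflow. A standalone paraphrase of that loop; projected_size stands in for the calculateCounts/calculateHashTableSize round trip:

  #include <cstddef>
  #include <functional>

  double tune_bucket_threshold(size_t max_table_bytes,
                               const std::function<size_t(double)>& projected_size) {
    const double min_threshold{0.00001};
    const double max_threshold{0.1};
    double good_threshold{max_threshold};
    for (double t = max_threshold; t >= min_threshold; t /= 10.0) {
      if (projected_size(t) <= max_table_bytes) {
        good_threshold = t;  // still fits; try a smaller bucket threshold
      } else {
        break;  // first size over budget ends the search
      }
    }
    return good_threshold;
  }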

static auto OverlapsJoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inline static

Definition at line 53 of file OverlapsJoinHashTable.h.

References auto_tuner_cache_, and auto_tuner_cache_mutex_.

53  {
54  return []() -> void {
55  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
56  auto_tuner_cache_.clear();
57  };
58  }
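The returned closure clears the static tuner cache under its mutex, so it can be handed once to whatever cache-invalidation registry the caller uses, e.g.:

  // Usage sketch: obtain the invalidator, invoke it when cached thresholds
  // should be discarded (e.g. after the underlying data changes).
  auto invalidate_overlaps_tuner = OverlapsJoinHashTable::yieldCacheInvalidator();
  invalidate_overlaps_tuner();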

Member Data Documentation

std::map< OverlapsJoinHashTable::HashTableCacheKey, double > OverlapsJoinHashTable::auto_tuner_cache_
static protected

Definition at line 99 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_
static protected

Definition at line 100 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::vector<double> OverlapsJoinHashTable::bucket_sizes_for_dimension_
private
double OverlapsJoinHashTable::overlaps_hashjoin_bucket_threshold_
private

The documentation for this class was generated from the following files:
OverlapsJoinHashTable.h
OverlapsJoinHashTable.cpp