OmniSciDB  1dac507f6e
OverlapsJoinHashTable Class Reference

#include <OverlapsJoinHashTable.h>

Inheritance diagram for OverlapsJoinHashTable
Collaboration diagram for OverlapsJoinHashTable

Public Member Functions

 OverlapsJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
 ~OverlapsJoinHashTable () override
 
- Public Member Functions inherited from BaselineJoinHashTable
int64_t getJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
size_t getJoinHashBufferSize (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
std::string toString (const ExecutorDeviceType device_type, const int device_id, bool raw=false) const noexcept override
 
std::set< DecodedJoinHashBufferEntry > decodeJoinHashBuffer (const ExecutorDeviceType device_type, const int device_id) const noexcept override
 
llvm::Value * codegenSlot (const CompilationOptions &, const size_t) override
 
HashJoinMatchingSet codegenMatchingSet (const CompilationOptions &, const size_t) override
 
int getInnerTableId () const noexcept override
 
int getInnerTableRteIdx () const noexcept override
 
JoinHashTableInterface::HashType getHashType () const noexcept override
 
size_t offsetBufferOff () const noexcept override
 
size_t countBufferOff () const noexcept override
 
size_t payloadBufferOff () const noexcept override
 
virtual ~BaselineJoinHashTable ()
 
- Public Member Functions inherited from JoinHashTableInterface
virtual std::string toStringFlat64 (const ExecutorDeviceType device_type, const int device_id) const noexcept
 
virtual std::string toStringFlat32 (const ExecutorDeviceType device_type, const int device_id) const noexcept
 

Static Public Member Functions

static std::shared_ptr< OverlapsJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc.
 
static std::shared_ptr< OverlapsJoinHashTable > getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing).
 
static auto yieldCacheInvalidator () -> std::function< void()>
 
- Static Public Member Functions inherited from BaselineJoinHashTable
static std::shared_ptr< BaselineJoinHashTable > getInstance (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from an in-flight SQL query's parse tree etc.
 
static std::shared_ptr< BaselineJoinHashTable > getSyntheticInstance (std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
 Make hash table from named tables and columns (such as for testing).
 
static size_t getShardCountForCondition (const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
static auto yieldCacheInvalidator () -> std::function< void()>
 

Protected Member Functions

void reifyWithLayout (const int device_count, const JoinHashTableInterface::HashType layout) override
 
std::pair< size_t, size_t > calculateCounts (size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, const int device_count, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
 
size_t calculateHashTableSize (size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
 
ColumnsForDevice fetchColumnsForDevice (const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id) override
 
std::pair< size_t, size_t > approximateTupleCount (const std::vector< ColumnsForDevice > &) const override
 
size_t getKeyComponentWidth () const override
 
size_t getKeyComponentCount () const override
 
int initHashTableOnCpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout) override
 
int initHashTableOnGpu (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id) override
 
llvm::Value * codegenKey (const CompilationOptions &) override
 
- Protected Member Functions inherited from BaselineJoinHashTable
 BaselineJoinHashTable (const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
 
std::pair< const int8_t *, size_t > getAllColumnFragments (const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
size_t shardCount () const
 
Data_Namespace::MemoryLevel getEffectiveMemoryLevel (const std::vector< InnerOuter > &inner_outer_pairs) const
 
CompositeKeyInfo getCompositeKeyInfo () const
 
void reify (const int device_count)
 
JoinColumn fetchColumn (const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
 
void reifyForDevice (const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
 
void checkHashJoinReplicationConstraint (const int table_id) const
 
int initHashTableForDevice (const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
 
llvm::Value * hashPtr (const size_t index)
 
void initHashTableOnCpuFromCache (const HashTableCacheKey &)
 
void putHashTableOnCpuToCache (const HashTableCacheKey &)
 
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache (const HashTableCacheKey &) const
 
bool isBitwiseEq () const
 
void freeHashBufferMemory ()
 
void freeHashBufferGpuMemory ()
 
void freeHashBufferCpuMemory ()
 
const HashTableCacheValue * findHashTableOnCpuInCache (const HashTableCacheKey &)
 

Static Protected Attributes

static std::map< HashTableCacheKey, double > auto_tuner_cache_
 
static std::mutex auto_tuner_cache_mutex_
 
- Static Protected Attributes inherited from BaselineJoinHashTable
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
 
static std::mutex hash_table_cache_mutex_
 
static const int ERR_FAILED_TO_FETCH_COLUMN {-3}
 
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN {-4}
 

Private Member Functions

void computeBucketSizes (std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const std::vector< InnerOuter > &inner_outer_pairs)
 

Private Attributes

std::vector< double > bucket_sizes_for_dimension_
 
double overlaps_hashjoin_bucket_threshold_
 

Additional Inherited Members

- Public Types inherited from JoinHashTableInterface
enum  HashType { HashType::OneToOne, HashType::OneToMany }
 
- Protected Types inherited from BaselineJoinHashTable
typedef std::pair< const int8_t *, size_t > LinearizedColumn
 
typedef std::pair< int, int > LinearizedColumnCacheKey
 
- Static Protected Member Functions inherited from BaselineJoinHashTable
static int getInnerTableId (const std::vector< InnerOuter > &inner_outer_pairs)
 
- Protected Attributes inherited from BaselineJoinHashTable
const std::shared_ptr< Analyzer::BinOper > condition_
 
const std::vector< InputTableInfo > & query_infos_
 
const Data_Namespace::MemoryLevel memory_level_
 
JoinHashTableInterface::HashType layout_
 
size_t entry_count_
 
size_t emitted_keys_count_
 
Executor * executor_
 
ColumnCacheMap & column_cache_
 
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
 
std::mutex cpu_hash_table_buff_mutex_
 
std::map< LinearizedColumnCacheKey, LinearizedColumn > linearized_multifrag_columns_
 
std::mutex linearized_multifrag_column_mutex_
 
RowSetMemoryOwner linearized_multifrag_column_owner_
 
std::vector< InnerOuter > inner_outer_pairs_
 
const Catalog_Namespace::Catalog * catalog_
 

Detailed Description

Definition at line 22 of file OverlapsJoinHashTable.h.
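
OverlapsJoinHashTable builds a one-to-many baseline hash table specialized for the kOVERLAPS operator on geospatial data: the inner side's bounds arrays are mapped to per-dimension buckets (see computeBucketSizes() and codegenKey()), and the bucket threshold is auto-tuned in reifyWithLayout() so the table fits in g_overlaps_max_table_size_bytes.

A minimal usage sketch based on getSyntheticInstance(); the table and column names are placeholders, and obtaining the Executor* and catalog setup is elided:

  // Hypothetical setup: table/column names and the executor are placeholders.
  ColumnCacheMap column_cache;
  auto hash_table = OverlapsJoinHashTable::getSyntheticInstance(
      "points_table",  // table1: outer (probe) table
      "pt",            // column1: a geospatial POINT column
      "polys_table",   // table2: inner (build) table
      "poly",          // column2: a geometry column backed by a bounds array
      Data_Namespace::MemoryLevel::CPU_LEVEL,
      /*device_count=*/1,
      column_cache,
      executor);  // Executor* obtained elsewhere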

Constructor & Destructor Documentation

OverlapsJoinHashTable::OverlapsJoinHashTable ( const std::shared_ptr< Analyzer::BinOper > condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const size_t  entry_count,
ColumnCacheMap &  column_cache,
Executor *  executor,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
inline

Definition at line 24 of file OverlapsJoinHashTable.h.

31  : BaselineJoinHashTable(condition,
32  query_infos,
33  memory_level,
34  JoinHashTableInterface::HashType::OneToMany,
35  entry_count,
36  column_cache,
37  executor,
38  inner_outer_pairs) {}
BaselineJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
OverlapsJoinHashTable::~OverlapsJoinHashTable ( )
inline override

Definition at line 40 of file OverlapsJoinHashTable.h.

40 {}

Member Function Documentation

std::pair< size_t, size_t > OverlapsJoinHashTable::approximateTupleCount ( const std::vector< ColumnsForDevice > &  columns_per_device) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 321 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), approximate_distinct_tuples_on_device_overlaps(), approximate_distinct_tuples_overlaps(), Bitmap, CHECK(), CHECK_EQ, CHECK_GT, BaselineJoinHashTable::condition_, copy_from_gpu(), copy_to_gpu(), CPU, Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getApproximateTupleCountFromCache(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getEffectiveMemoryLevel(), GPU, Data_Namespace::GPU_LEVEL, hll_size(), hll_unify(), BaselineJoinHashTable::inner_outer_pairs_, overlaps_hashjoin_bucket_threshold_, transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), UNREACHABLE, and VLOG.

Referenced by calculateCounts().

322  {
323  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
324  CountDistinctDescriptor count_distinct_desc{
325  CountDistinctImplType::Bitmap,
326  0,
327  11,
328  true,
329  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
330  ? ExecutorDeviceType::GPU
331  : ExecutorDeviceType::CPU,
332  1};
333  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
334 
335  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
336  // Number of keys must match dimension of buckets
337  CHECK_EQ(columns_per_device.front().join_columns.size(),
338  columns_per_device.front().join_buckets.size());
339  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
340  const auto composite_key_info = getCompositeKeyInfo();
341  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
342  composite_key_info.cache_key_chunks,
343  condition_->get_optype(),
344  overlaps_hashjoin_bucket_threshold_};
345  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
346  if (cached_count_info.first >= 0) {
347  VLOG(1) << "Using a cached tuple count: " << cached_count_info.first
348  << ", emitted keys count: " << cached_count_info.second;
349  return std::make_pair(cached_count_info.first, cached_count_info.second);
350  }
351  int thread_count = cpu_threads();
352  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
353  auto hll_result = &hll_buffer_all_cpus[0];
354 
355  std::vector<int32_t> num_keys_for_row;
356  // TODO(adb): support multi-column overlaps join
357  CHECK_EQ(columns_per_device.size(), 1u);
358  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
359 
360  approximate_distinct_tuples_overlaps(hll_result,
361  num_keys_for_row,
362  count_distinct_desc.bitmap_sz_bits,
363  padded_size_bytes,
364  columns_per_device.front().join_columns,
365  columns_per_device.front().join_column_types,
366  columns_per_device.front().join_buckets,
367  thread_count);
368  for (int i = 1; i < thread_count; ++i) {
369  hll_unify(hll_result,
370  hll_result + i * padded_size_bytes,
371  1 << count_distinct_desc.bitmap_sz_bits);
372  }
373  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
374  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
375  }
376 #ifdef HAVE_CUDA
377  const int device_count = columns_per_device.size();
378  auto& data_mgr = executor_->getCatalog()->getDataMgr();
379  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count);
380  for (auto& host_hll_buffer : host_hll_buffers) {
381  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
382  }
383  std::vector<size_t> emitted_keys_count_device_threads(device_count, 0);
384  std::vector<std::future<void>> approximate_distinct_device_threads;
385  for (int device_id = 0; device_id < device_count; ++device_id) {
386  approximate_distinct_device_threads.emplace_back(std::async(
387  std::launch::async,
388  [device_id,
389  &columns_per_device,
390  &count_distinct_desc,
391  &data_mgr,
392  &host_hll_buffers,
393  &emitted_keys_count_device_threads,
394  this] {
395  ThrustAllocator allocator(&data_mgr, device_id);
396  auto device_hll_buffer =
397  allocator.allocateScopedBuffer(count_distinct_desc.bitmapPaddedSizeBytes());
398  data_mgr.getCudaMgr()->zeroDeviceMem(
399  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
400  const auto& columns_for_device = columns_per_device[device_id];
401  auto join_columns_gpu =
402  transfer_pod_vector_to_gpu(columns_for_device.join_columns, allocator);
403 
404  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
405  const auto& bucket_sizes_for_dimension =
406  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
407  auto bucket_sizes_gpu = allocator.allocateScopedBuffer(
408  bucket_sizes_for_dimension.size() * sizeof(double));
409  copy_to_gpu(&data_mgr,
410  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
411  bucket_sizes_for_dimension.data(),
412  bucket_sizes_for_dimension.size() * sizeof(double),
413  device_id);
414  const size_t row_counts_buffer_sz =
415  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
416  auto row_counts_buffer = allocator.allocateScopedBuffer(row_counts_buffer_sz);
417  data_mgr.getCudaMgr()->zeroDeviceMem(
418  row_counts_buffer, row_counts_buffer_sz, device_id);
419  const auto key_handler =
420  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
421  join_columns_gpu,
422  reinterpret_cast<double*>(bucket_sizes_gpu));
423  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
424  approximate_distinct_tuples_on_device_overlaps(
425  reinterpret_cast<uint8_t*>(device_hll_buffer),
426  count_distinct_desc.bitmap_sz_bits,
427  reinterpret_cast<int32_t*>(row_counts_buffer),
428  key_handler_gpu,
429  columns_for_device.join_columns[0].num_elems,
430  executor_->blockSize(),
431  executor_->gridSize());
432 
433  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
434  copy_from_gpu(&data_mgr,
435  &host_emitted_keys_count,
436  reinterpret_cast<CUdeviceptr>(
437  row_counts_buffer +
438  (columns_per_device.front().join_columns[0].num_elems - 1) *
439  sizeof(int32_t)),
440  sizeof(int32_t),
441  device_id);
442 
443  auto& host_hll_buffer = host_hll_buffers[device_id];
444  copy_from_gpu(&data_mgr,
445  &host_hll_buffer[0],
446  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
447  count_distinct_desc.bitmapPaddedSizeBytes(),
448  device_id);
449  }));
450  }
451  for (auto& child : approximate_distinct_device_threads) {
452  child.get();
453  }
454  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
455  auto& result_hll_buffer = host_hll_buffers.front();
456  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
457  for (int device_id = 1; device_id < device_count; ++device_id) {
458  auto& host_hll_buffer = host_hll_buffers[device_id];
459  hll_unify(hll_result,
460  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
461  1 << count_distinct_desc.bitmap_sz_bits);
462  }
463  size_t emitted_keys_count = 0;
464  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
465  emitted_keys_count += emitted_keys_count_device;
466  }
467  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
468  emitted_keys_count);
469 #else
470  UNREACHABLE();
471  return {0, 0};
472 #endif // HAVE_CUDA
473 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:109
#define UNREACHABLE()
Definition: Logger.h:234
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:90
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::vector< InnerOuter > inner_outer_pairs_
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
CHECK(cgen_state)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
CompositeKeyInfo getCompositeKeyInfo() const
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
void approximate_distinct_tuples_overlaps(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const int thread_count)
void approximate_distinct_tuples_on_device_overlaps(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
int cpu_threads()
Definition: thread_count.h:25
#define VLOG(n)
Definition: Logger.h:280
const std::shared_ptr< Analyzer::BinOper > condition_
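
Both the CPU and GPU paths above estimate the number of distinct bucket keys with HyperLogLog: each CPU thread (or GPU device) fills a private buffer of 1 << bitmap_sz_bits = 2048 registers, the buffers are merged with hll_unify(), and hll_size() converts the merged registers into a cardinality estimate. A hedged sketch of the merge step, assuming hll_unify() performs the standard register-wise max union (hll_unify_sketch is a stand-in name, not the library function):

  #include <algorithm>
  #include <cstdint>
  #include <vector>

  // Union of two HLL sketches: keep the larger of each register pair.
  void hll_unify_sketch(std::vector<uint8_t>& lhs, const std::vector<uint8_t>& rhs) {
    for (size_t i = 0; i < lhs.size(); ++i) {  // lhs.size() == 1 << bitmap_sz_bits
      lhs[i] = std::max(lhs[i], rhs[i]);
    }
  }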


std::pair< size_t, size_t > OverlapsJoinHashTable::calculateCounts ( size_t  shard_count,
const Fragmenter_Namespace::TableInfo &  query_info,
const int  device_count,
std::vector< BaselineJoinHashTable::ColumnsForDevice > &  columns_per_device 
)
protected

Definition at line 242 of file OverlapsJoinHashTable.cpp.

References approximateTupleCount(), fetchColumnsForDevice(), Fragmenter_Namespace::TableInfo::fragments, get_entries_per_device(), BaselineJoinHashTable::memory_level_, and only_shards_for_device().

Referenced by reifyWithLayout().

246  {
247  for (int device_id = 0; device_id < device_count; ++device_id) {
248  const auto fragments =
249  shard_count
250  ? only_shards_for_device(query_info.fragments, device_id, device_count)
251  : query_info.fragments;
252  const auto columns_for_device = fetchColumnsForDevice(fragments, device_id);
253  columns_per_device.push_back(columns_for_device);
254  }
255 
256  size_t tuple_count;
257  size_t emitted_keys_count;
258  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
259  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
260 
261  return std::make_pair(
262  get_entries_per_device(entry_count, shard_count, device_count, memory_level_),
263  emitted_keys_count);
264 }
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
std::deque< FragmentInfo > fragments
Definition: Fragmenter.h:167
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id) override
const Data_Namespace::MemoryLevel memory_level_
std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const override
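
A worked note on the doubling above: entry_count = 2 * std::max(tuple_count, size_t(1)) sizes the table at roughly half occupancy. An approximate distinct count of 1,000 tuples yields 2,000 hash entries (and an empty input still gets 2), which get_entries_per_device() then divides across shards and devices.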


size_t OverlapsJoinHashTable::calculateHashTableSize ( size_t  number_of_dimensions,
size_t  emitted_keys_count,
size_t  entry_count 
) const
protected

Definition at line 266 of file OverlapsJoinHashTable.cpp.

References getKeyComponentWidth().

Referenced by initHashTableOnCpu(), and reifyWithLayout().

268  {
269  const auto key_component_width = getKeyComponentWidth();
270  const auto key_component_count = number_of_dimensions;
271  const auto entry_size = key_component_count * key_component_width;
272  const auto keys_for_all_rows = emitted_keys_count;
273  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
274  const size_t hash_table_size =
275  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
276  return hash_table_size;
277 }
size_t getKeyComponentWidth() const override
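
A worked instance of the formula above, with illustrative numbers (2-D point buckets, 8-byte key components):

  #include <cstddef>
  #include <cstdint>

  // number_of_dimensions = 2, entry_count = 2000, emitted_keys_count = 5000
  constexpr size_t key_component_width = 8;                     // getKeyComponentWidth()
  constexpr size_t entry_size = 2 * key_component_width;        // 16 bytes per entry
  constexpr size_t one_to_many_hash_entries = 2 * 2000 + 5000;  // 9000 int32_t slots
  constexpr size_t hash_table_size =
      entry_size * 2000 + one_to_many_hash_entries * sizeof(int32_t);  // 68,000 bytes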


llvm::Value * OverlapsJoinHashTable::codegenKey ( const CompilationOptions &  co)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 766 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), CHECK_EQ, CodeGenerator::codegen(), BaselineJoinHashTable::executor_, get_int_type(), getKeyComponentCount(), getKeyComponentWidth(), BaselineJoinHashTable::inner_outer_pairs_, kENCODING_GEOINT, kPOINT, kTINYINT, LL_BUILDER, LL_CONTEXT, LL_FP, LL_INT, and CodeGenerator::posArg().

766  {
767  const auto key_component_width = getKeyComponentWidth();
768  CHECK(key_component_width == 4 || key_component_width == 8);
769  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
770  llvm::Value* key_buff_lv{nullptr};
771  switch (key_component_width) {
772  case 4:
773  key_buff_lv =
774  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
775  break;
776  case 8:
777  key_buff_lv =
778  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
779  break;
780  default:
781  CHECK(false);
782  }
783 
784  const auto& inner_outer_pair = inner_outer_pairs_[0];
785  const auto outer_col = inner_outer_pair.second;
786  const auto outer_col_ti = outer_col->get_type_info();
787 
788  if (outer_col_ti.is_geometry()) {
789  CodeGenerator code_generator(executor_);
790  // TODO(adb): for points we will use the coords array, but for other geometries we
791  // will need to use the bounding box. For now only support points.
792  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
793  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
794 
795  const auto col_lvs = code_generator.codegen(outer_col, true, co);
796  CHECK_EQ(col_lvs.size(), size_t(1));
797 
798  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
799  CHECK(outer_col_var);
800  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
801  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
802  CHECK(coords_cd);
803 
804  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
805  "array_buff",
806  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
807  {col_lvs.front(), code_generator.posArg(outer_col)});
808  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
809  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
810  const auto arr_ptr =
811  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
812 
813  for (size_t i = 0; i < 2; i++) {
814  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
815 
816  // Note that get_bucket_key_for_range_compressed will need to be specialized for
817  // future compression schemes
818  auto bucket_key =
819  outer_col_ti.get_compression() == kENCODING_GEOINT
820  ? executor_->cgen_state_->emitExternalCall(
821  "get_bucket_key_for_range_compressed",
822  get_int_type(64, LL_CONTEXT),
823  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
824  : executor_->cgen_state_->emitExternalCall(
825  "get_bucket_key_for_range_double",
826  get_int_type(64, LL_CONTEXT),
827  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
828  const auto col_lv = LL_BUILDER.CreateSExt(
829  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
830  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
831  }
832  } else {
833  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
834  }
835  return key_buff_lv;
836 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< double > bucket_sizes_for_dimension_
#define LOG(tag)
Definition: Logger.h:185
#define LL_FP(v)
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
std::vector< InnerOuter > inner_outer_pairs_
CHECK(cgen_state)
#define LL_BUILDER
#define LL_INT(v)
size_t getKeyComponentCount() const override
#define LL_CONTEXT
size_t getKeyComponentWidth() const override
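
The emitted runtime calls map each coordinate to an integer bucket index. Since bucket_sizes_for_dimension_ holds inverse bucket sizes (computeBucketSizes() stores 1.0 / bucket_sz), a plausible C++ analogue of get_bucket_key_for_range_double is a scale-and-floor; this is a hedged sketch, not the actual runtime function:

  #include <cmath>
  #include <cstddef>
  #include <cstdint>

  // Assumed behavior: bucket index = floor(coordinate * inverse_bucket_size).
  int64_t bucket_key_for_range(const double* coords,
                               size_t dim,
                               double inverse_bucket_size) {
    return static_cast<int64_t>(std::floor(coords[dim] * inverse_bucket_size));
  }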


void OverlapsJoinHashTable::computeBucketSizes ( std::vector< double > &  bucket_sizes_for_dimension,
const JoinColumn join_column,
const std::vector< InnerOuter > &  inner_outer_pairs 
)
private

Definition at line 838 of file OverlapsJoinHashTable.cpp.

References CHECK(), CHECK_EQ, CHECK_GT, compute_bucket_sizes(), compute_bucket_sizes_on_device(), copy_from_gpu(), Data_Namespace::CPU_LEVEL, cpu_threads(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::getEffectiveMemoryLevel(), overlaps_hashjoin_bucket_threshold_, to_string(), transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), and VLOG.

Referenced by fetchColumnsForDevice().

841  {
842  // No coalesced keys for overlaps joins yet
843  CHECK_EQ(inner_outer_pairs.size(), 1u);
844 
845  const auto col = inner_outer_pairs[0].first;
846  CHECK(col);
847  const auto col_ti = col->get_type_info();
848  CHECK(col_ti.is_array());
849 
850  // Compute the number of dimensions for this overlaps key
851  int num_dims{-1};
852  if (col_ti.is_fixlen_array()) {
853  num_dims = col_ti.get_size() / col_ti.get_elem_type().get_size();
854  num_dims /= 2;
855  } else {
856  CHECK(col_ti.is_varlen_array());
857  num_dims = 2;
858  // TODO(adb): how can we pick the number of dims in the varlen case? e.g.
859  // backwards compatibility with existing bounds cols or generic range joins
860  }
861  CHECK_GT(num_dims, 0);
862  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
863 
864  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
865  << std::to_string(overlaps_hashjoin_bucket_threshold_);
866 
867  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
868  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
869  const int thread_count = cpu_threads();
870  compute_bucket_sizes(local_bucket_sizes,
871  join_column,
872  overlaps_hashjoin_bucket_threshold_,
873  thread_count);
874  }
875 #ifdef HAVE_CUDA
876  else {
877  // Note that we compute the bucket sizes using only a single GPU
878  const int device_id = 0;
879  auto& data_mgr = executor_->getCatalog()->getDataMgr();
880  ThrustAllocator allocator(&data_mgr, device_id);
881  auto device_bucket_sizes_gpu =
882  transfer_pod_vector_to_gpu(local_bucket_sizes, allocator);
883  auto join_columns_gpu = transfer_object_to_gpu(join_column, allocator);
884 
885  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
886  join_columns_gpu,
887  overlaps_hashjoin_bucket_threshold_,
888  executor_->blockSize(),
889  executor_->gridSize());
890  copy_from_gpu(&data_mgr,
891  local_bucket_sizes.data(),
892  reinterpret_cast<CUdeviceptr>(device_bucket_sizes_gpu),
893  local_bucket_sizes.size() * sizeof(double),
894  device_id);
895  }
896 #endif
897 
898  size_t ctr = 0;
899  for (auto& bucket_sz : local_bucket_sizes) {
900  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
901  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
902  }
903 
904  return;
905 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
unsigned long long CUdeviceptr
Definition: nocuda.h:27
void compute_bucket_sizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const double bucket_size_threshold, const int thread_count)
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::string to_string(char const *&&v)
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
CHECK(cgen_state)
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
void compute_bucket_sizes_on_device(double *bucket_sizes_buffer, const JoinColumn *join_column_for_key, const double bucket_sz_threshold, const size_t block_size_x, const size_t grid_size_x)
int cpu_threads()
Definition: thread_count.h:25
#define VLOG(n)
Definition: Logger.h:280
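
Note the inversion in the final loop above: the computed per-dimension bucket size bucket_sz is stored as its reciprocal, 1.0 / bucket_sz, so downstream consumers (codegenKey(), the overlaps key handlers) can map a coordinate to a bucket with a multiply rather than a divide. For example, a computed bucket size of 0.01 is stored as 100.0, and a coordinate of 12.34 then lands in bucket floor(12.34 * 100.0) = 1234.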


BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice ( const std::deque< Fragmenter_Namespace::FragmentInfo > &  fragments,
const int  device_id 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 279 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_, CHECK(), JoinColumn::col_buff, computeBucketSizes(), BaselineJoinHashTable::executor_, BaselineJoinHashTable::fetchColumn(), get_column_descriptor_maybe(), get_join_column_type_kind(), BaselineJoinHashTable::getEffectiveMemoryLevel(), BaselineJoinHashTable::inner_outer_pairs_, BaselineJoinHashTable::isBitwiseEq(), and kDOUBLE.

Referenced by calculateCounts().

281  {
282  const auto& catalog = *executor_->getCatalog();
283  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
284 
285  std::vector<JoinColumn> join_columns;
286  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
287  std::vector<JoinColumnTypeInfo> join_column_types;
288  std::vector<JoinBucketInfo> join_bucket_info;
289  for (const auto& inner_outer_pair : inner_outer_pairs_) {
290  const auto inner_col = inner_outer_pair.first;
291  const auto inner_cd = get_column_descriptor_maybe(
292  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
293  if (inner_cd && inner_cd->isVirtualCol) {
294  throw FailedToJoinOnVirtualColumn();
295  }
296  const auto join_column_info = fetchColumn(
297  inner_col, effective_memory_level, fragments, chunks_owner, device_id);
298  join_columns.emplace_back(
299  JoinColumn{join_column_info.col_buff, join_column_info.num_elems});
300  const auto& ti = inner_col->get_type_info();
301  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
302  0,
303  inline_int_null_value<int64_t>(),
304  isBitwiseEq(),
305  0,
306  get_join_column_type_kind(ti)});
307  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
308 
309  if (bucket_sizes_for_dimension_.empty()) {
310  computeBucketSizes(
311  bucket_sizes_for_dimension_, join_columns.back(), inner_outer_pairs_);
312  }
313  const auto elem_ti = ti.get_elem_type();
314  CHECK(elem_ti.is_fp());
315  join_bucket_info.emplace_back(
316  JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
317  }
318  return {join_columns, join_column_types, chunks_owner, join_bucket_info};
319 }
std::vector< double > bucket_sizes_for_dimension_
const int8_t * col_buff
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< InnerOuter > inner_outer_pairs_
CHECK(cgen_state)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:171
JoinColumn fetchColumn(const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
void computeBucketSizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const std::vector< InnerOuter > &inner_outer_pairs)


std::shared_ptr< OverlapsJoinHashTable > OverlapsJoinHashTable::getInstance ( const std::shared_ptr< Analyzer::BinOper > condition,
const std::vector< InputTableInfo > &  query_infos,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Make hash table from an in-flight SQL query's parse tree etc.

Definition at line 31 of file OverlapsJoinHashTable.cpp.

References logger::FATAL, get_entries_per_device(), get_inner_query_info(), BaselineJoinHashTable::getInnerTableId(), Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound(), BaselineJoinHashTable::getShardCountForCondition(), Data_Namespace::GPU_LEVEL, InputTableInfo::info, LOG, and normalize_column_pairs().

Referenced by Executor::buildHashTableForQualifier(), and getSyntheticInstance().

37  {
38  auto inner_outer_pairs = normalize_column_pairs(
39  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
40  const auto& query_info =
41  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
42  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
43  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
44  throw TooManyHashEntries();
45  }
46  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
47  ? BaselineJoinHashTable::getShardCountForCondition(
48  condition.get(), executor, inner_outer_pairs)
49  : 0;
50  const auto entries_per_device =
51  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
52  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
53  query_infos,
54  memory_level,
55  entries_per_device,
56  column_cache,
57  executor,
58  inner_outer_pairs);
59  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
60  try {
61  join_hash_table->reify(device_count);
62  } catch (const HashJoinFail& e) {
63  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
64  "involved in equijoin | ") +
65  e.what());
66  } catch (const ColumnarConversionNotSupported& e) {
67  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
68  e.what());
69  } catch (const std::exception& e) {
70  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
71  << e.what();
72  }
73  return join_hash_table;
74 }
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
#define LOG(tag)
Definition: Logger.h:185
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
int getInnerTableId() const noexcept override
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
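
Note the sizing guard above: the initial entry count is twice the inner table's getNumTuplesUpperBound(), and the build is refused with TooManyHashEntries once that doubled count exceeds std::numeric_limits<int32_t>::max(), since the one-to-many buffer addresses rows with 32-bit slots.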


size_t OverlapsJoinHashTable::getKeyComponentCount ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 479 of file OverlapsJoinHashTable.cpp.

References bucket_sizes_for_dimension_.

Referenced by codegenKey().

479  {
480  return bucket_sizes_for_dimension_.size();
481 }
std::vector< double > bucket_sizes_for_dimension_


size_t OverlapsJoinHashTable::getKeyComponentWidth ( ) const
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 475 of file OverlapsJoinHashTable.cpp.

Referenced by calculateHashTableSize(), codegenKey(), and initHashTableOnCpu().

475  {
476  return 8;
477 }


std::shared_ptr< OverlapsJoinHashTable > OverlapsJoinHashTable::getSyntheticInstance ( std::string_view  table1,
std::string_view  column1,
std::string_view  table2,
std::string_view  column2,
const Data_Namespace::MemoryLevel  memory_level,
const int  device_count,
ColumnCacheMap &  column_cache,
Executor *  executor 
)
static

Make hash table from named tables and columns (such as for testing).

Definition at line 77 of file OverlapsJoinHashTable.cpp.

References CHECK(), getInstance(), kBOOLEAN, kLINESTRING, kMULTIPOLYGON, kONE, kOVERLAPS, and kPOLYGON.

85  {
86  auto catalog = executor->getCatalog();
87  CHECK(catalog);
88 
89  auto tmeta1 = catalog->getMetadataForTable(std::string(table1));
90  auto tmeta2 = catalog->getMetadataForTable(std::string(table2));
91 
92  CHECK(tmeta1);
93  CHECK(tmeta2);
94 
95  auto cmeta1 = catalog->getMetadataForColumn(tmeta1->tableId, std::string(column1));
96  auto cmeta2 = catalog->getMetadataForColumn(tmeta2->tableId, std::string(column2));
97 
98  CHECK(cmeta1);
99  CHECK(cmeta2);
100 
101  auto ti1 = cmeta1->columnType;
102  auto ti2 = cmeta2->columnType;
103 
104  CHECK(ti1.is_geometry());
105  CHECK(ti2.is_geometry());
106 
107  int targetColumnId = 0;
108  switch (ti2.get_type()) {
109  case kLINESTRING: {
110  targetColumnId = cmeta2->columnId + 2;
111  break;
112  }
113  case kPOLYGON: {
114  targetColumnId = cmeta2->columnId + 3;
115  break;
116  }
117  case kMULTIPOLYGON: {
118  targetColumnId = cmeta2->columnId + 4;
119  break;
120  }
121  default:
122  CHECK(false);
123  }
124 
125  auto cmeta3 = catalog->getMetadataForColumn(tmeta2->tableId, targetColumnId);
126  CHECK(cmeta3);
127  auto ti3 = cmeta3->columnType;
128 
129  auto a1 = std::make_shared<Analyzer::ColumnVar>(ti1, tmeta1->tableId, 1, 0);
130  auto a2 =
131  std::make_shared<Analyzer::ColumnVar>(ti3, tmeta2->tableId, cmeta3->columnId, 1);
132 
133  auto op = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kOVERLAPS, kONE, a1, a2);
134 
135  size_t number_of_join_tables{2};
136  std::vector<InputTableInfo> query_infos(number_of_join_tables);
137  query_infos[0].table_id = tmeta1->tableId;
138  query_infos[0].info = tmeta1->fragmenter->getFragmentsForQuery();
139  query_infos[1].table_id = tmeta2->tableId;
140  query_infos[1].info = tmeta2->fragmenter->getFragmentsForQuery();
141 
142  return OverlapsJoinHashTable::getInstance(
143  op, query_infos, memory_level, device_count, column_cache, executor);
144 }
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
CHECK(cgen_state)
Definition: sqldefs.h:69
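
A note on the columnId arithmetic in the switch above: geo columns are backed by several physical columns in the catalog, and the offset (+2 for LINESTRING, +3 for POLYGON, +4 for MULTIPOLYGON) appears to skip past the geometry's other physical columns (coords, plus the ring/polygon size arrays for polygon types) to reach its bounds array. The hash table is then built over that bounds column (a2), with the first table's geometry column (a1) on the probe side.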


int OverlapsJoinHashTable::initHashTableOnCpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 483 of file OverlapsJoinHashTable.cpp.

References calculateHashTableSize(), CHECK(), BaselineJoinHashTable::condition_, BaselineJoinHashTable::cpu_hash_table_buff_, cpu_threads(), BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, fill_one_to_many_baseline_hash_table_32(), fill_one_to_many_baseline_hash_table_64(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), getKeyComponentWidth(), init_baseline_hash_join_buff_32(), init_baseline_hash_join_buff_64(), init_hash_join_buff(), BaselineJoinHashTable::initHashTableOnCpuFromCache(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_32(), overlaps_fill_baseline_hash_join_buff_64(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::putHashTableOnCpuToCache(), and VLOG.

487  {
488  const auto composite_key_info = getCompositeKeyInfo();
489  CHECK(!join_columns.empty());
490  CHECK(!join_bucket_info.empty());
491  HashTableCacheKey cache_key{join_columns.front().num_elems,
492  composite_key_info.cache_key_chunks,
493  condition_->get_optype(),
494  overlaps_hashjoin_bucket_threshold_};
495  initHashTableOnCpuFromCache(cache_key);
496  if (cpu_hash_table_buff_) {
497  return 0;
498  }
500  const auto key_component_width = getKeyComponentWidth();
501  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
502  const auto entry_size = key_component_count * key_component_width;
503  const auto keys_for_all_rows = emitted_keys_count_;
504  const size_t one_to_many_hash_entries = 2 * entry_count_ + keys_for_all_rows;
505  const size_t hash_table_size =
506  calculateHashTableSize(join_bucket_info[0].bucket_sizes_for_dimension.size(),
507  emitted_keys_count_,
508  entry_count_);
509 
510  VLOG(1) << "Initializing CPU Overlaps Join Hash Table with " << entry_count_
511  << " hash entries and " << one_to_many_hash_entries
512  << " entries in the one to many buffer";
513  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
514 
515  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
516  int thread_count = cpu_threads();
517  std::vector<std::future<void>> init_cpu_buff_threads;
518  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
519  init_cpu_buff_threads.emplace_back(std::async(
520  std::launch::async,
521  [this, key_component_count, key_component_width, thread_idx, thread_count] {
522  switch (key_component_width) {
523  case 4:
524  init_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
525  entry_count_,
526  key_component_count,
527  false,
528  -1,
529  thread_idx,
530  thread_count);
531  break;
532  case 8:
533  init_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
534  entry_count_,
535  key_component_count,
536  false,
537  -1,
538  thread_idx,
539  thread_count);
540  break;
541  default:
542  CHECK(false);
543  }
544  }));
545  }
546  for (auto& child : init_cpu_buff_threads) {
547  child.get();
548  }
549  std::vector<std::future<int>> fill_cpu_buff_threads;
550  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
551  fill_cpu_buff_threads.emplace_back(std::async(
552  std::launch::async,
553  [this,
554  &join_columns,
555  &join_bucket_info,
556  key_component_count,
557  key_component_width,
558  thread_idx,
559  thread_count] {
560  switch (key_component_width) {
561  case 4: {
562  const auto key_handler = OverlapsKeyHandler(
563  key_component_count,
564  &join_columns[0],
565  join_bucket_info[0].bucket_sizes_for_dimension.data());
566  return overlaps_fill_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
567  entry_count_,
568  -1,
569  key_component_count,
570  false,
571  &key_handler,
572  join_columns[0].num_elems,
573  thread_idx,
574  thread_count);
575  }
576  case 8: {
577  const auto key_handler = OverlapsKeyHandler(
578  key_component_count,
579  &join_columns[0],
580  join_bucket_info[0].bucket_sizes_for_dimension.data());
581  return overlaps_fill_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
582  entry_count_,
583  -1,
584  key_component_count,
585  false,
586  &key_handler,
587  join_columns[0].num_elems,
588  thread_idx,
589  thread_count);
590  }
591  default:
592  CHECK(false);
593  }
594  return -1;
595  }));
596  }
597  int err = 0;
598  for (auto& child : fill_cpu_buff_threads) {
599  int partial_err = child.get();
600  if (partial_err) {
601  err = partial_err;
602  }
603  }
604  if (err) {
605  cpu_hash_table_buff_.reset();
606  return err;
607  }
608  auto one_to_many_buff =
609  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size);
610  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
611  switch (key_component_width) {
612  case 4: {
613  const auto composite_key_dict =
614  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
615  fill_one_to_many_baseline_hash_table_32(one_to_many_buff,
616  composite_key_dict,
617  entry_count_,
618  -1,
619  key_component_count,
620  join_columns,
621  join_column_types,
622  join_bucket_info,
623  composite_key_info.sd_inner_proxy_per_key,
624  composite_key_info.sd_outer_proxy_per_key,
625  thread_count);
626  break;
627  }
628  case 8: {
629  const auto composite_key_dict =
630  reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
631  fill_one_to_many_baseline_hash_table_64(one_to_many_buff,
632  composite_key_dict,
633  entry_count_,
634  -1,
635  key_component_count,
636  join_columns,
637  join_column_types,
638  join_bucket_info,
639  composite_key_info.sd_inner_proxy_per_key,
640  composite_key_info.sd_outer_proxy_per_key,
641  thread_count);
642  break;
643  }
644  default:
645  CHECK(false);
646  }
647  if (!err && getInnerTableId() > 0) {
648  putHashTableOnCpuToCache(cache_key);
649  }
650  return err;
651 }
int overlaps_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void * > &sd_inner_proxy_per_key, const std::vector< const void * > &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
void putHashTableOnCpuToCache(const HashTableCacheKey &)
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void init_baseline_hash_join_buff_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
CHECK(cgen_state)
CompositeKeyInfo getCompositeKeyInfo() const
int overlaps_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_32(int32_t *buff, const int32_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void * > &sd_inner_proxy_per_key, const std::vector< const void * > &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
int getInnerTableId() const noexcept override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
int cpu_threads()
Definition: thread_count.h:25
#define VLOG(n)
Definition: Logger.h:280
const std::shared_ptr< Analyzer::BinOper > condition_
void init_baseline_hash_join_buff_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
size_t getKeyComponentWidth() const override
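
The buffer built above is a composite-key dictionary of entry_count_ slots followed by a one-to-many region of int32_t slots; the pointer arithmetic at source line 609 (keys end at entry_count_ * entry_size) implies the layout sketched below. This is a hedged reading whose authoritative form is the inherited offsetBufferOff()/countBufferOff()/payloadBufferOff() accessors:

  #include <cstddef>
  #include <cstdint>

  // Hedged sketch of the one-to-many layout; field names are illustrative.
  struct OverlapsTableLayout {
    size_t entry_count;          // entry_count_
    size_t key_component_count;  // bucket dimensions, e.g. 2 for points
    size_t key_component_width;  // 8 (getKeyComponentWidth())
    size_t emitted_keys_count;   // emitted_keys_count_

    size_t keys_bytes() const {  // composite key dictionary
      return entry_count * key_component_count * key_component_width;
    }
    size_t offsets_bytes() const { return entry_count * sizeof(int32_t); }
    size_t counts_bytes() const { return entry_count * sizeof(int32_t); }
    size_t payload_bytes() const { return emitted_keys_count * sizeof(int32_t); }
    size_t total_bytes() const {  // matches calculateHashTableSize()
      return keys_bytes() + offsets_bytes() + counts_bytes() + payload_bytes();
    }
  };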


int OverlapsJoinHashTable::initHashTableOnGpu ( const std::vector< JoinColumn > &  join_columns,
const std::vector< JoinColumnTypeInfo > &  join_column_types,
const std::vector< JoinBucketInfo > &  join_bucket_info,
const JoinHashTableInterface::HashType  layout,
const size_t  key_component_width,
const size_t  key_component_count,
const int  device_id 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 653 of file OverlapsJoinHashTable.cpp.

References ThrustAllocator::allocateScopedBuffer(), CHECK(), CHECK_EQ, copy_from_gpu(), copy_to_gpu(), BaselineJoinHashTable::entry_count_, BaselineJoinHashTable::executor_, init_baseline_hash_join_buff_on_device_32(), init_baseline_hash_join_buff_on_device_64(), init_hash_join_buff_on_device(), JoinHashTableInterface::OneToMany, overlaps_fill_baseline_hash_join_buff_on_device_64(), overlaps_fill_one_to_many_baseline_hash_table_on_device_64(), transfer_object_to_gpu(), transfer_pod_vector_to_gpu(), and UNREACHABLE.

660  {
661  int err = 0;
662  // TODO(adb): 4 byte keys
663  CHECK_EQ(key_component_width, size_t(8));
664  CHECK_EQ(layout, JoinHashTableInterface::HashType::OneToMany);
665 #ifdef HAVE_CUDA
666  const auto catalog = executor_->getCatalog();
667  auto& data_mgr = catalog->getDataMgr();
668  ThrustAllocator allocator(&data_mgr, device_id);
669  auto dev_err_buff =
670  reinterpret_cast<CUdeviceptr>(allocator.allocateScopedBuffer(sizeof(int)));
671  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
672  switch (key_component_width) {
673  case 4:
674  init_baseline_hash_join_buff_on_device_32(
675  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
676  entry_count_,
677  key_component_count,
678  false,
679  -1,
680  executor_->blockSize(),
681  executor_->gridSize());
682  break;
683  case 8:
684  init_baseline_hash_join_buff_on_device_64(
685  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
686  entry_count_,
687  key_component_count,
688  false,
689  -1,
690  executor_->blockSize(),
691  executor_->gridSize());
692  break;
693  default:
694  CHECK(false);
695  }
696  auto join_columns_gpu = transfer_pod_vector_to_gpu(join_columns, allocator);
697  auto hash_buff =
698  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
699  CHECK_EQ(join_columns.size(), 1u);
700  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
701  auto bucket_sizes_gpu =
702  transfer_pod_vector_to_gpu(bucket_sizes_for_dimension, allocator);
703  const auto key_handler = OverlapsKeyHandler(
704  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
705  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
706  switch (key_component_width) {
707  case 8: {
708  overlaps_fill_baseline_hash_join_buff_on_device_64(
709  hash_buff,
710  entry_count_,
711  -1,
712  key_component_count,
713  false,
714  reinterpret_cast<int*>(dev_err_buff),
715  key_handler_gpu,
716  join_columns.front().num_elems,
717  executor_->blockSize(),
718  executor_->gridSize());
719  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
720  break;
721  }
722  default:
723  UNREACHABLE();
724  }
725  if (err) {
726  return err;
727  }
728  const auto entry_size = key_component_count * key_component_width;
729  auto one_to_many_buff = reinterpret_cast<int32_t*>(
730  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
731  switch (key_component_width) {
732  case 8: {
733  const auto composite_key_dict =
734  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
735  init_hash_join_buff_on_device(one_to_many_buff,
736  entry_count_,
737  -1,
738  executor_->blockSize(),
739  executor_->gridSize());
740  overlaps_fill_one_to_many_baseline_hash_table_on_device_64(
741  one_to_many_buff,
742  composite_key_dict,
743  entry_count_,
744  -1,
745  key_handler_gpu,
746  join_columns.front().num_elems,
747  executor_->blockSize(),
748  executor_->gridSize());
749  break;
750  }
751  default:
752  UNREACHABLE();
753  }
754 #else
755  UNREACHABLE();
756 #endif
757  return err;
758 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define UNREACHABLE()
Definition: Logger.h:234
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
CHECK(cgen_state)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)


void OverlapsJoinHashTable::reifyWithLayout ( const int  device_count,
const JoinHashTableInterface::HashType  layout 
)
override protected virtual

Reimplemented from BaselineJoinHashTable.

Definition at line 146 of file OverlapsJoinHashTable.cpp.

References auto_tuner_cache_, auto_tuner_cache_mutex_, bucket_sizes_for_dimension_, calculateCounts(), calculateHashTableSize(), CHECK(), BaselineJoinHashTable::condition_, BaselineJoinHashTable::emitted_keys_count_, BaselineJoinHashTable::entry_count_, g_overlaps_max_table_size_bytes, get_inner_query_info(), BaselineJoinHashTable::getCompositeKeyInfo(), BaselineJoinHashTable::getInnerTableId(), InputTableInfo::info, BaselineJoinHashTable::layout_, JoinHashTableInterface::OneToMany, only_shards_for_device(), overlaps_hashjoin_bucket_threshold_, BaselineJoinHashTable::query_infos_, BaselineJoinHashTable::reifyForDevice(), BaselineJoinHashTable::shardCount(), and VLOG.

148  {
149  CHECK(layout == JoinHashTableInterface::HashType::OneToMany);
150  layout_ = layout;
151  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
152  if (query_info.fragments.empty()) {
153  return;
154  }
155  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
156  const auto shard_count = shardCount();
157 
158  // Prepare to calculate the size of the hash table.
159  overlaps_hashjoin_bucket_threshold_ = 0.1;
160  calculateCounts(shard_count,
161  query_info,
162  device_count,
163  columns_per_device); // called only to populate columns_per_device
164  const auto composite_key_info = getCompositeKeyInfo();
165  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
166  composite_key_info.cache_key_chunks,
167  condition_->get_optype()};
168  columns_per_device.clear();
169  bucket_sizes_for_dimension_.clear();
170 
171  // Auto-tuner: Pre-calculate some possible hash table sizes.
172  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
173  auto atc = auto_tuner_cache_.find(cache_key);
174  if (atc != auto_tuner_cache_.end()) {
175  overlaps_hashjoin_bucket_threshold_ = atc->second;
176  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
177  << overlaps_hashjoin_bucket_threshold_;
178  } else {
179  VLOG(1) << "Auto tuning for the overlaps hash table size:";
180  const double min_threshold{0.00001};
181  const double max_threshold{0.1};
182  double good_threshold{max_threshold};
183  for (double threshold = max_threshold; threshold >= min_threshold;
184  threshold /= 10.0) {
185  overlaps_hashjoin_bucket_threshold_ = threshold;
186  size_t entry_count;
187  size_t emitted_keys_count;
188  std::tie(entry_count, emitted_keys_count) =
189  calculateCounts(shard_count, query_info, device_count, columns_per_device);
190  size_t hash_table_size = calculateHashTableSize(
191  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
192  columns_per_device.clear();
193  bucket_sizes_for_dimension_.clear();
194  VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
195  << " giving: entry count " << entry_count << " hash table size "
196  << hash_table_size;
197  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
198  good_threshold = overlaps_hashjoin_bucket_threshold_;
199  } else {
200  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
201  break;
202  }
203  }
204  overlaps_hashjoin_bucket_threshold_ = good_threshold;
205  auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
206  }
207 
208  // Calculate the final size of the hash table.
209  VLOG(1) << "Accepted bin threshold of " << std::fixed
210  << overlaps_hashjoin_bucket_threshold_;
211  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
212  // from entries_per_device.
213  std::tie(entry_count_, emitted_keys_count_) =
214  calculateCounts(shard_count, query_info, device_count, columns_per_device);
215  size_t hash_table_size = calculateHashTableSize(
216  bucket_sizes_for_dimension_.size(), emitted_keys_count_, entry_count_);
217  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
218  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
219  << entry_count_ << " hash table size " << hash_table_size;
220 
221  std::vector<std::future<void>> init_threads;
222  for (int device_id = 0; device_id < device_count; ++device_id) {
223  const auto fragments =
224  shard_count
225  ? only_shards_for_device(query_info.fragments, device_id, device_count)
226  : query_info.fragments;
227  init_threads.push_back(std::async(std::launch::async,
228  &BaselineJoinHashTable::reifyForDevice,
229  this,
230  columns_per_device[device_id],
231  layout,
232  device_id));
233  }
234  for (auto& init_thread : init_threads) {
235  init_thread.wait();
236  }
237  for (auto& init_thread : init_threads) {
238  init_thread.get();
239  }
240 }
std::vector< double > bucket_sizes_for_dimension_
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
static std::mutex auto_tuner_cache_mutex_
std::pair< size_t, size_t > calculateCounts(size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, const int device_count, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
JoinHashTableInterface::HashType layout_
static std::map< HashTableCacheKey, double > auto_tuner_cache_
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::vector< InputTableInfo > & query_infos_
CHECK(cgen_state)
CompositeKeyInfo getCompositeKeyInfo() const
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:89
int getInnerTableId() const noexcept override
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
#define VLOG(n)
Definition: Logger.h:280
const std::shared_ptr< Analyzer::BinOper > condition_
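
The auto-tuner above walks the bucket threshold down from 0.1 by factors of ten (floor 0.00001), rebuilding the size estimate at each step and keeping the smallest threshold whose table still fits in g_overlaps_max_table_size_bytes; the winner is memoized in auto_tuner_cache_. A condensed sketch of just that loop, with calc_size() standing in for the calculateCounts() + calculateHashTableSize() pair:

  #include <cstddef>
  #include <functional>

  double tune_bucket_threshold(size_t max_table_size_bytes,
                               const std::function<size_t(double)>& calc_size) {
    const double min_threshold{0.00001};
    const double max_threshold{0.1};
    double good_threshold{max_threshold};
    for (double threshold = max_threshold; threshold >= min_threshold;
         threshold /= 10.0) {
      if (calc_size(threshold) <= max_table_size_bytes) {
        good_threshold = threshold;  // still fits: remember and try smaller buckets
      } else {
        break;  // too big: stop and fall back to the last good threshold
      }
    }
    return good_threshold;
  }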


static auto OverlapsJoinHashTable::yieldCacheInvalidator ( ) -> std::function<void()>
inline static

Definition at line 62 of file OverlapsJoinHashTable.h.

References auto_tuner_cache_, and auto_tuner_cache_mutex_.

62  {
63  return []() -> void {
64  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
65  auto_tuner_cache_.clear();
66  };
67  }
static std::mutex auto_tuner_cache_mutex_
static std::map< HashTableCacheKey, double > auto_tuner_cache_

Member Data Documentation

std::map< OverlapsJoinHashTable::HashTableCacheKey, double > OverlapsJoinHashTable::auto_tuner_cache_
static protected

Definition at line 109 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_
static protected

Definition at line 110 of file OverlapsJoinHashTable.h.

Referenced by reifyWithLayout(), and yieldCacheInvalidator().

std::vector<double> OverlapsJoinHashTable::bucket_sizes_for_dimension_
private
double OverlapsJoinHashTable::overlaps_hashjoin_bucket_threshold_
private

The documentation for this class was generated from the following files:
OverlapsJoinHashTable.h
OverlapsJoinHashTable.cpp