23 #ifndef QUERYENGINE_GROUPBYFASTIMPL_H
24 #define QUERYENGINE_GROUPBYFASTIMPL_H
28 #include "../../../Shared/funcannotations.h"
29 #include "../../../Shared/shard_key.h"
// insert_key_cas(address, compare, val):
// Atomic compare-and-swap on a 32-bit hash slot.  Writes `val` into *address
// iff *address == compare, and evaluates to the value *address held BEFORE
// the operation (so callers can test `!= compare` to detect a lost race).
// NOTE(review): the platform conditionals below were lost in extraction and
// have been reconstructed — three same-name #defines cannot coexist.
#ifdef __CUDACC__
// CUDA device code: native atomic compare-and-swap.
#define insert_key_cas(address, compare, val) atomicCAS(address, compare, val)
#elif defined(_MSC_VER)
// MSVC: InterlockedCompareExchange takes (dest, exchange, comparand) and
// returns the initial value of *dest.
#define insert_key_cas(address, compare, val)                             \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address),   \
                             static_cast<long>(val),                      \
                             static_cast<long>(compare))
#else
// GCC/Clang builtin: returns the value of *address before the swap.
#define insert_key_cas(address, compare, val) \
  __sync_val_compare_and_swap(address, compare, val)
#endif
47 const int32_t invalid_slot_val) {
48 if (
insert_key_cas(entry_ptr, invalid_slot_val, idx) != invalid_slot_val) {
57 const int32_t invalid_slot_val) {
69 const int64_t min_key,
70 const int64_t bucket_normalization) {
71 return buff + (key - min_key) / bucket_normalization;
76 const int64_t min_key) {
77 return buff + (key - min_key);
83 const int64_t min_key,
84 const uint32_t entry_count_per_shard,
85 const uint32_t num_shards,
86 const uint32_t device_count,
87 const int64_t bucket_normalization) {
89 const uint32_t shard_buffer_index =
91 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
92 return shard_buffer + (key - min_key) / bucket_normalization / num_shards;
98 const int64_t min_key,
99 const uint32_t entry_count_per_shard,
100 const uint32_t num_shards,
101 const uint32_t device_count) {
103 const uint32_t shard_buffer_index =
104 shard / device_count;
105 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
106 return shard_buffer + (key - min_key) / num_shards;
112 const int64_t min_key,
113 const uint32_t entry_count_per_shard,
114 const uint32_t shard,
115 const uint32_t num_shards,
116 const uint32_t device_count,
117 const int64_t bucket_normalization) {
118 const uint32_t shard_buffer_index =
119 shard / device_count;
120 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
121 return shard_buffer + (key - min_key) / bucket_normalization / num_shards;
127 const int64_t min_key,
128 const uint32_t entry_count_per_shard,
129 const uint32_t shard,
130 const uint32_t num_shards,
131 const uint32_t device_count) {
132 const uint32_t shard_buffer_index =
133 shard / device_count;
134 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
135 return shard_buffer + (key - min_key) / num_shards;
138 #endif // QUERYENGINE_GROUPBYFASTIMPL_H
// ---------------------------------------------------------------------------
// NOTE(review): the original trailing lines repeated every signature without
// a terminating semicolon, redefined insert_key_cas with an EMPTY expansion
// (which would silently clobber the real macro above and break every fill_*
// function), and left SHARD_FOR_KEY with an empty body.  Reconstructed below
// as well-formed forward declarations plus a guarded fallback macro; the
// empty insert_key_cas redefinition is dropped as extraction residue.
// ---------------------------------------------------------------------------

#ifndef SHARD_FOR_KEY
// Fallback; normally provided by Shared/shard_key.h.  Maps a (possibly
// negative) key onto a shard index in [0, num_shards).
// TODO(review): confirm this matches the project's shard_key.h definition.
#define SHARD_FOR_KEY(key, num_shards) (((key) < 0 ? -(key) : (key)) % (num_shards))
#endif

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot)(
    int32_t* buff,
    const int64_t key,
    const int64_t min_key,
    const int64_t bucket_normalization);

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot)(int32_t* buff,
                                                    const int64_t key,
                                                    const int64_t min_key);

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_sharded)(
    int32_t* buff,
    const int64_t key,
    const int64_t min_key,
    const uint32_t entry_count_per_shard,
    const uint32_t num_shards,
    const uint32_t device_count,
    const int64_t bucket_normalization);

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_sharded)(
    int32_t* buff,
    const int64_t key,
    const int64_t min_key,
    const uint32_t entry_count_per_shard,
    const uint32_t num_shards,
    const uint32_t device_count);

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_sharded_opt)(
    int32_t* buff,
    const int64_t key,
    const int64_t min_key,
    const uint32_t entry_count_per_shard,
    const uint32_t shard,
    const uint32_t num_shards,
    const uint32_t device_count,
    const int64_t bucket_normalization);

ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_sharded_opt)(
    int32_t* buff,
    const int64_t key,
    const int64_t min_key,
    const uint32_t entry_count_per_shard,
    const uint32_t shard,
    const uint32_t num_shards,
    const uint32_t device_count);

ALWAYS_INLINE DEVICE int SUFFIX(fill_one_to_one_hashtable)(
    size_t idx,
    int32_t* entry_ptr,
    const int32_t invalid_slot_val);

ALWAYS_INLINE DEVICE int SUFFIX(fill_hashtable_for_semi_join)(
    size_t idx,
    int32_t* entry_ptr,
    const int32_t invalid_slot_val);