23 #ifndef QUERYENGINE_GROUPBYFASTIMPL_H
24 #define QUERYENGINE_GROUPBYFASTIMPL_H
28 #include "../../../Shared/funcannotations.h"
29 #include "../../../Shared/shard_key.h"
// Platform-specific 32-bit compare-and-swap used to claim hash-table slots.
// Exactly one definition is selected per translation unit:
//   - CUDA device code: atomicCAS
//   - MSVC: InterlockedCompareExchange — note its argument order is
//     (address, exchange, comparand), hence val/compare are swapped below
//   - everywhere else: GCC/Clang __sync_val_compare_and_swap
// Every variant returns the value previously stored at `address`, so callers
// can detect whether the slot was free (== compare) before the swap.
#ifdef __CUDACC__
#define insert_key_cas(address, compare, val) atomicCAS(address, compare, val)
#elif defined(_MSC_VER)
#define insert_key_cas(address, compare, val)                           \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
                             static_cast<long>(val),                    \
                             static_cast<long>(compare))
#else
#define insert_key_cas(address, compare, val) \
  __sync_val_compare_and_swap(address, compare, val)
#endif
47 const int32_t invalid_slot_val) {
48 if (
insert_key_cas(entry_ptr, invalid_slot_val, idx) != invalid_slot_val) {
57 const int32_t invalid_slot_val) {
69 const int64_t min_key,
70 const int64_t translated_null_val,
71 const int64_t bucket_normalization) {
72 auto hash_slot = key / bucket_normalization - min_key + (key == translated_null_val);
73 return buff + hash_slot;
78 const int64_t min_key) {
79 return buff + (key - min_key);
85 const int64_t min_key,
86 const int64_t translated_null_val,
87 const uint32_t entry_count_per_shard,
88 const uint32_t num_shards,
89 const uint32_t device_count,
90 const int64_t bucket_normalization) {
92 const uint32_t shard_buffer_index =
94 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
95 auto hash_slot = ((key / bucket_normalization) - min_key) / num_shards +
96 (key == translated_null_val);
97 return shard_buffer + hash_slot;
103 const int64_t min_key,
104 const uint32_t entry_count_per_shard,
105 const uint32_t num_shards,
106 const uint32_t device_count) {
108 const uint32_t shard_buffer_index =
109 shard / device_count;
110 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
111 return shard_buffer + (key - min_key) / num_shards;
117 const int64_t min_key,
118 const int64_t translated_null_val,
119 const uint32_t entry_count_per_shard,
120 const uint32_t shard,
121 const uint32_t num_shards,
122 const uint32_t device_count,
123 const int64_t bucket_normalization) {
124 const uint32_t shard_buffer_index =
125 shard / device_count;
126 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
127 int64_t hash_slot = ((key / bucket_normalization) - min_key) / num_shards +
128 (key == translated_null_val);
129 return shard_buffer + hash_slot;
135 const int64_t min_key,
136 const uint32_t entry_count_per_shard,
137 const uint32_t shard,
138 const uint32_t num_shards,
139 const uint32_t device_count) {
140 const uint32_t shard_buffer_index =
141 shard / device_count;
142 int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard;
143 return shard_buffer + (key - min_key) / num_shards;
146 #endif // QUERYENGINE_GROUPBYFASTIMPL_H
ALWAYS_INLINE DEVICE int SUFFIX() fill_hashtable_for_semi_join(size_t idx, int32_t *entry_ptr, const int32_t invalid_slot_val)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_hash_slot(int32_t *buff, const int64_t key, const int64_t min_key)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_bucketized_hash_slot(int32_t *buff, const int64_t key, const int64_t min_key, const int64_t translated_null_val, const int64_t bucket_normalization)
#define insert_key_cas(address, compare, val)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_hash_slot_sharded_opt(int32_t *buff, const int64_t key, const int64_t min_key, const uint32_t entry_count_per_shard, const uint32_t shard, const uint32_t num_shards, const uint32_t device_count)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_hash_slot_sharded(int32_t *buff, const int64_t key, const int64_t min_key, const uint32_t entry_count_per_shard, const uint32_t num_shards, const uint32_t device_count)
ALWAYS_INLINE DEVICE int SUFFIX() fill_one_to_one_hashtable(size_t idx, int32_t *entry_ptr, const int32_t invalid_slot_val)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_bucketized_hash_slot_sharded_opt(int32_t *buff, const int64_t key, const int64_t min_key, const int64_t translated_null_val, const uint32_t entry_count_per_shard, const uint32_t shard, const uint32_t num_shards, const uint32_t device_count, const int64_t bucket_normalization)
ALWAYS_INLINE DEVICE int32_t *SUFFIX() get_bucketized_hash_slot_sharded(int32_t *buff, const int64_t key, const int64_t min_key, const int64_t translated_null_val, const uint32_t entry_count_per_shard, const uint32_t num_shards, const uint32_t device_count, const int64_t bucket_normalization)
#define SHARD_FOR_KEY(key, num_shards)