OmniSciDB  c07336695a
cuda_mapd_rt.cu
1 #include <cuda.h>
2 #include <float.h>
3 #include <stdint.h>
4 #include <limits>
5 #include "BufferCompaction.h"
6 #include "ExtensionFunctions.hpp"
7 #include "GpuRtConstants.h"
8 #include "HyperLogLogRank.h"
9 
10 extern "C" __device__ int32_t pos_start_impl(const int32_t* row_index_resume) {
11  return blockIdx.x * blockDim.x + threadIdx.x;
12 }
13 
14 extern "C" __device__ int32_t group_buff_idx_impl() {
15  return pos_start_impl(NULL);
16 }
17 
18 extern "C" __device__ int32_t pos_step_impl() {
19  return blockDim.x * gridDim.x;
20 }
21 
22 extern "C" __device__ int8_t thread_warp_idx(const int8_t warp_sz) {
23  return threadIdx.x % warp_sz;
24 }
25 
26 extern "C" __device__ const int64_t* init_shared_mem_nop(
27  const int64_t* groups_buffer,
28  const int32_t groups_buffer_size) {
29  return groups_buffer;
30 }
31 
32 extern "C" __device__ void write_back_nop(int64_t* dest, int64_t* src, const int32_t sz) {
33 }
34 
35 extern "C" __device__ const int64_t* init_shared_mem(const int64_t* groups_buffer,
36  const int32_t groups_buffer_size) {
37  extern __shared__ int64_t fast_bins[];
38  if (threadIdx.x == 0) {
39  memcpy(fast_bins, groups_buffer, groups_buffer_size);
40  }
41  __syncthreads();
42  return fast_bins;
43 }
44 
45 /**
46  * Dynamically allocates shared memory per block.
47  * The amount of shared memory allocated is defined at kernel launch time.
48  * Returns a pointer to the beginning of the allocated shared memory.
49  */
50 extern "C" __device__ int64_t* alloc_shared_mem_dynamic() {
51  extern __shared__ int64_t groups_buffer_smem[];
52  return groups_buffer_smem;
53 }
54 
55 /**
56  * Sets all allocated shared memory elements equal to the 'identity_element'.
57  * groups_buffer_size: number of 64-bit elements in shared memory per thread-block.
58  * NOTE: groups_buffer_size is in units of 64-bit elements, not bytes.
59  */
60 extern "C" __device__ void set_shared_mem_to_identity(
61  int64_t* groups_buffer_smem,
62  const int32_t groups_buffer_size,
63  const int64_t identity_element = 0) {
64 #pragma unroll
65  for (int i = threadIdx.x; i < groups_buffer_size; i += blockDim.x) {
66  groups_buffer_smem[i] = identity_element;
67  }
68  __syncthreads();
69 }
70 
71 /**
72  * Initialize dynamic shared memory:
73  * 1. Allocates dynamic shared memory
74  * 2. Sets every allocated element equal to the 'identity element' (zero by default).
75  */
76 extern "C" __device__ const int64_t* init_shared_mem_dynamic(
77  const int64_t* groups_buffer,
78  const int32_t groups_buffer_size) {
79  int64_t* groups_buffer_smem = alloc_shared_mem_dynamic();
80  set_shared_mem_to_identity(groups_buffer_smem, groups_buffer_size);
81  return groups_buffer_smem;
82 }
83 
84 extern "C" __device__ void write_back(int64_t* dest, int64_t* src, const int32_t sz) {
85  __syncthreads();
86  if (threadIdx.x == 0) {
87  memcpy(dest, src, sz);
88  }
89 }
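// A minimal usage sketch (hypothetical kernel, not produced by the real query codegen):
// it stages the group-by buffer in shared memory via init_shared_mem, performs a trivial
// stand-in aggregation, and copies the result back with write_back. The kernel must be
// launched with a dynamic shared-memory size of at least groups_buffer_size bytes.
extern "C" __global__ void example_shared_groups_kernel(int64_t* groups_buffer,
                                                        const int32_t groups_buffer_size) {
  // groups_buffer_size is in bytes here, matching the memcpy inside init_shared_mem
  int64_t* smem_groups =
      const_cast<int64_t*>(init_shared_mem(groups_buffer, groups_buffer_size));
  if (threadIdx.x == 0) {
    ++smem_groups[0];  // stand-in for the generated per-row aggregation code
  }
  write_back(groups_buffer, smem_groups, groups_buffer_size);
}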
90 
91 extern "C" __device__ void write_back_smem_nop(int64_t* dest,
92  int64_t* src,
93  const int32_t sz) {}
94 
95 extern "C" __device__ void agg_from_smem_to_gmem_nop(int64_t* gmem_dest,
96  int64_t* smem_src,
97  const int32_t num_elements) {}
98 
99 /**
100  * Aggregates the results stored in shared memory back into global memory.
101  * It also writes the stored bin ID, if any, back to global memory.
102  * Memory layout assumption: each 64-bit unit of shared memory is laid out as follows:
103  * [0..31: the stored bin ID, to be written back][32..63: the count result, to be
104  * aggregated]
105  */
106 extern "C" __device__ void agg_from_smem_to_gmem_binId_count(int64_t* gmem_dest,
107  int64_t* smem_src,
108  const int32_t num_elements) {
109  __syncthreads();
110 #pragma unroll
111  for (int i = threadIdx.x; i < num_elements; i += blockDim.x) {
112  int32_t bin_id = *reinterpret_cast<int32_t*>(smem_src + i);
113  int32_t count_result = *(reinterpret_cast<int32_t*>(smem_src + i) + 1);
114  if (count_result) { // non-zero count
115  atomicAdd(reinterpret_cast<unsigned int*>(gmem_dest + i) + 1,
116  static_cast<int32_t>(count_result));
117  // writing back the binId, only if count_result is non-zero
118  *reinterpret_cast<unsigned int*>(gmem_dest + i) = static_cast<int32_t>(bin_id);
119  }
120  }
121 }
122 
123 /**
124  * Aggregates the results stored in shared memory back into global memory.
125  * It also writes the stored bin ID, if any, back to global memory.
126  * Memory layout assumption: each 64-bit unit of shared memory is laid out as follows:
127  * [0..31: the count result, to be aggregated][32..63: the stored bin ID, to be written
128  * back]
129  */
130 extern "C" __device__ void agg_from_smem_to_gmem_count_binId(int64_t* gmem_dest,
131  int64_t* smem_src,
132  const int32_t num_elements) {
133  __syncthreads();
134 #pragma unroll
135  for (int i = threadIdx.x; i < num_elements; i += blockDim.x) {
136  int32_t count_result = *reinterpret_cast<int32_t*>(smem_src + i);
137  int32_t bin_id = *(reinterpret_cast<int32_t*>(smem_src + i) + 1);
138  if (count_result) { // non-zero count
139  atomicAdd(reinterpret_cast<unsigned int*>(gmem_dest + i),
140  static_cast<int32_t>(count_result));
141  // writing back the binId, only if count_result is non-zero
142  *(reinterpret_cast<unsigned int*>(gmem_dest + i) + 1) =
143  static_cast<int32_t>(bin_id);
144  }
145  }
146 }
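// Worked sketch of the [count | bin ID] packing consumed above (the real packing is
// emitted by the query codegen; this helper is purely illustrative). For the
// count_binId layout, the low 32 bits of each 64-bit shared-memory slot hold the count
// and the high 32 bits hold the bin ID:
__device__ inline void example_update_count_binId_slot(int64_t* smem_slot,
                                                       const int32_t bin_id) {
  // bump the count stored in the low half of the slot
  atomicAdd(reinterpret_cast<unsigned int*>(smem_slot), 1u);
  // record the bin ID in the high half (assumes all threads hitting this slot agree on bin_id)
  *(reinterpret_cast<int32_t*>(smem_slot) + 1) = bin_id;
}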
147 
148 #define init_group_by_buffer_gpu_impl init_group_by_buffer_gpu
149 
150 #include "GpuInitGroups.cu"
151 
152 #undef init_group_by_buffer_gpu_impl
153 
154 // Dynamic watchdog: monitoring up to 128 SMs. E.g. a GP100 config may have 60:
155 // 6 Graphics Processing Clusters (GPCs) * 10 Streaming Multiprocessors (SMs) each
156 // TODO(Saman): move these into a kernel parameter, allocated and initialized through CUDA
157 __device__ int64_t dw_sm_cycle_start[128]; // Set from host before launching the kernel
158 // TODO(Saman): make this cycle budget something constant in codegen level
159 __device__ int64_t dw_cycle_budget = 0; // Set from host before launching the kernel
160 __device__ int32_t dw_abort = 0; // TBD: set from host (async)
161 
162 __inline__ __device__ uint32_t get_smid(void) {
163  uint32_t ret;
164  asm("mov.u32 %0, %%smid;" : "=r"(ret));
165  return ret;
166 }
167 
168 /*
169  * The main objective of this function is to return true if either of the following two
170  * scenarios happens:
171  * 1. it receives a host request to abort the kernel execution
172  * 2. the kernel execution has consumed more clock cycles than initially budgeted
173  * The assumption is that all (or none) of the threads within a block return true for the
174  * watchdog, and the first thread within each block compares the recorded clock cycles for
175  * its occupying SM with the allowed budget. It also assumes that all threads entering
176  * this function are active (no critical edge exposure).
177  * NOTE: dw_cycle_budget, dw_abort, and dw_sm_cycle_start[] are all variables in global
178  * memory scope.
179  */
180 extern "C" __device__ bool dynamic_watchdog() {
181  // check for dynamic watchdog, if triggered all threads return true
182  if (dw_cycle_budget == 0LL) {
183  return false; // Uninitialized watchdog can't check time
184  }
185  if (dw_abort == 1) {
186  return true; // Received host request to abort
187  }
188  uint32_t smid = get_smid();
189  if (smid >= 128) {
190  return false;
191  }
192  __shared__ volatile int64_t dw_block_cycle_start; // Thread block shared cycle start
193  __shared__ volatile bool
194  dw_should_terminate; // all threads within a block should return together if
195  // the watchdog criterion is met
196 
197  // thread 0 either initializes or reads the initial clock cycle; the result is stored
198  // in shared memory. Since all threads within a block share the same SM, there is no
199  // point in using more threads here.
200  if (threadIdx.x == 0) {
201  dw_block_cycle_start = 0LL;
202  int64_t cycle_count = static_cast<int64_t>(clock64());
203  // Make sure the block hasn't switched SMs
204  if (smid == get_smid()) {
205  dw_block_cycle_start = static_cast<int64_t>(
206  atomicCAS(reinterpret_cast<unsigned long long*>(&dw_sm_cycle_start[smid]),
207  0ULL,
208  static_cast<unsigned long long>(cycle_count)));
209  }
210 
211  int64_t cycles = cycle_count - dw_block_cycle_start;
212  if ((smid == get_smid()) && (dw_block_cycle_start > 0LL) &&
213  (cycles > dw_cycle_budget)) {
214  // Check if we're out of time on this particular SM
215  dw_should_terminate = true;
216  } else {
217  dw_should_terminate = false;
218  }
219  }
220  __syncthreads();
221  return dw_should_terminate;
222 }
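// Hypothetical polling pattern (the real check points are inserted by codegen): the
// kernel periodically calls dynamic_watchdog() on a block-uniform condition, so that
// every thread reaches the __syncthreads() inside it, and the whole block exits early
// once the cycle budget is exhausted or the host has requested an abort.
extern "C" __global__ void example_watchdog_loop(int32_t* error_code,
                                                 const int64_t row_count) {
  const int32_t start = pos_start_impl(NULL);
  const int32_t step = pos_step_impl();
  // iterate a thread-uniform number of times so the watchdog call never diverges
  const int64_t num_iters = (row_count + step - 1) / step;
  for (int64_t iter = 0; iter < num_iters; ++iter) {
    if ((iter & 0xFF) == 0 && dynamic_watchdog()) {
      if (threadIdx.x == 0) {
        *error_code = 1;  // hypothetical "interrupted" status code
      }
      return;  // all threads of the block return together, as assumed above
    }
    const int64_t pos = start + iter * static_cast<int64_t>(step);
    if (pos < row_count) {
      // ... per-row work would go here ...
    }
  }
}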
223 
224 template <typename T = unsigned long long>
225 inline __device__ T get_empty_key() {
226  return EMPTY_KEY_64;
227 }
228 
229 template <>
230 inline __device__ unsigned int get_empty_key() {
231  return EMPTY_KEY_32;
232 }
233 
234 template <typename T>
235 inline __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
236  const uint32_t h,
237  const T* key,
238  const uint32_t key_count,
239  const uint32_t row_size_quad) {
240  const T empty_key = get_empty_key<T>();
241  uint32_t off = h * row_size_quad;
242  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
243  {
244  const T old = atomicCAS(row_ptr, empty_key, *key);
245  if (empty_key == old && key_count > 1) {
246  for (size_t i = 1; i <= key_count - 1; ++i) {
247  atomicExch(row_ptr + i, key[i]);
248  }
249  }
250  }
251  if (key_count > 1) {
252  while (atomicAdd(row_ptr + key_count - 1, 0) == empty_key) {
253  // spin until the winning thread has finished writing the entire key and the init
254  // value
255  }
256  }
257  bool match = true;
258  for (uint32_t i = 0; i < key_count; ++i) {
259  if (row_ptr[i] != key[i]) {
260  match = false;
261  break;
262  }
263  }
264 
265  if (match) {
266  auto row_ptr_i8 = reinterpret_cast<int8_t*>(row_ptr + key_count);
267  return reinterpret_cast<int64_t*>(align_to_int64(row_ptr_i8));
268  }
269  return NULL;
270 }
271 
272 extern "C" __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
273  const uint32_t h,
274  const int64_t* key,
275  const uint32_t key_count,
276  const uint32_t key_width,
277  const uint32_t row_size_quad,
278  const int64_t* init_vals) {
279  switch (key_width) {
280  case 4:
281  return get_matching_group_value(groups_buffer,
282  h,
283  reinterpret_cast<const unsigned int*>(key),
284  key_count,
285  row_size_quad);
286  case 8:
287  return get_matching_group_value(groups_buffer,
288  h,
289  reinterpret_cast<const unsigned long long*>(key),
290  key_count,
291  row_size_quad);
292  default:
293  return NULL;
294  }
295 }
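// Simplified sketch of the linear-probing loop that drives get_matching_group_value
// (the production probe loops live in the included GroupByRuntime.cpp; the starting
// hash below is a trivial stand-in for the real MurmurHash, which is included later in
// this file):
__device__ inline int64_t* example_probe_row_wise(int64_t* groups_buffer,
                                                  const uint32_t entry_count,
                                                  const int64_t* key,
                                                  const uint32_t key_count,
                                                  const uint32_t key_width,
                                                  const uint32_t row_size_quad,
                                                  const int64_t* init_vals) {
  const uint32_t h = static_cast<uint32_t>(key[0]) % entry_count;  // stand-in hash
  for (uint32_t probe = 0; probe < entry_count; ++probe) {
    const uint32_t slot = (h + probe) % entry_count;
    int64_t* value_ptr = get_matching_group_value(
        groups_buffer, slot, key, key_count, key_width, row_size_quad, init_vals);
    if (value_ptr) {
      return value_ptr;  // matched an existing key or claimed an empty slot
    }
  }
  return NULL;  // the table is full
}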
296 
297 template <typename T>
298 __device__ int32_t get_matching_group_value_columnar_slot(int64_t* groups_buffer,
299  const uint32_t entry_count,
300  const uint32_t h,
301  const T* key,
302  const uint32_t key_count) {
303  uint32_t off = h;
304  {
305  const uint64_t old =
306  atomicCAS(reinterpret_cast<T*>(groups_buffer + off), get_empty_key<T>(), *key);
307  if (old == get_empty_key<T>()) {
308  for (size_t i = 0; i < key_count; ++i) {
309  groups_buffer[off] = key[i];
310  off += entry_count;
311  }
312  return h;
313  }
314  }
315  __syncthreads();
316  off = h;
317  for (size_t i = 0; i < key_count; ++i) {
318  if (groups_buffer[off] != key[i]) {
319  return -1;
320  }
321  off += entry_count;
322  }
323  return h;
324 }
325 
326 extern "C" __device__ int32_t
327 get_matching_group_value_columnar_slot(int64_t* groups_buffer,
328  const uint32_t entry_count,
329  const uint32_t h,
330  const int64_t* key,
331  const uint32_t key_count,
332  const uint32_t key_width) {
333  switch (key_width) {
334  case 4:
335  return get_matching_group_value_columnar_slot(
336  groups_buffer,
337  entry_count,
338  h,
339  reinterpret_cast<const unsigned int*>(key),
340  key_count);
341  case 8:
342  return get_matching_group_value_columnar_slot(
343  groups_buffer,
344  entry_count,
345  h,
346  reinterpret_cast<const unsigned long long*>(key),
347  key_count);
348  default:
349  return -1;
350  }
351 }
352 
353 extern "C" __device__ int64_t* get_matching_group_value_columnar(
354  int64_t* groups_buffer,
355  const uint32_t h,
356  const int64_t* key,
357  const uint32_t key_qw_count,
358  const size_t entry_count) {
359  uint32_t off = h;
360  {
361  const uint64_t old = atomicCAS(
362  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, *key);
363  if (EMPTY_KEY_64 == old) {
364  for (size_t i = 0; i < key_qw_count; ++i) {
365  groups_buffer[off] = key[i];
366  off += entry_count;
367  }
368  return &groups_buffer[off];
369  }
370  }
371  __syncthreads();
372  off = h;
373  for (size_t i = 0; i < key_qw_count; ++i) {
374  if (groups_buffer[off] != key[i]) {
375  return NULL;
376  }
377  off += entry_count;
378  }
379  return &groups_buffer[off];
380 }
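// Columnar layout illustration (hypothetical helper, not used by the runtime): with
// entry_count E, the i-th component of the key stored in slot h lives at
// groups_buffer[h + i * E], i.e. key components are laid out in column-sized stripes
// rather than contiguously per row.
__device__ inline int64_t example_read_columnar_key_component(const int64_t* groups_buffer,
                                                              const uint32_t h,
                                                              const size_t entry_count,
                                                              const uint32_t component_idx) {
  return groups_buffer[h + component_idx * entry_count];
}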
381 
382 #include "GroupByRuntime.cpp"
383 #include "JoinHashTableQueryRuntime.cpp"
384 #include "MurmurHash.cpp"
385 #include "TopKRuntime.cpp"
386 
387 __device__ int64_t atomicMax64(int64_t* address, int64_t val) {
388  unsigned long long int* address_as_ull = (unsigned long long int*)address;
389  unsigned long long int old = *address_as_ull, assumed;
390 
391  do {
392  assumed = old;
393  old = atomicCAS(address_as_ull, assumed, max((long long)val, (long long)assumed));
394  } while (assumed != old);
395 
396  return old;
397 }
398 
399 __device__ int64_t atomicMin64(int64_t* address, int64_t val) {
400  unsigned long long int* address_as_ull = (unsigned long long int*)address;
401  unsigned long long int old = *address_as_ull, assumed;
402 
403  do {
404  assumed = old;
405  old = atomicCAS(address_as_ull, assumed, min((long long)val, (long long)assumed));
406  } while (assumed != old);
407 
408  return old;
409 }
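// The two helpers above follow the canonical CAS retry pattern for emulating 64-bit
// atomics the hardware does not provide directly. A sketch of the same pattern for an
// arbitrary binary operation (hypothetical helper, not used by the runtime):
template <typename F>
__device__ int64_t example_atomic_apply64(int64_t* address, int64_t val, F op) {
  unsigned long long int* address_as_ull =
      reinterpret_cast<unsigned long long int*>(address);
  unsigned long long int old = *address_as_ull, assumed;
  do {
    assumed = old;
    // retry until no other thread has modified the value between the read and the CAS
    old = atomicCAS(
        address_as_ull,
        assumed,
        static_cast<unsigned long long int>(op(static_cast<int64_t>(assumed), val)));
  } while (assumed != old);
  return static_cast<int64_t>(old);
}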
410 
411 // As of 20160418, CUDA 8.0EA only defines `atomicAdd(double*, double)` for compute
412 // capability >= 6.0.
413 #if CUDA_VERSION < 8000 || (defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 600)
414 __device__ double atomicAdd(double* address, double val) {
415  unsigned long long int* address_as_ull = (unsigned long long int*)address;
416  unsigned long long int old = *address_as_ull, assumed;
417 
418  do {
419  assumed = old;
420  old = atomicCAS(address_as_ull,
421  assumed,
422  __double_as_longlong(val + __longlong_as_double(assumed)));
423 
424  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
425  } while (assumed != old);
426 
427  return __longlong_as_double(old);
428 }
429 #endif
430 
431 __device__ double atomicMax(double* address, double val) {
432  unsigned long long int* address_as_ull = (unsigned long long int*)address;
433  unsigned long long int old = *address_as_ull, assumed;
434 
435  do {
436  assumed = old;
437  old = atomicCAS(address_as_ull,
438  assumed,
439  __double_as_longlong(max(val, __longlong_as_double(assumed))));
440 
441  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
442  } while (assumed != old);
443 
444  return __longlong_as_double(old);
445 }
446 
447 __device__ float atomicMax(float* address, float val) {
448  int* address_as_int = (int*)address;
449  int old = *address_as_int, assumed;
450 
451  do {
452  assumed = old;
453  old = atomicCAS(
454  address_as_int, assumed, __float_as_int(max(val, __int_as_float(assumed))));
455 
456  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
457  } while (assumed != old);
458 
459  return __int_as_float(old);
460 }
461 
462 __device__ double atomicMin(double* address, double val) {
463  unsigned long long int* address_as_ull = (unsigned long long int*)address;
464  unsigned long long int old = *address_as_ull, assumed;
465 
466  do {
467  assumed = old;
468  old = atomicCAS(address_as_ull,
469  assumed,
470  __double_as_longlong(min(val, __longlong_as_double(assumed))));
471  } while (assumed != old);
472 
473  return __longlong_as_double(old);
474 }
475 
476 __device__ float atomicMin(float* address, float val) {
477  int* address_as_int = (int*)address;
478  int old = *address_as_int, assumed;
479 
480  do {
481  assumed = old;
482  old = atomicCAS(
483  address_as_int, assumed, __float_as_int(min(val, __int_as_float(assumed))));
484  } while (assumed != old);
485 
486  return __int_as_float(old);
487 }
488 
489 extern "C" __device__ uint64_t agg_count_shared(uint64_t* agg, const int64_t val) {
490  return static_cast<uint64_t>(atomicAdd(reinterpret_cast<uint32_t*>(agg), 1UL));
491 }
492 
493 extern "C" __device__ uint32_t agg_count_int32_shared(uint32_t* agg, const int32_t val) {
494  return atomicAdd(agg, 1UL);
495 }
496 
497 extern "C" __device__ uint64_t agg_count_double_shared(uint64_t* agg, const double val) {
498  return agg_count_shared(agg, val);
499 }
500 
501 extern "C" __device__ uint32_t agg_count_float_shared(uint32_t* agg, const float val) {
502  return agg_count_int32_shared(agg, val);
503 }
504 
505 extern "C" __device__ int64_t agg_sum_shared(int64_t* agg, const int64_t val) {
506  return atomicAdd(reinterpret_cast<unsigned long long*>(agg), val);
507 }
508 
509 extern "C" __device__ int32_t agg_sum_int32_shared(int32_t* agg, const int32_t val) {
510  return atomicAdd(agg, val);
511 }
512 
513 extern "C" __device__ void agg_sum_float_shared(int32_t* agg, const float val) {
514  atomicAdd(reinterpret_cast<float*>(agg), val);
515 }
516 
517 extern "C" __device__ void agg_sum_double_shared(int64_t* agg, const double val) {
518  atomicAdd(reinterpret_cast<double*>(agg), val);
519 }
520 
521 extern "C" __device__ void agg_max_shared(int64_t* agg, const int64_t val) {
522  atomicMax64(agg, val);
523 }
524 
525 extern "C" __device__ void agg_max_int32_shared(int32_t* agg, const int32_t val) {
526  atomicMax(agg, val);
527 }
528 
529 extern "C" __device__ void agg_max_double_shared(int64_t* agg, const double val) {
530  atomicMax(reinterpret_cast<double*>(agg), val);
531 }
532 
533 extern "C" __device__ void agg_max_float_shared(int32_t* agg, const float val) {
534  atomicMax(reinterpret_cast<float*>(agg), val);
535 }
536 
537 extern "C" __device__ void agg_min_shared(int64_t* agg, const int64_t val) {
538  atomicMin64(agg, val);
539 }
540 
541 extern "C" __device__ void agg_min_int32_shared(int32_t* agg, const int32_t val) {
542  atomicMin(agg, val);
543 }
544 
545 // TODO(Saman): use 16-bit atomicCAS for Turing
546 extern "C" __device__ void atomicMax16(int16_t* agg, const int16_t val) {
547  // properly align the input pointer:
548  unsigned int* base_address_u32 =
549  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
550 
551  unsigned int old_value = *base_address_u32;
552  unsigned int swap_value, compare_value;
553  do {
554  compare_value = old_value;
555  swap_value =
556  (reinterpret_cast<size_t>(agg) & 0x2)
557  ? static_cast<unsigned int>(max(static_cast<int16_t>(old_value >> 16), val))
558  << 16 |
559  (old_value & 0xFFFF)
560  : (old_value & 0xFFFF0000) |
561  static_cast<unsigned int>(
562  max(static_cast<int16_t>(old_value & 0xFFFF), val));
563  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
564  } while (old_value != compare_value);
565 }
566 
567 extern "C" __device__ void atomicMax8(int8_t* agg, const int8_t val) {
568  // properly align the input pointer:
569  unsigned int* base_address_u32 =
570  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
571 
572  // __byte_perm(unsigned int A, unsigned int B, unsigned int s):
573  // if s == 0x3214 returns {A[31..24], A[23..16], A[15..8], B[7..0]}
574  // if s == 0x3240 returns {A[31..24], A[23..16], B[7...0], A[7..0]}
575  // if s == 0x3410 returns {A[31..24], B[7....0], A[15..8], A[7..0]}
576  // if s == 0x4210 returns {B[7....0], A[23..16], A[15..8], A[7..0]}
577  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
578  unsigned int old_value = *base_address_u32;
579  unsigned int swap_value, compare_value;
580  do {
581  compare_value = old_value;
582  auto max_value = static_cast<unsigned int>(
583  // compare val with its corresponding bits in the compare_value
584  max(val,
585  static_cast<int8_t>(__byte_perm(
586  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
587  swap_value = __byte_perm(
588  compare_value, max_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
589  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
590  } while (compare_value != old_value);
591 }
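// Worked example of the byte-selection trick above (values are illustrative): with
// compare_value == 0xDDCCBBAA and agg pointing at byte 1 of the aligned word, the
// selector (1 | 0x4440) == 0x4441 makes __byte_perm(compare_value, 0, 0x4441) return
// 0x000000BB, i.e. the current 8-bit value; after max()-ing it against val, the result
// is spliced back into byte 1 via byte_permutations[1] == 0x3240.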
592 
593 extern "C" __device__ void atomicMin16(int16_t* agg, const int16_t val) {
594  // properly align the input pointer:
595  unsigned int* base_address_u32 =
596  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
597 
598  unsigned int old_value = *base_address_u32;
599  unsigned int swap_value, compare_value;
600  do {
601  compare_value = old_value;
602  swap_value =
603  (reinterpret_cast<size_t>(agg) & 0x2)
604  ? static_cast<unsigned int>(min(static_cast<int16_t>(old_value >> 16), val))
605  << 16 |
606  (old_value & 0xFFFF)
607  : (old_value & 0xFFFF0000) |
608  static_cast<unsigned int>(
609  min(static_cast<int16_t>(old_value & 0xFFFF), val));
610  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
611  } while (old_value != compare_value);
612 }
613 
614 extern "C" __device__ void atomicMin16SkipVal(int16_t* agg,
615  const int16_t val,
616  const int16_t skip_val) {
617  // properly align the input pointer:
618  unsigned int* base_address_u32 =
619  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
620 
621  unsigned int old_value = *base_address_u32;
622  unsigned int swap_value, compare_value;
623  do {
624  compare_value = old_value;
625  int16_t selected_old_val = (reinterpret_cast<size_t>(agg) & 0x2)
626  ? static_cast<int16_t>(old_value >> 16)
627  : static_cast<int16_t>(old_value & 0xFFFF);
628 
629  swap_value =
630  (reinterpret_cast<size_t>(agg) & 0x2)
631  ? static_cast<unsigned int>(
632  selected_old_val == skip_val ? val : min(selected_old_val, val))
633  << 16 |
634  (old_value & 0xFFFF)
635  : (old_value & 0xFFFF0000) |
636  static_cast<unsigned int>(
637  selected_old_val == skip_val ? val : min(selected_old_val, val));
638  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
639  } while (old_value != compare_value);
640 }
641 
642 extern "C" __device__ void atomicMin8(int8_t* agg, const int8_t val) {
643  // properly align the input pointer:
644  unsigned int* base_address_u32 =
645  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
646 
647  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
648  unsigned int old_value = *base_address_u32;
649  unsigned int swap_value, compare_value;
650  do {
651  compare_value = old_value;
652  auto min_value = static_cast<unsigned int>(
653  min(val,
654  static_cast<int8_t>(__byte_perm(
655  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
656  swap_value = __byte_perm(
657  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
658  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
659  } while (compare_value != old_value);
660 }
661 
662 extern "C" __device__ void atomicMin8SkipVal(int8_t* agg,
663  const int8_t val,
664  const int8_t skip_val) {
665  // properly align the input pointer:
666  unsigned int* base_address_u32 =
667  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
668 
669  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
670  unsigned int old_value = *base_address_u32;
671  unsigned int swap_value, compare_value;
672  do {
673  compare_value = old_value;
674  int8_t selected_old_val = static_cast<int8_t>(
675  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440));
676  auto min_value = static_cast<unsigned int>(
677  selected_old_val == skip_val ? val : min(val, selected_old_val));
678  swap_value = __byte_perm(
679  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
680  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
681  } while (compare_value != old_value);
682 }
683 
684 extern "C" __device__ void agg_max_int16_shared(int16_t* agg, const int16_t val) {
685  return atomicMax16(agg, val);
686 }
687 
688 extern "C" __device__ void agg_max_int8_shared(int8_t* agg, const int8_t val) {
689  return atomicMax8(agg, val);
690 }
691 
692 extern "C" __device__ void agg_min_int16_shared(int16_t* agg, const int16_t val) {
693  return atomicMin16(agg, val);
694 }
695 
696 extern "C" __device__ void agg_min_int8_shared(int8_t* agg, const int8_t val) {
697  return atomicMin8(agg, val);
698 }
699 
700 extern "C" __device__ void agg_min_double_shared(int64_t* agg, const double val) {
701  atomicMin(reinterpret_cast<double*>(agg), val);
702 }
703 
704 extern "C" __device__ void agg_min_float_shared(int32_t* agg, const float val) {
705  atomicMin(reinterpret_cast<float*>(agg), val);
706 }
707 
708 extern "C" __device__ void agg_id_shared(int64_t* agg, const int64_t val) {
709  *agg = val;
710 }
711 
712 #define DEF_AGG_ID_INT_SHARED(n) \
713  extern "C" __device__ void agg_id_int##n##_shared(int##n##_t* agg, \
714  const int##n##_t val) { \
715  *agg = val; \
716  }
717 
718 DEF_AGG_ID_INT_SHARED(32)
719 DEF_AGG_ID_INT_SHARED(16)
720 DEF_AGG_ID_INT_SHARED(8)
721 #undef DEF_AGG_ID_INT_SHARED
722 
723 extern "C" __device__ void agg_id_double_shared(int64_t* agg, const double val) {
724  *agg = *(reinterpret_cast<const int64_t*>(&val));
725 }
726 
727 extern "C" __device__ void agg_id_double_shared_slow(int64_t* agg, const double* val) {
728  *agg = *(reinterpret_cast<const int64_t*>(val));
729 }
730 
731 extern "C" __device__ void agg_id_float_shared(int32_t* agg, const float val) {
732  *agg = __float_as_int(val);
733 }
734 
735 #define DEF_SKIP_AGG(base_agg_func) \
736  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
737  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
738  if (val != skip_val) { \
739  return base_agg_func##_shared(agg, val); \
740  } \
741  return 0; \
742  }
743 
744 #define DATA_T int64_t
745 #define ADDR_T uint64_t
746 DEF_SKIP_AGG(agg_count)
747 #undef DATA_T
748 #undef ADDR_T
749 
750 #define DATA_T int32_t
751 #define ADDR_T uint32_t
752 DEF_SKIP_AGG(agg_count_int32)
753 #undef DATA_T
754 #undef ADDR_T
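// For reference, DEF_SKIP_AGG(agg_count_int32) above expands (roughly) to:
//
// extern "C" __device__ uint32_t agg_count_int32_skip_val_shared(
//     uint32_t* agg, const int32_t val, const int32_t skip_val) {
//   if (val != skip_val) {
//     return agg_count_int32_shared(agg, val);
//   }
//   return 0;
// }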
755 
756 // Initial value for nullable column is INT32_MIN
757 extern "C" __device__ void agg_max_int32_skip_val_shared(int32_t* agg,
758  const int32_t val,
759  const int32_t skip_val) {
760  if (val != skip_val) {
761  agg_max_int32_shared(agg, val);
762  }
763 }
764 
765 extern "C" __device__ void agg_max_int16_skip_val_shared(int16_t* agg,
766  const int16_t val,
767  const int16_t skip_val) {
768  if (val != skip_val) {
769  agg_max_int16_shared(agg, val);
770  }
771 }
772 
773 extern "C" __device__ void agg_min_int16_skip_val_shared(int16_t* agg,
774  const int16_t val,
775  const int16_t skip_val) {
776  if (val != skip_val) {
777  atomicMin16SkipVal(agg, val, skip_val);
778  }
779 }
780 
781 extern "C" __device__ void agg_max_int8_skip_val_shared(int8_t* agg,
782  const int8_t val,
783  const int8_t skip_val) {
784  if (val != skip_val) {
785  agg_max_int8_shared(agg, val);
786  }
787 }
788 
789 extern "C" __device__ void agg_min_int8_skip_val_shared(int8_t* agg,
790  const int8_t val,
791  const int8_t skip_val) {
792  if (val != skip_val) {
793  atomicMin8SkipVal(agg, val, skip_val);
794  }
795 }
796 
797 __device__ int32_t atomicMin32SkipVal(int32_t* address,
798  int32_t val,
799  const int32_t skip_val) {
800  int32_t old = atomicExch(address, INT_MAX);
801  return atomicMin(address, old == skip_val ? val : min(old, val));
802 }
803 
804 extern "C" __device__ void agg_min_int32_skip_val_shared(int32_t* agg,
805  const int32_t val,
806  const int32_t skip_val) {
807  if (val != skip_val) {
808  atomicMin32SkipVal(agg, val, skip_val);
809  }
810 }
811 
812 __device__ int32_t atomicSum32SkipVal(int32_t* address,
813  const int32_t val,
814  const int32_t skip_val) {
815  unsigned int* address_as_int = (unsigned int*)address;
816  int32_t old = atomicExch(address_as_int, 0);
817  int32_t old2 = atomicAdd(address_as_int, old == skip_val ? val : (val + old));
818  return old == skip_val ? old2 : (old2 + old);
819 }
820 
821 extern "C" __device__ int32_t agg_sum_int32_skip_val_shared(int32_t* agg,
822  const int32_t val,
823  const int32_t skip_val) {
824  if (val != skip_val) {
825  const int32_t old = atomicSum32SkipVal(agg, val, skip_val);
826  return old;
827  }
828  return 0;
829 }
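// Worked example for atomicSum32SkipVal above (illustrative numbers): suppose the slot
// currently holds skip_val and two threads add 5 and 7. The first thread exchanges
// skip_val out, sees old == skip_val, and adds just 5; the second exchanges 5 out, sees
// old != skip_val, and adds 5 + 7 back, leaving 12. Each thread re-adds whatever it
// exchanged out, which is how the running sum survives concurrent updates and the NULL
// sentinel is replaced without a full CAS loop.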
830 
831 __device__ int64_t atomicSum64SkipVal(int64_t* address,
832  const int64_t val,
833  const int64_t skip_val) {
834  unsigned long long int* address_as_ull = (unsigned long long int*)address;
835  int64_t old = atomicExch(address_as_ull, 0);
836  int64_t old2 = atomicAdd(address_as_ull, old == skip_val ? val : (val + old));
837  return old == skip_val ? old2 : (old2 + old);
838 }
839 
840 extern "C" __device__ int64_t agg_sum_skip_val_shared(int64_t* agg,
841  const int64_t val,
842  const int64_t skip_val) {
843  if (val != skip_val) {
844  return atomicSum64SkipVal(agg, val, skip_val);
845  }
846  return 0;
847 }
848 
849 __device__ int64_t atomicMin64SkipVal(int64_t* address,
850  int64_t val,
851  const int64_t skip_val) {
852  unsigned long long int* address_as_ull =
853  reinterpret_cast<unsigned long long int*>(address);
854  unsigned long long int old = *address_as_ull, assumed;
855 
856  do {
857  assumed = old;
858  old = atomicCAS(address_as_ull,
859  assumed,
860  assumed == skip_val ? val : min((long long)val, (long long)assumed));
861  } while (assumed != old);
862 
863  return old;
864 }
865 
866 extern "C" __device__ void agg_min_skip_val_shared(int64_t* agg,
867  const int64_t val,
868  const int64_t skip_val) {
869  if (val != skip_val) {
870  atomicMin64SkipVal(agg, val, skip_val);
871  }
872 }
873 
874 __device__ int64_t atomicMax64SkipVal(int64_t* address,
875  int64_t val,
876  const int64_t skip_val) {
877  unsigned long long int* address_as_ull =
878  reinterpret_cast<unsigned long long int*>(address);
879  unsigned long long int old = *address_as_ull, assumed;
880 
881  do {
882  assumed = old;
883  old = atomicCAS(address_as_ull,
884  assumed,
885  assumed == skip_val ? val : max((long long)val, (long long)assumed));
886  } while (assumed != old);
887 
888  return old;
889 }
890 
891 extern "C" __device__ void agg_max_skip_val_shared(int64_t* agg,
892  const int64_t val,
893  const int64_t skip_val) {
894  if (val != skip_val) {
895  atomicMax64SkipVal(agg, val, skip_val);
896  }
897 }
898 
899 #undef DEF_SKIP_AGG
900 #define DEF_SKIP_AGG(base_agg_func) \
901  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
902  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
903  if (val != skip_val) { \
904  return base_agg_func##_shared(agg, val); \
905  } \
906  return *agg; \
907  }
908 
909 #define DATA_T double
910 #define ADDR_T uint64_t
911 DEF_SKIP_AGG(agg_count_double)
912 #undef ADDR_T
913 #undef DATA_T
914 
915 #define DATA_T float
916 #define ADDR_T uint32_t
917 DEF_SKIP_AGG(agg_count_float)
918 #undef ADDR_T
919 #undef DATA_T
920 
921 // Initial value for a nullable float column is -FLT_MAX
922 extern "C" __device__ void agg_max_float_skip_val_shared(int32_t* agg,
923  const float val,
924  const float skip_val) {
925  if (__float_as_int(val) != __float_as_int(skip_val)) {
926  float old = atomicExch(reinterpret_cast<float*>(agg), -FLT_MAX);
927  atomicMax(reinterpret_cast<float*>(agg),
928  __float_as_int(old) == __float_as_int(skip_val) ? val : fmaxf(old, val));
929  }
930 }
931 
932 __device__ float atomicMinFltSkipVal(int32_t* address, float val, const float skip_val) {
933  float old = atomicExch(reinterpret_cast<float*>(address), FLT_MAX);
934  return atomicMin(
935  reinterpret_cast<float*>(address),
936  __float_as_int(old) == __float_as_int(skip_val) ? val : fminf(old, val));
937 }
938 
939 extern "C" __device__ void agg_min_float_skip_val_shared(int32_t* agg,
940  const float val,
941  const float skip_val) {
942  if (__float_as_int(val) != __float_as_int(skip_val)) {
943  atomicMinFltSkipVal(agg, val, skip_val);
944  }
945 }
946 
947 __device__ void atomicSumFltSkipVal(float* address,
948  const float val,
949  const float skip_val) {
950  float old = atomicExch(address, 0.f);
951  atomicAdd(address, __float_as_int(old) == __float_as_int(skip_val) ? val : (val + old));
952 }
953 
954 extern "C" __device__ void agg_sum_float_skip_val_shared(int32_t* agg,
955  const float val,
956  const float skip_val) {
957  if (__float_as_int(val) != __float_as_int(skip_val)) {
958  atomicSumFltSkipVal(reinterpret_cast<float*>(agg), val, skip_val);
959  }
960 }
961 
962 __device__ void atomicSumDblSkipVal(double* address,
963  const double val,
964  const double skip_val) {
965  unsigned long long int* address_as_ull = (unsigned long long int*)address;
966  double old = __longlong_as_double(atomicExch(address_as_ull, __double_as_longlong(0.)));
967  atomicAdd(
968  address,
969  __double_as_longlong(old) == __double_as_longlong(skip_val) ? val : (val + old));
970 }
971 
972 extern "C" __device__ void agg_sum_double_skip_val_shared(int64_t* agg,
973  const double val,
974  const double skip_val) {
975  if (__double_as_longlong(val) != __double_as_longlong(skip_val)) {
976  atomicSumDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
977  }
978 }
979 
980 __device__ double atomicMinDblSkipVal(double* address,
981  double val,
982  const double skip_val) {
983  unsigned long long int* address_as_ull =
984  reinterpret_cast<unsigned long long int*>(address);
985  unsigned long long int old = *address_as_ull;
986  unsigned long long int skip_val_as_ull =
987  *reinterpret_cast<const unsigned long long*>(&skip_val);
988  unsigned long long int assumed;
989 
990  do {
991  assumed = old;
992  old = atomicCAS(address_as_ull,
993  assumed,
994  assumed == skip_val_as_ull
995  ? *reinterpret_cast<unsigned long long*>(&val)
996  : __double_as_longlong(min(val, __longlong_as_double(assumed))));
997  } while (assumed != old);
998 
999  return __longlong_as_double(old);
1000 }
1001 
1002 extern "C" __device__ void agg_min_double_skip_val_shared(int64_t* agg,
1003  const double val,
1004  const double skip_val) {
1005  if (val != skip_val) {
1006  atomicMinDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1007  }
1008 }
1009 
1010 __device__ double atomicMaxDblSkipVal(double* address,
1011  double val,
1012  const double skip_val) {
1013  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1014  unsigned long long int old = *address_as_ull;
1015  unsigned long long int skip_val_as_ull = *((unsigned long long int*)&skip_val);
1016  unsigned long long int assumed;
1017 
1018  do {
1019  assumed = old;
1020  old = atomicCAS(address_as_ull,
1021  assumed,
1022  assumed == skip_val_as_ull
1023  ? *((unsigned long long int*)&val)
1024  : __double_as_longlong(max(val, __longlong_as_double(assumed))));
1025  } while (assumed != old);
1026 
1027  return __longlong_as_double(old);
1028 }
1029 
1030 extern "C" __device__ void agg_max_double_skip_val_shared(int64_t* agg,
1031  const double val,
1032  const double skip_val) {
1033  if (val != skip_val) {
1034  atomicMaxDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1035  }
1036 }
1037 
1038 #undef DEF_SKIP_AGG
1039 
1040 extern "C" __device__ bool slotEmptyKeyCAS(int64_t* slot,
1041  int64_t new_val,
1042  int64_t init_val) {
1043  auto slot_address = reinterpret_cast<unsigned long long int*>(slot);
1044  const auto empty_key =
1045  static_cast<unsigned long long int*>(static_cast<void*>(&init_val));
1046  const auto new_val_cast =
1047  static_cast<unsigned long long int*>(static_cast<void*>(&new_val));
1048 
1049  const auto old_val = atomicCAS(slot_address, *empty_key, *new_val_cast);
1050  if (old_val == *empty_key) {
1051  return true;
1052  } else {
1053  return false;
1054  }
1055 }
1056 
1057 extern "C" __device__ bool slotEmptyKeyCAS_int32(int32_t* slot,
1058  int32_t new_val,
1059  int32_t init_val) {
1060  unsigned int* slot_address = reinterpret_cast<unsigned int*>(slot);
1061  unsigned int compare_value = static_cast<unsigned int>(init_val);
1062  unsigned int swap_value = static_cast<unsigned int>(new_val);
1063 
1064  const unsigned int old_value = atomicCAS(slot_address, compare_value, swap_value);
1065  return old_value == compare_value;
1066 }
1067 #include <stdio.h>
1068 extern "C" __device__ bool slotEmptyKeyCAS_int16(int16_t* slot,
1069  int16_t new_val,
1070  int16_t init_val) {
1071  unsigned int* base_slot_address =
1072  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1073  unsigned int old_value = *base_slot_address;
1074  unsigned int swap_value, compare_value;
1075  do {
1076  compare_value = old_value;
1077  // exit criteria: if init_val does not exist in the slot (some other thread has
1078  // succeeded)
1079  if (static_cast<unsigned int>(init_val) !=
1080  __byte_perm(
1081  compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x2 ? 0x3244 : 0x4410))) {
1082  return false;
1083  }
1084  swap_value = __byte_perm(compare_value,
1085  static_cast<unsigned int>(new_val),
1086  (reinterpret_cast<size_t>(slot) & 0x2) ? 0x5410 : 0x3254);
1087  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1088  } while (compare_value != old_value);
1089  return true;
1090 }
1091 
1092 extern "C" __device__ bool slotEmptyKeyCAS_int8(int8_t* slot,
1093  int8_t new_val,
1094  int8_t init_val) {
1095  // properly align the slot address:
1096  unsigned int* base_slot_address =
1097  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1098  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
1099  unsigned int old_value = *base_slot_address;
1100  unsigned int swap_value, compare_value;
1101  do {
1102  compare_value = old_value;
1103  // exit criteria: if init_val does not exist in the slot (some other thread has
1104  // succeeded)
1105  if (static_cast<unsigned int>(init_val) !=
1106  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x3) | 0x4440)) {
1107  return false;
1108  }
1109  swap_value = __byte_perm(compare_value,
1110  static_cast<unsigned int>(new_val),
1111  byte_permutations[reinterpret_cast<size_t>(slot) & 0x3]);
1112  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1113  } while (compare_value != old_value);
1114  return true;
1115 }
1116 
1117 #include "../Utils/ChunkIter.cpp"
1118 #include "DateTruncate.cpp"
1119 #include "ExtractFromTime.cpp"
1120 #define EXECUTE_INCLUDE
1121 #include "ArrayOps.cpp"
1122 #include "DateAdd.cpp"
1123 #include "StringFunctions.cpp"
1124 #undef EXECUTE_INCLUDE
1125 #include "../Utils/Regexp.cpp"
1126 #include "../Utils/StringLike.cpp"
1127 
1128 extern "C" __device__ uint64_t string_decode(int8_t* chunk_iter_, int64_t pos) {
1129  // TODO(alex): de-dup, the x64 version is basically identical
1130  ChunkIter* chunk_iter = reinterpret_cast<ChunkIter*>(chunk_iter_);
1131  VarlenDatum vd;
1132  bool is_end;
1133  ChunkIter_get_nth(chunk_iter, pos, false, &vd, &is_end);
1134  return vd.is_null ? 0
1135  : (reinterpret_cast<uint64_t>(vd.pointer) & 0xffffffffffff) |
1136  (static_cast<uint64_t>(vd.length) << 48);
1137 }
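// The value returned above packs the varlen payload as [63..48: length][47..0: pointer].
// A hypothetical consumer would unpack it like this:
__device__ inline void example_unpack_string(const uint64_t packed,
                                             const int8_t** ptr,
                                             uint32_t* len) {
  *ptr = reinterpret_cast<const int8_t*>(packed & 0xffffffffffffULL);
  *len = static_cast<uint32_t>(packed >> 48);
}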
1138 
1139 extern "C" __device__ void linear_probabilistic_count(uint8_t* bitmap,
1140  const uint32_t bitmap_bytes,
1141  const uint8_t* key_bytes,
1142  const uint32_t key_len) {
1143  const uint32_t bit_pos = MurmurHash1(key_bytes, key_len, 0) % (bitmap_bytes * 8);
1144  const uint32_t word_idx = bit_pos / 32;
1145  const uint32_t bit_idx = bit_pos % 32;
1146  atomicOr(((uint32_t*)bitmap) + word_idx, 1 << bit_idx);
1147 }
1148 
1149 extern "C" __device__ void agg_count_distinct_bitmap_gpu(int64_t* agg,
1150  const int64_t val,
1151  const int64_t min_val,
1152  const int64_t base_dev_addr,
1153  const int64_t base_host_addr,
1154  const uint64_t sub_bitmap_count,
1155  const uint64_t bitmap_bytes) {
1156  const uint64_t bitmap_idx = val - min_val;
1157  const uint32_t byte_idx = bitmap_idx >> 3;
1158  const uint32_t word_idx = byte_idx >> 2;
1159  const uint32_t byte_word_idx = byte_idx & 3;
1160  const int64_t host_addr = *agg;
1161  uint32_t* bitmap = (uint32_t*)(base_dev_addr + host_addr - base_host_addr +
1162  (threadIdx.x & (sub_bitmap_count - 1)) * bitmap_bytes);
1163  switch (byte_word_idx) {
1164  case 0:
1165  atomicOr(&bitmap[word_idx], 1 << (bitmap_idx & 7));
1166  break;
1167  case 1:
1168  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 8));
1169  break;
1170  case 2:
1171  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 16));
1172  break;
1173  case 3:
1174  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 24));
1175  break;
1176  default:
1177  break;
1178  }
1179 }
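// Worked example of the index arithmetic above (illustrative numbers): for
// bitmap_idx = 77, byte_idx = 77 >> 3 = 9, word_idx = 9 >> 2 = 2 and byte_word_idx =
// 9 & 3 = 1, so bit (77 & 7) = 5 of byte 1 within 32-bit word 2 is set, i.e. bit 13 of
// bitmap[2]. The (threadIdx.x & (sub_bitmap_count - 1)) term spreads threads across
// sub-bitmaps to reduce atomic contention and assumes sub_bitmap_count is a power of two.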
1180 
1181 extern "C" __device__ void agg_count_distinct_bitmap_skip_val_gpu(
1182  int64_t* agg,
1183  const int64_t val,
1184  const int64_t min_val,
1185  const int64_t skip_val,
1186  const int64_t base_dev_addr,
1187  const int64_t base_host_addr,
1188  const uint64_t sub_bitmap_count,
1189  const uint64_t bitmap_bytes) {
1190  if (val != skip_val) {
1191  agg_count_distinct_bitmap_gpu(
1192  agg, val, min_val, base_dev_addr, base_host_addr, sub_bitmap_count, bitmap_bytes);
1193  }
1194 }
1195 
1196 extern "C" __device__ void agg_approximate_count_distinct_gpu(
1197  int64_t* agg,
1198  const int64_t key,
1199  const uint32_t b,
1200  const int64_t base_dev_addr,
1201  const int64_t base_host_addr) {
1202  const uint64_t hash = MurmurHash64A(&key, sizeof(key), 0);
1203  const uint32_t index = hash >> (64 - b);
1204  const int32_t rank = get_rank(hash << b, 64 - b);
1205  const int64_t host_addr = *agg;
1206  int32_t* M = (int32_t*)(base_dev_addr + host_addr - base_host_addr);
1207  atomicMax(&M[index], rank);
1208 }
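// Illustrative reading of the HyperLogLog update above: with b = 11 (an assumed,
// typical setting), the top 11 bits of the 64-bit Murmur hash select one of 2048
// registers and get_rank(hash << b, 64 - b) effectively returns the number of leading
// zero bits plus one in the remaining bits; atomicMax keeps the largest rank seen per
// register, which is the only state the final cardinality estimate needs.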
1209 
1210 extern "C" __device__ void force_sync() {
1211  __threadfence_block();
1212 }
1213 
1214 extern "C" __device__ void sync_warp() {
1215 #if (CUDA_VERSION >= 9000)
1216  __syncwarp();
1217 #endif
1218 }
1219 
1220 /**
1221  * Protected warp synchronization: makes sure all (or none) threads within a warp go
1222  * through a synchronization barrier.
1223  * thread_pos: the current thread position, to be used for a memory access
1224  * row_count: maximum number of rows to be processed
1225  * Performs the warp sync iff all 32 threads will process valid data (assumes warp size 32).
1226  */
1227 extern "C" __device__ void sync_warp_protected(int64_t thread_pos, int64_t row_count) {
1228 #if (CUDA_VERSION >= 9000)
1229  // only syncing if NOT within the same warp as those threads experiencing the critical
1230  // edge
1231  if ((((row_count - 1) | 0x1F) - thread_pos) >= 32) {
1232  __syncwarp();
1233  }
1234 #endif
1235 }
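// Hypothetical call site (the real call sites are emitted by codegen): inside a row
// loop, an unconditional __syncwarp() can misbehave once some lanes of the last warp
// have already exited, so the guarded variant above is used instead.
extern "C" __global__ void example_warp_sync_loop(int64_t* out, const int64_t row_count) {
  for (int64_t pos = pos_start_impl(NULL); pos < row_count; pos += pos_step_impl()) {
    out[pos] = pos;  // stand-in for the generated per-row work
    sync_warp_protected(pos, row_count);  // syncs only warps whose 32 lanes all have rows
  }
}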