OmniSciDB  fe05a0c208
cuda_mapd_rt.cu
1 #include <cuda.h>
2 #include <float.h>
3 #include <stdint.h>
4 #include <stdio.h>
5 #include <limits>
6 #include "BufferCompaction.h"
7 #include "ExtensionFunctions.hpp"
8 #include "GpuRtConstants.h"
9 #include "HyperLogLogRank.h"
11 
12 #if CUDA_VERSION < 10000
13 static_assert(false, "CUDA v10.0 or later is required.");
14 #endif
15 
16 #if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 350)
17 static_assert(false, "CUDA Compute Capability of 3.5 or greater is required.");
18 #endif
19 
20 extern "C" __device__ int64_t get_thread_index() {
21  return threadIdx.x;
22 }
23 
24 extern "C" __device__ int64_t get_block_index() {
25  return blockIdx.x;
26 }
27 
28 extern "C" __device__ int32_t pos_start_impl(const int32_t* row_index_resume) {
29  return blockIdx.x * blockDim.x + threadIdx.x;
30 }
31 
32 extern "C" __device__ int32_t group_buff_idx_impl() {
33  return pos_start_impl(NULL);
34 }
35 
36 extern "C" __device__ int32_t pos_step_impl() {
37  return blockDim.x * gridDim.x;
38 }
39 
40 extern "C" __device__ int8_t thread_warp_idx(const int8_t warp_sz) {
41  return threadIdx.x % warp_sz;
42 }
43 
44 extern "C" __device__ const int64_t* init_shared_mem_nop(
45  const int64_t* groups_buffer,
46  const int32_t groups_buffer_size) {
47  return groups_buffer;
48 }
49 
50 extern "C" __device__ void write_back_nop(int64_t* dest, int64_t* src, const int32_t sz) {
51 }
52 
53 /*
54  * Just declares and returns a dynamic shared memory pointer. Total size should be
55  * properly set during kernel launch
56  */
57 extern "C" __device__ int64_t* declare_dynamic_shared_memory() {
58  extern __shared__ int64_t shared_mem_buffer[];
59  return shared_mem_buffer;
60 }
61 
67 extern "C" __device__ const int64_t* init_shared_mem(const int64_t* global_groups_buffer,
68  const int32_t groups_buffer_size) {
69  // dynamic shared memory declaration
70  extern __shared__ int64_t shared_groups_buffer[];
71 
72  // it is assumed that the buffer size is a multiple of 64-bit units,
73  // so it is safe to assign one 64-bit element to each thread
74  const int32_t buffer_units = groups_buffer_size >> 3;
75 
76  for (int32_t pos = threadIdx.x; pos < buffer_units; pos += blockDim.x) {
77  shared_groups_buffer[pos] = global_groups_buffer[pos];
78  }
79  __syncthreads();
80  return shared_groups_buffer;
81 }
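/*
 * Illustrative sketch: the dynamic shared memory that backs the extern
 * __shared__ arrays above gets its size from the third launch-configuration
 * argument. The kernel and launcher below are hypothetical (the engine
 * configures the launch itself, possibly through the driver API); they only
 * show how groups_buffer_size bytes would be reserved per block.
 */
__global__ void example_shared_groups_kernel(const int64_t* global_groups_buffer,
                                             const int32_t groups_buffer_size) {
  // copy the global group-by buffer into the block's dynamic shared memory
  const int64_t* groups_buffer =
      init_shared_mem(global_groups_buffer, groups_buffer_size);
  (void)groups_buffer;  // aggregation code would operate on the shared copy
}

void example_shared_groups_launch(const int64_t* dev_groups_buffer,
                                  const int32_t groups_buffer_size) {
  // third launch parameter: dynamic shared memory bytes per block
  example_shared_groups_kernel<<<16, 128, groups_buffer_size>>>(
      dev_groups_buffer, groups_buffer_size);
}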
82 
83 #define init_group_by_buffer_gpu_impl init_group_by_buffer_gpu
84 
85 #include "GpuInitGroups.cu"
86 
87 #undef init_group_by_buffer_gpu_impl
88 
89 // Dynamic watchdog: monitors up to 128 SMs. E.g. a GP100 configuration has 60:
90 // 6 Graphics Processing Clusters (GPCs) * 10 Streaming Multiprocessors each
91 // TODO(Saman): move these into a kernel parameter, allocated and initialized through CUDA
92 __device__ int64_t dw_sm_cycle_start[128]; // Set from host before launching the kernel
93 // TODO(Saman): make this cycle budget something constant in codegen level
94 __device__ int64_t dw_cycle_budget = 0; // Set from host before launching the kernel
95 __device__ int32_t dw_abort = 0; // TBD: set from host (async)
96 __device__ int32_t runtime_interrupt_flag = 0;
97 
98 __inline__ __device__ uint32_t get_smid(void) {
99  uint32_t ret;
100  asm("mov.u32 %0, %%smid;" : "=r"(ret));
101  return ret;
102 }
103 
104 /*
105  * The main objective of this function is to return true if either of the following
106  * two scenarios happens:
107  * 1. a host request to abort the kernel execution has been received
108  * 2. kernel execution has consumed more clock cycles than it was initially allowed
109  * The assumption is that all (or none of the) threads within a block return true for
110  * the watchdog, and the first thread within each block compares the recorded clock
111  * cycles for its occupying SM with the allowed budget. It also assumes that all
112  * threads entering this function are active (no critical edge exposure).
113  * NOTE: dw_cycle_budget, dw_abort, and dw_sm_cycle_start[] are all variables in
114  * global memory scope.
115  */
116 extern "C" __device__ bool dynamic_watchdog() {
117  // check for dynamic watchdog, if triggered all threads return true
118  if (dw_cycle_budget == 0LL) {
119  return false; // Uninitialized watchdog can't check time
120  }
121  if (dw_abort == 1) {
122  return true; // Received host request to abort
123  }
124  uint32_t smid = get_smid();
125  if (smid >= 128) {
126  return false;
127  }
128  __shared__ volatile int64_t dw_block_cycle_start; // Thread block shared cycle start
129  __shared__ volatile bool
130  dw_should_terminate; // all threads within a block should return together if
131  // the watchdog criterion is met
132 
133  // thread 0 either initializes or reads the initial clock cycle; the result is stored
134  // in shared memory. Since all threads within a block share the same SM, there's no
135  // point in using more threads here.
136  if (threadIdx.x == 0) {
137  dw_block_cycle_start = 0LL;
138  int64_t cycle_count = static_cast<int64_t>(clock64());
139  // Make sure the block hasn't switched SMs
140  if (smid == get_smid()) {
141  dw_block_cycle_start = static_cast<int64_t>(
142  atomicCAS(reinterpret_cast<unsigned long long*>(&dw_sm_cycle_start[smid]),
143  0ULL,
144  static_cast<unsigned long long>(cycle_count)));
145  }
146 
147  int64_t cycles = cycle_count - dw_block_cycle_start;
148  if ((smid == get_smid()) && (dw_block_cycle_start > 0LL) &&
149  (cycles > dw_cycle_budget)) {
150  // Check if we're out of time on this particular SM
151  dw_should_terminate = true;
152  } else {
153  dw_should_terminate = false;
154  }
155  }
156  __syncthreads();
157  return dw_should_terminate;
158 }
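/*
 * Minimal host-side sketch of the "set from host" initialization described in
 * the comments above, assuming the CUDA runtime API is available (the engine
 * may instead resolve these globals through the driver API). The helper name
 * is hypothetical.
 */
inline void example_arm_dynamic_watchdog(const int64_t cycle_budget) {
  static const int64_t zeroed_cycle_starts[128] = {};  // one entry per monitored SM
  const int32_t no_abort = 0;
  cudaMemcpyToSymbol(dw_sm_cycle_start, zeroed_cycle_starts, sizeof(zeroed_cycle_starts));
  cudaMemcpyToSymbol(dw_abort, &no_abort, sizeof(no_abort));
  // every block compares its SM's recorded start cycle against this budget
  cudaMemcpyToSymbol(dw_cycle_budget, &cycle_budget, sizeof(cycle_budget));
}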
159 
160 extern "C" __device__ bool check_interrupt() {
161  return (runtime_interrupt_flag == 1) ? true : false;
162 }
163 
164 template <typename T = unsigned long long>
165 inline __device__ T get_empty_key() {
166  return EMPTY_KEY_64;
167 }
168 
169 template <>
170 inline __device__ unsigned int get_empty_key() {
171  return EMPTY_KEY_32;
172 }
173 
174 template <typename T>
175 inline __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
176  const uint32_t h,
177  const T* key,
178  const uint32_t key_count,
179  const uint32_t row_size_quad) {
180  const T empty_key = get_empty_key<T>();
181  uint32_t off = h * row_size_quad;
182  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
183  {
184  const T old = atomicCAS(row_ptr, empty_key, *key);
185  if (empty_key == old && key_count > 1) {
186  for (size_t i = 1; i <= key_count - 1; ++i) {
187  atomicExch(row_ptr + i, key[i]);
188  }
189  }
190  }
191  if (key_count > 1) {
192  while (atomicAdd(row_ptr + key_count - 1, 0) == empty_key) {
193  // spin until the winning thread has finished writing the entire key and the init
194  // value
195  }
196  }
197  bool match = true;
198  for (uint32_t i = 0; i < key_count; ++i) {
199  if (row_ptr[i] != key[i]) {
200  match = false;
201  break;
202  }
203  }
204 
205  if (match) {
206  auto row_ptr_i8 = reinterpret_cast<int8_t*>(row_ptr + key_count);
207  return reinterpret_cast<int64_t*>(align_to_int64(row_ptr_i8));
208  }
209  return NULL;
210 }
211 
212 extern "C" __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
213  const uint32_t h,
214  const int64_t* key,
215  const uint32_t key_count,
216  const uint32_t key_width,
217  const uint32_t row_size_quad) {
218  switch (key_width) {
219  case 4:
220  return get_matching_group_value(groups_buffer,
221  h,
222  reinterpret_cast<const unsigned int*>(key),
223  key_count,
224  row_size_quad);
225  case 8:
226  return get_matching_group_value(groups_buffer,
227  h,
228  reinterpret_cast<const unsigned long long*>(key),
229  key_count,
230  row_size_quad);
231  default:
232  return NULL;
233  }
234 }
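/*
 * Sketch of how the NULL return above drives probing (assumption: the real
 * caller lives in GroupByRuntime.cpp; this helper is purely illustrative). A
 * NULL result means the row at hash slot h is owned by a different key, so the
 * caller retries with another slot until it matches or claims an empty row.
 */
__device__ int64_t* example_probe_group_value(int64_t* groups_buffer,
                                              const uint32_t entry_count,
                                              const uint32_t start_hash,
                                              const int64_t* key,
                                              const uint32_t key_count,
                                              const uint32_t key_width,
                                              const uint32_t row_size_quad) {
  uint32_t h = start_hash;
  for (uint32_t probe = 0; probe < entry_count; ++probe) {
    int64_t* row = get_matching_group_value(
        groups_buffer, h, key, key_count, key_width, row_size_quad);
    if (row) {
      return row;  // matched an existing key or claimed an empty row
    }
    h = (h + 1) % entry_count;  // collision: linear probe to the next slot
  }
  return NULL;  // every slot is taken by other keys
}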
235 
236 template <typename T>
237 __device__ int32_t get_matching_group_value_columnar_slot(int64_t* groups_buffer,
238  const uint32_t entry_count,
239  const uint32_t h,
240  const T* key,
241  const uint32_t key_count) {
242  const T empty_key = get_empty_key<T>();
243  const uint64_t old =
244  atomicCAS(reinterpret_cast<T*>(groups_buffer + h), empty_key, *key);
245  // the winning thread proceeds with writing the rest of the keys
246  if (old == empty_key) {
247  uint32_t offset = h + entry_count;
248  for (size_t i = 1; i < key_count; ++i) {
249  *reinterpret_cast<T*>(groups_buffer + offset) = key[i];
250  offset += entry_count;
251  }
252  }
253 
254  __threadfence();
255  // all threads except the winning thread check the memory contents of the keys
256  // at this hash slot again. In case of a complete match the hash offset is
257  // returned, otherwise -1 is returned
258  if (old != empty_key) {
259  uint32_t offset = h;
260  for (uint32_t i = 0; i < key_count; ++i) {
261  if (*reinterpret_cast<T*>(groups_buffer + offset) != key[i]) {
262  return -1;
263  }
264  offset += entry_count;
265  }
266  }
267  return h;
268 }
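/*
 * Worked layout example for the columnar variant above (numbers are
 * hypothetical): with entry_count = 4 and key_count = 2, key component i of
 * hash slot h is stored at groups_buffer[h + i * entry_count]:
 *
 *   index:    0    1    2    3  |  4    5    6    7
 *   holds:  k0@0 k0@1 k0@2 k0@3 | k1@0 k1@1 k1@2 k1@3
 *
 * so slot h = 2 keeps its two key components at indices 2 and 6, and the
 * winning thread's atomicCAS on index h publishes ownership of the whole slot.
 */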
269 
270 extern "C" __device__ int32_t
271 get_matching_group_value_columnar_slot(int64_t* groups_buffer,
272  const uint32_t entry_count,
273  const uint32_t h,
274  const int64_t* key,
275  const uint32_t key_count,
276  const uint32_t key_width) {
277  switch (key_width) {
278  case 4:
279  return get_matching_group_value_columnar_slot(
280  groups_buffer,
281  entry_count,
282  h,
283  reinterpret_cast<const unsigned int*>(key),
284  key_count);
285  case 8:
286  return get_matching_group_value_columnar_slot(
287  groups_buffer,
288  entry_count,
289  h,
290  reinterpret_cast<const unsigned long long*>(key),
291  key_count);
292  default:
293  return -1;
294  }
295 }
296 
297 extern "C" __device__ int64_t* get_matching_group_value_columnar(
298  int64_t* groups_buffer,
299  const uint32_t h,
300  const int64_t* key,
301  const uint32_t key_qw_count,
302  const size_t entry_count) {
303  uint32_t off = h;
304  {
305  const uint64_t old = atomicCAS(
306  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, *key);
307  if (EMPTY_KEY_64 == old) {
308  for (size_t i = 0; i < key_qw_count; ++i) {
309  groups_buffer[off] = key[i];
310  off += entry_count;
311  }
312  return &groups_buffer[off];
313  }
314  }
315  __syncthreads();
316  off = h;
317  for (size_t i = 0; i < key_qw_count; ++i) {
318  if (groups_buffer[off] != key[i]) {
319  return NULL;
320  }
321  off += entry_count;
322  }
323  return &groups_buffer[off];
324 }
325 
326 #include "GroupByRuntime.cpp"
328 #include "MurmurHash.cpp"
329 #include "TopKRuntime.cpp"
330 
331 __device__ int64_t atomicMax64(int64_t* address, int64_t val) {
332  unsigned long long int* address_as_ull = (unsigned long long int*)address;
333  unsigned long long int old = *address_as_ull, assumed;
334 
335  do {
336  assumed = old;
337  old = atomicCAS(address_as_ull, assumed, max((long long)val, (long long)assumed));
338  } while (assumed != old);
339 
340  return old;
341 }
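/*
 * The pattern above (and in the emulated atomics that follow) is the standard
 * compare-and-swap retry loop: read the current value, compute the desired
 * result from it, and publish it only if the slot still holds the value that
 * was read; otherwise adopt the freshly returned value and try again. A worked
 * interleaving with hypothetical values: thread A reads 5 and prepares
 * max(5, 9) = 9; thread B stores 7 in the meantime; A's atomicCAS(addr, 5, 9)
 * fails and returns 7, so A retries with max(7, 9) = 9, which then succeeds.
 */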
342 
343 __device__ int64_t atomicMin64(int64_t* address, int64_t val) {
344  unsigned long long int* address_as_ull = (unsigned long long int*)address;
345  unsigned long long int old = *address_as_ull, assumed;
346 
347  do {
348  assumed = old;
349  old = atomicCAS(address_as_ull, assumed, min((long long)val, (long long)assumed));
350  } while (assumed != old);
351 
352  return old;
353 }
354 
355 #if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 600)
356 __device__ double atomicAdd(double* address, double val) {
357  unsigned long long int* address_as_ull = (unsigned long long int*)address;
358  unsigned long long int old = *address_as_ull, assumed;
359 
360  do {
361  assumed = old;
362  old = atomicCAS(address_as_ull,
363  assumed,
364  __double_as_longlong(val + __longlong_as_double(assumed)));
365 
366  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
367  } while (assumed != old);
368 
369  return __longlong_as_double(old);
370 }
371 #endif
372 
373 __device__ double atomicMax(double* address, double val) {
374  unsigned long long int* address_as_ull = (unsigned long long int*)address;
375  unsigned long long int old = *address_as_ull, assumed;
376 
377  do {
378  assumed = old;
379  old = atomicCAS(address_as_ull,
380  assumed,
381  __double_as_longlong(max(val, __longlong_as_double(assumed))));
382 
383  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
384  } while (assumed != old);
385 
386  return __longlong_as_double(old);
387 }
388 
389 __device__ float atomicMax(float* address, float val) {
390  int* address_as_int = (int*)address;
391  int old = *address_as_int, assumed;
392 
393  do {
394  assumed = old;
395  old = atomicCAS(
396  address_as_int, assumed, __float_as_int(max(val, __int_as_float(assumed))));
397 
398  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
399  } while (assumed != old);
400 
401  return __int_as_float(old);
402 }
403 
404 __device__ double atomicMin(double* address, double val) {
405  unsigned long long int* address_as_ull = (unsigned long long int*)address;
406  unsigned long long int old = *address_as_ull, assumed;
407 
408  do {
409  assumed = old;
410  old = atomicCAS(address_as_ull,
411  assumed,
412  __double_as_longlong(min(val, __longlong_as_double(assumed))));
413  } while (assumed != old);
414 
415  return __longlong_as_double(old);
416 }
417 
418 __device__ float atomicMin(float* address, float val) {
419  int* address_as_int = (int*)address;
420  int old = *address_as_int, assumed;
421 
422  do {
423  assumed = old;
424  old = atomicCAS(
425  address_as_int, assumed, __float_as_int(min(val, __int_as_float(assumed))));
426  } while (assumed != old);
427 
428  return __int_as_float(old);
429 }
430 
431 extern "C" __device__ uint64_t agg_count_shared(uint64_t* agg, const int64_t val) {
432  return static_cast<uint64_t>(atomicAdd(reinterpret_cast<uint32_t*>(agg), 1UL));
433 }
434 
435 extern "C" __device__ uint32_t agg_count_int32_shared(uint32_t* agg, const int32_t val) {
436  return atomicAdd(agg, 1UL);
437 }
438 
439 extern "C" __device__ uint64_t agg_count_double_shared(uint64_t* agg, const double val) {
440  return agg_count_shared(agg, val);
441 }
442 
443 extern "C" __device__ uint32_t agg_count_float_shared(uint32_t* agg, const float val) {
444  return agg_count_int32_shared(agg, val);
445 }
446 
447 extern "C" __device__ int64_t agg_sum_shared(int64_t* agg, const int64_t val) {
448  return atomicAdd(reinterpret_cast<unsigned long long*>(agg), val);
449 }
450 
451 extern "C" __device__ int32_t agg_sum_int32_shared(int32_t* agg, const int32_t val) {
452  return atomicAdd(agg, val);
453 }
454 
455 extern "C" __device__ void agg_sum_float_shared(int32_t* agg, const float val) {
456  atomicAdd(reinterpret_cast<float*>(agg), val);
457 }
458 
459 extern "C" __device__ void agg_sum_double_shared(int64_t* agg, const double val) {
460  atomicAdd(reinterpret_cast<double*>(agg), val);
461 }
462 
463 extern "C" __device__ void agg_max_shared(int64_t* agg, const int64_t val) {
464  atomicMax64(agg, val);
465 }
466 
467 extern "C" __device__ void agg_max_int32_shared(int32_t* agg, const int32_t val) {
468  atomicMax(agg, val);
469 }
470 
471 extern "C" __device__ void agg_max_double_shared(int64_t* agg, const double val) {
472  atomicMax(reinterpret_cast<double*>(agg), val);
473 }
474 
475 extern "C" __device__ void agg_max_float_shared(int32_t* agg, const float val) {
476  atomicMax(reinterpret_cast<float*>(agg), val);
477 }
478 
479 extern "C" __device__ void agg_min_shared(int64_t* agg, const int64_t val) {
480  atomicMin64(agg, val);
481 }
482 
483 extern "C" __device__ void agg_min_int32_shared(int32_t* agg, const int32_t val) {
484  atomicMin(agg, val);
485 }
486 
487 #if CUDA_VERSION > 10000 && defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
488 __device__ void atomicMax16(int16_t* agg, const int16_t val) {
489  unsigned short int* address_as_us = reinterpret_cast<unsigned short int*>(agg);
490  unsigned short int old = *address_as_us, assumed;
491 
492  do {
493  assumed = old;
494  old = atomicCAS(address_as_us,
495  assumed,
496  static_cast<unsigned short>(max(static_cast<short int>(val),
497  static_cast<short int>(assumed))));
498  } while (assumed != old);
499 }
500 #else
501 __device__ void atomicMax16(int16_t* agg, const int16_t val) {
502  // properly align the input pointer:
503  unsigned int* base_address_u32 =
504  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
505 
506  unsigned int old_value = *base_address_u32;
507  unsigned int swap_value, compare_value;
508  do {
509  compare_value = old_value;
510  swap_value =
511  (reinterpret_cast<size_t>(agg) & 0x2)
512  ? static_cast<unsigned int>(max(static_cast<int16_t>(old_value >> 16), val))
513  << 16 |
514  (old_value & 0xFFFF)
515  : (old_value & 0xFFFF0000) |
516  static_cast<unsigned int>(
517  max(static_cast<int16_t>(old_value & 0xFFFF), val));
518  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
519  } while (old_value != compare_value);
520 }
521 #endif
522 
523 __device__ void atomicMax8(int8_t* agg, const int8_t val) {
524  // properly align the input pointer:
525  unsigned int* base_address_u32 =
526  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
527 
528  // __byte_perm(unsigned int A, unsigned int B, unsigned int s):
529  // if s == 0x3214 returns {A[31..24], A[23..16], A[15..8], B[7..0]}
530  // if s == 0x3240 returns {A[31..24], A[23..16], B[7...0], A[7..0]}
531  // if s == 0x3410 returns {A[31..24], B[7....0], A[15..8], A[7..0]}
532  // if s == 0x4210 returns {B[7....0], A[23..16], A[15..8], A[7..0]}
533  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
534  unsigned int old_value = *base_address_u32;
535  unsigned int swap_value, compare_value;
536  do {
537  compare_value = old_value;
538  auto max_value = static_cast<unsigned int>(
539  // compare val with its corresponding bits in the compare_value
540  max(val,
541  static_cast<int8_t>(__byte_perm(
542  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
543  swap_value = __byte_perm(
544  compare_value, max_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
545  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
546  } while (compare_value != old_value);
547 }
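/*
 * Worked example for the byte-wise emulation above (the address is
 * hypothetical): if agg points at byte 2 of its aligned 32-bit word,
 * reinterpret_cast<size_t>(agg) & 0x3 is 2, so the extraction selector is
 * (2 | 0x4440) = 0x4442 and __byte_perm(compare_value, 0, 0x4442) yields byte 2
 * of compare_value zero-extended into the low lane, which the static_cast then
 * treats as the current int8_t. byte_permutations[2] = 0x3410 splices the low
 * byte of max_value back into byte 2 while keeping the other three bytes of
 * compare_value intact, so the atomicCAS always publishes a full, aligned
 * 32-bit word.
 */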
548 
549 #if CUDA_VERSION > 10000 && defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
550 __device__ void atomicMin16(int16_t* agg, const int16_t val) {
551  unsigned short int* address_as_us = reinterpret_cast<unsigned short int*>(agg);
552  unsigned short int old = *address_as_us, assumed;
553 
554  do {
555  assumed = old;
556  old = atomicCAS(address_as_us,
557  assumed,
558  static_cast<unsigned short>(min(static_cast<short int>(val),
559  static_cast<short int>(assumed))));
560  } while (assumed != old);
561 }
562 #else
563 __device__ void atomicMin16(int16_t* agg, const int16_t val) {
564  // properly align the input pointer:
565  unsigned int* base_address_u32 =
566  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
567 
568  unsigned int old_value = *base_address_u32;
569  unsigned int swap_value, compare_value;
570  do {
571  compare_value = old_value;
572  swap_value =
573  (reinterpret_cast<size_t>(agg) & 0x2)
574  ? static_cast<unsigned int>(min(static_cast<int16_t>(old_value >> 16), val))
575  << 16 |
576  (old_value & 0xFFFF)
577  : (old_value & 0xFFFF0000) |
578  static_cast<unsigned int>(
579  min(static_cast<int16_t>(old_value & 0xFFFF), val));
580  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
581  } while (old_value != compare_value);
582 }
583 #endif
584 
585 __device__ void atomicMin16SkipVal(int16_t* agg,
586  const int16_t val,
587  const int16_t skip_val) {
588  // properly align the input pointer:
589  unsigned int* base_address_u32 =
590  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
591 
592  unsigned int old_value = *base_address_u32;
593  unsigned int swap_value, compare_value;
594  do {
595  compare_value = old_value;
596  int16_t selected_old_val = (reinterpret_cast<size_t>(agg) & 0x2)
597  ? static_cast<int16_t>(old_value >> 16)
598  : static_cast<int16_t>(old_value & 0xFFFF);
599 
600  swap_value =
601  (reinterpret_cast<size_t>(agg) & 0x2)
602  ? static_cast<unsigned int>(
603  selected_old_val == skip_val ? val : min(selected_old_val, val))
604  << 16 |
605  (old_value & 0xFFFF)
606  : (old_value & 0xFFFF0000) |
607  static_cast<unsigned int>(
608  selected_old_val == skip_val ? val : min(selected_old_val, val));
609  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
610  } while (old_value != compare_value);
611 }
612 
613 __device__ void atomicMin8(int8_t* agg, const int8_t val) {
614  // properly align the input pointer:
615  unsigned int* base_address_u32 =
616  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
617 
618  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
619  unsigned int old_value = *base_address_u32;
620  unsigned int swap_value, compare_value;
621  do {
622  compare_value = old_value;
623  auto min_value = static_cast<unsigned int>(
624  min(val,
625  static_cast<int8_t>(__byte_perm(
626  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
627  swap_value = __byte_perm(
628  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
629  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
630  } while (compare_value != old_value);
631 }
632 
633 __device__ void atomicMin8SkipVal(int8_t* agg, const int8_t val, const int8_t skip_val) {
634  // properly align the input pointer:
635  unsigned int* base_address_u32 =
636  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
637 
638  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
639  unsigned int old_value = *base_address_u32;
640  unsigned int swap_value, compare_value;
641  do {
642  compare_value = old_value;
643  int8_t selected_old_val = static_cast<int8_t>(
644  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440));
645  auto min_value = static_cast<unsigned int>(
646  selected_old_val == skip_val ? val : min(val, selected_old_val));
647  swap_value = __byte_perm(
648  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
649  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
650  } while (compare_value != old_value);
651 }
652 
653 extern "C" __device__ void agg_max_int16_shared(int16_t* agg, const int16_t val) {
654  return atomicMax16(agg, val);
655 }
656 
657 extern "C" __device__ void agg_max_int8_shared(int8_t* agg, const int8_t val) {
658  return atomicMax8(agg, val);
659 }
660 
661 extern "C" __device__ void agg_min_int16_shared(int16_t* agg, const int16_t val) {
662  return atomicMin16(agg, val);
663 }
664 
665 extern "C" __device__ void agg_min_int8_shared(int8_t* agg, const int8_t val) {
666  return atomicMin8(agg, val);
667 }
668 
669 extern "C" __device__ void agg_min_double_shared(int64_t* agg, const double val) {
670  atomicMin(reinterpret_cast<double*>(agg), val);
671 }
672 
673 extern "C" __device__ void agg_min_float_shared(int32_t* agg, const float val) {
674  atomicMin(reinterpret_cast<float*>(agg), val);
675 }
676 
677 extern "C" __device__ void agg_id_shared(int64_t* agg, const int64_t val) {
678  *agg = val;
679 }
680 
681 extern "C" __device__ int32_t checked_single_agg_id_shared(int64_t* agg,
682  const int64_t val,
683  const int64_t null_val) {
684  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
685  unsigned long long int old = *address_as_ull, assumed;
686 
687  if (val == null_val) {
688  return 0;
689  }
690 
691  do {
692  if (static_cast<int64_t>(old) != null_val) {
693  if (static_cast<int64_t>(old) != val) {
694  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
695  return 15;
696  } else {
697  break;
698  }
699  }
700 
701  assumed = old;
702  old = atomicCAS(address_as_ull, assumed, val);
703  } while (assumed != old);
704 
705  return 0;
706 }
707 
708 #define DEF_AGG_ID_INT_SHARED(n) \
709  extern "C" __device__ void agg_id_int##n##_shared(int##n##_t* agg, \
710  const int##n##_t val) { \
711  *agg = val; \
712  }
713 
714 DEF_AGG_ID_INT_SHARED(32)
715 DEF_AGG_ID_INT_SHARED(16)
716 DEF_AGG_ID_INT_SHARED(8)
717 
718 #undef DEF_AGG_ID_INT_SHARED
719 
720 extern "C" __device__ void agg_id_double_shared(int64_t* agg, const double val) {
721  *agg = *(reinterpret_cast<const int64_t*>(&val));
722 }
723 
724 extern "C" __device__ int32_t checked_single_agg_id_double_shared(int64_t* agg,
725  const double val,
726  const double null_val) {
727  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
728  unsigned long long int old = *address_as_ull, assumed;
729 
730  if (val == null_val) {
731  return 0;
732  }
733 
734  do {
735  if (static_cast<int64_t>(old) != __double_as_longlong(null_val)) {
736  if (static_cast<int64_t>(old) != __double_as_longlong(val)) {
737  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
738  return 15;
739  } else {
740  break;
741  }
742  }
743 
744  assumed = old;
745  old = atomicCAS(address_as_ull, assumed, __double_as_longlong(val));
746  } while (assumed != old);
747 
748  return 0;
749 }
750 
751 extern "C" __device__ void agg_id_double_shared_slow(int64_t* agg, const double* val) {
752  *agg = *(reinterpret_cast<const int64_t*>(val));
753 }
754 
755 extern "C" __device__ int32_t
756 checked_single_agg_id_double_shared_slow(int64_t* agg,
757  const double* valp,
758  const double null_val) {
759  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
760  unsigned long long int old = *address_as_ull, assumed;
761  double val = *valp;
762 
763  if (val == null_val) {
764  return 0;
765  }
766 
767  do {
768  if (static_cast<int64_t>(old) != __double_as_longlong(null_val)) {
769  if (static_cast<int64_t>(old) != __double_as_longlong(val)) {
770  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
771  return 15;
772  } else {
773  break;
774  }
775  }
776 
777  assumed = old;
778  old = atomicCAS(address_as_ull, assumed, __double_as_longlong(val));
779  } while (assumed != old);
780 
781  return 0;
782 }
783 
784 extern "C" __device__ void agg_id_float_shared(int32_t* agg, const float val) {
785  *agg = __float_as_int(val);
786 }
787 
788 extern "C" __device__ int32_t checked_single_agg_id_float_shared(int32_t* agg,
789  const float val,
790  const float null_val) {
791  int* address_as_ull = reinterpret_cast<int*>(agg);
792  int old = *address_as_ull, assumed;
793 
794  if (val == null_val) {
795  return 0;
796  }
797 
798  do {
799  if (old != __float_as_int(null_val)) {
800  if (old != __float_as_int(val)) {
801  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
802  return 15;
803  } else {
804  break;
805  }
806  }
807 
808  assumed = old;
809  old = atomicCAS(address_as_ull, assumed, __float_as_int(val));
810  } while (assumed != old);
811 
812  return 0;
813 }
814 
815 #define DEF_SKIP_AGG(base_agg_func) \
816  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
817  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
818  if (val != skip_val) { \
819  return base_agg_func##_shared(agg, val); \
820  } \
821  return 0; \
822  }
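/*
 * For reference, a sketch of the preprocessor expansion (not code that is
 * compiled here): with DATA_T defined as int64_t and ADDR_T as uint64_t,
 * DEF_SKIP_AGG(agg_count) produces
 *
 *   extern "C" __device__ uint64_t agg_count_skip_val_shared(
 *       uint64_t* agg, const int64_t val, const int64_t skip_val) {
 *     if (val != skip_val) {
 *       return agg_count_shared(agg, val);
 *     }
 *     return 0;
 *   }
 */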
823 
824 #define DATA_T int64_t
825 #define ADDR_T uint64_t
826 DEF_SKIP_AGG(agg_count)
827 #undef DATA_T
828 #undef ADDR_T
829 
830 #define DATA_T int32_t
831 #define ADDR_T uint32_t
832 DEF_SKIP_AGG(agg_count_int32)
833 #undef DATA_T
834 #undef ADDR_T
835 
836 // Initial value for nullable column is INT32_MIN
837 extern "C" __device__ void agg_max_int32_skip_val_shared(int32_t* agg,
838  const int32_t val,
839  const int32_t skip_val) {
840  if (val != skip_val) {
841  agg_max_int32_shared(agg, val);
842  }
843 }
844 
845 extern "C" __device__ void agg_max_int16_skip_val_shared(int16_t* agg,
846  const int16_t val,
847  const int16_t skip_val) {
848  if (val != skip_val) {
849  agg_max_int16_shared(agg, val);
850  }
851 }
852 
853 extern "C" __device__ void agg_min_int16_skip_val_shared(int16_t* agg,
854  const int16_t val,
855  const int16_t skip_val) {
856  if (val != skip_val) {
857  atomicMin16SkipVal(agg, val, skip_val);
858  }
859 }
860 
861 extern "C" __device__ void agg_max_int8_skip_val_shared(int8_t* agg,
862  const int8_t val,
863  const int8_t skip_val) {
864  if (val != skip_val) {
865  agg_max_int8_shared(agg, val);
866  }
867 }
868 
869 extern "C" __device__ void agg_min_int8_skip_val_shared(int8_t* agg,
870  const int8_t val,
871  const int8_t skip_val) {
872  if (val != skip_val) {
873  atomicMin8SkipVal(agg, val, skip_val);
874  }
875 }
876 
877 __device__ int32_t atomicMin32SkipVal(int32_t* address,
878  int32_t val,
879  const int32_t skip_val) {
880  int32_t old = atomicExch(address, INT_MAX);
881  return atomicMin(address, old == skip_val ? val : min(old, val));
882 }
883 
884 extern "C" __device__ void agg_min_int32_skip_val_shared(int32_t* agg,
885  const int32_t val,
886  const int32_t skip_val) {
887  if (val != skip_val) {
888  atomicMin32SkipVal(agg, val, skip_val);
889  }
890 }
891 
892 __device__ int32_t atomicSum32SkipVal(int32_t* address,
893  const int32_t val,
894  const int32_t skip_val) {
895  unsigned int* address_as_int = (unsigned int*)address;
896  int32_t old = atomicExch(address_as_int, 0);
897  int32_t old2 = atomicAdd(address_as_int, old == skip_val ? val : (val + old));
898  return old == skip_val ? old2 : (old2 + old);
899 }
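/*
 * Worked reasoning for the exchange-then-add trick above (values
 * hypothetical): atomicExch grabs the current contents and leaves 0 behind.
 * If what it grabbed was the skip (NULL) sentinel, the following atomicAdd
 * deposits just val, silently dropping the sentinel; otherwise it deposits
 * val + old, restoring the partial sum the exchange removed. Updates from
 * other threads that land between the two operations simply accumulate on the
 * zeroed slot, so they are preserved either way, and the return value applies
 * the same correction to the value the atomicAdd observed.
 */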
900 
901 extern "C" __device__ int32_t agg_sum_int32_skip_val_shared(int32_t* agg,
902  const int32_t val,
903  const int32_t skip_val) {
904  if (val != skip_val) {
905  const int32_t old = atomicSum32SkipVal(agg, val, skip_val);
906  return old;
907  }
908  return 0;
909 }
910 
911 __device__ int64_t atomicSum64SkipVal(int64_t* address,
912  const int64_t val,
913  const int64_t skip_val) {
914  unsigned long long int* address_as_ull = (unsigned long long int*)address;
915  int64_t old = atomicExch(address_as_ull, 0);
916  int64_t old2 = atomicAdd(address_as_ull, old == skip_val ? val : (val + old));
917  return old == skip_val ? old2 : (old2 + old);
918 }
919 
920 extern "C" __device__ int64_t agg_sum_skip_val_shared(int64_t* agg,
921  const int64_t val,
922  const int64_t skip_val) {
923  if (val != skip_val) {
924  return atomicSum64SkipVal(agg, val, skip_val);
925  }
926  return 0;
927 }
928 
929 __device__ int64_t atomicMin64SkipVal(int64_t* address,
930  int64_t val,
931  const int64_t skip_val) {
932  unsigned long long int* address_as_ull =
933  reinterpret_cast<unsigned long long int*>(address);
934  unsigned long long int old = *address_as_ull, assumed;
935 
936  do {
937  assumed = old;
938  old = atomicCAS(address_as_ull,
939  assumed,
940  assumed == skip_val ? val : min((long long)val, (long long)assumed));
941  } while (assumed != old);
942 
943  return old;
944 }
945 
946 extern "C" __device__ void agg_min_skip_val_shared(int64_t* agg,
947  const int64_t val,
948  const int64_t skip_val) {
949  if (val != skip_val) {
950  atomicMin64SkipVal(agg, val, skip_val);
951  }
952 }
953 
954 __device__ int64_t atomicMax64SkipVal(int64_t* address,
955  int64_t val,
956  const int64_t skip_val) {
957  unsigned long long int* address_as_ull =
958  reinterpret_cast<unsigned long long int*>(address);
959  unsigned long long int old = *address_as_ull, assumed;
960 
961  do {
962  assumed = old;
963  old = atomicCAS(address_as_ull,
964  assumed,
965  assumed == skip_val ? val : max((long long)val, (long long)assumed));
966  } while (assumed != old);
967 
968  return old;
969 }
970 
971 extern "C" __device__ void agg_max_skip_val_shared(int64_t* agg,
972  const int64_t val,
973  const int64_t skip_val) {
974  if (val != skip_val) {
975  atomicMax64SkipVal(agg, val, skip_val);
976  }
977 }
978 
979 #undef DEF_SKIP_AGG
980 #define DEF_SKIP_AGG(base_agg_func) \
981  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
982  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
983  if (val != skip_val) { \
984  return base_agg_func##_shared(agg, val); \
985  } \
986  return *agg; \
987  }
988 
989 #define DATA_T double
990 #define ADDR_T uint64_t
991 DEF_SKIP_AGG(agg_count_double)
992 #undef ADDR_T
993 #undef DATA_T
994 
995 #define DATA_T float
996 #define ADDR_T uint32_t
997 DEF_SKIP_AGG(agg_count_float)
998 #undef ADDR_T
999 #undef DATA_T
1000 
1001 // Initial value for a nullable column is -FLT_MAX
1002 extern "C" __device__ void agg_max_float_skip_val_shared(int32_t* agg,
1003  const float val,
1004  const float skip_val) {
1005  if (__float_as_int(val) != __float_as_int(skip_val)) {
1006  float old = atomicExch(reinterpret_cast<float*>(agg), -FLT_MAX);
1007  atomicMax(reinterpret_cast<float*>(agg),
1008  __float_as_int(old) == __float_as_int(skip_val) ? val : fmaxf(old, val));
1009  }
1010 }
1011 
1012 __device__ float atomicMinFltSkipVal(int32_t* address, float val, const float skip_val) {
1013  float old = atomicExch(reinterpret_cast<float*>(address), FLT_MAX);
1014  return atomicMin(
1015  reinterpret_cast<float*>(address),
1016  __float_as_int(old) == __float_as_int(skip_val) ? val : fminf(old, val));
1017 }
1018 
1019 extern "C" __device__ void agg_min_float_skip_val_shared(int32_t* agg,
1020  const float val,
1021  const float skip_val) {
1022  if (__float_as_int(val) != __float_as_int(skip_val)) {
1023  atomicMinFltSkipVal(agg, val, skip_val);
1024  }
1025 }
1026 
1027 __device__ void atomicSumFltSkipVal(float* address,
1028  const float val,
1029  const float skip_val) {
1030  float old = atomicExch(address, 0.f);
1031  atomicAdd(address, __float_as_int(old) == __float_as_int(skip_val) ? val : (val + old));
1032 }
1033 
1034 extern "C" __device__ void agg_sum_float_skip_val_shared(int32_t* agg,
1035  const float val,
1036  const float skip_val) {
1037  if (__float_as_int(val) != __float_as_int(skip_val)) {
1038  atomicSumFltSkipVal(reinterpret_cast<float*>(agg), val, skip_val);
1039  }
1040 }
1041 
1042 __device__ void atomicSumDblSkipVal(double* address,
1043  const double val,
1044  const double skip_val) {
1045  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1046  double old = __longlong_as_double(atomicExch(address_as_ull, __double_as_longlong(0.)));
1047  atomicAdd(
1048  address,
1049  __double_as_longlong(old) == __double_as_longlong(skip_val) ? val : (val + old));
1050 }
1051 
1052 extern "C" __device__ void agg_sum_double_skip_val_shared(int64_t* agg,
1053  const double val,
1054  const double skip_val) {
1055  if (__double_as_longlong(val) != __double_as_longlong(skip_val)) {
1056  atomicSumDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1057  }
1058 }
1059 
1060 __device__ double atomicMinDblSkipVal(double* address,
1061  double val,
1062  const double skip_val) {
1063  unsigned long long int* address_as_ull =
1064  reinterpret_cast<unsigned long long int*>(address);
1065  unsigned long long int old = *address_as_ull;
1066  unsigned long long int skip_val_as_ull =
1067  *reinterpret_cast<const unsigned long long*>(&skip_val);
1068  unsigned long long int assumed;
1069 
1070  do {
1071  assumed = old;
1072  old = atomicCAS(address_as_ull,
1073  assumed,
1074  assumed == skip_val_as_ull
1075  ? *reinterpret_cast<unsigned long long*>(&val)
1076  : __double_as_longlong(min(val, __longlong_as_double(assumed))));
1077  } while (assumed != old);
1078 
1079  return __longlong_as_double(old);
1080 }
1081 
1082 extern "C" __device__ void agg_min_double_skip_val_shared(int64_t* agg,
1083  const double val,
1084  const double skip_val) {
1085  if (val != skip_val) {
1086  atomicMinDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1087  }
1088 }
1089 
1090 extern "C" __device__ void agg_max_double_skip_val_shared(int64_t* agg,
1091  const double val,
1092  const double skip_val) {
1093  if (__double_as_longlong(val) != __double_as_longlong(skip_val)) {
1094  double old = __longlong_as_double(atomicExch(
1095  reinterpret_cast<unsigned long long int*>(agg), __double_as_longlong(-DBL_MAX)));
1096  atomicMax(reinterpret_cast<double*>(agg),
1097  __double_as_longlong(old) == __double_as_longlong(skip_val)
1098  ? val
1099  : fmax(old, val));
1100  }
1101 }
1102 
1103 #undef DEF_SKIP_AGG
1104 
1105 extern "C" __device__ bool slotEmptyKeyCAS(int64_t* slot,
1106  int64_t new_val,
1107  int64_t init_val) {
1108  auto slot_address = reinterpret_cast<unsigned long long int*>(slot);
1109  const auto empty_key =
1110  static_cast<unsigned long long int*>(static_cast<void*>(&init_val));
1111  const auto new_val_cast =
1112  static_cast<unsigned long long int*>(static_cast<void*>(&new_val));
1113 
1114  const auto old_val = atomicCAS(slot_address, *empty_key, *new_val_cast);
1115  if (old_val == *empty_key) {
1116  return true;
1117  } else {
1118  return false;
1119  }
1120 }
1121 
1122 extern "C" __device__ bool slotEmptyKeyCAS_int32(int32_t* slot,
1123  int32_t new_val,
1124  int32_t init_val) {
1125  unsigned int* slot_address = reinterpret_cast<unsigned int*>(slot);
1126  unsigned int compare_value = static_cast<unsigned int>(init_val);
1127  unsigned int swap_value = static_cast<unsigned int>(new_val);
1128 
1129  const unsigned int old_value = atomicCAS(slot_address, compare_value, swap_value);
1130  return old_value == compare_value;
1131 }
1132 
1133 extern "C" __device__ bool slotEmptyKeyCAS_int16(int16_t* slot,
1134  int16_t new_val,
1135  int16_t init_val) {
1136  unsigned int* base_slot_address =
1137  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1138  unsigned int old_value = *base_slot_address;
1139  unsigned int swap_value, compare_value;
1140  do {
1141  compare_value = old_value;
1142  // exit criterion: if init_val no longer exists in the slot (some other thread
1143  // has succeeded)
1144  if (static_cast<unsigned int>(init_val) !=
1145  __byte_perm(
1146  compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x2 ? 0x3244 : 0x4410))) {
1147  return false;
1148  }
1149  swap_value = __byte_perm(compare_value,
1150  static_cast<unsigned int>(new_val),
1151  (reinterpret_cast<size_t>(slot) & 0x2) ? 0x5410 : 0x3254);
1152  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1153  } while (compare_value != old_value);
1154  return true;
1155 }
1156 
1157 extern "C" __device__ bool slotEmptyKeyCAS_int8(int8_t* slot,
1158  int8_t new_val,
1159  int8_t init_val) {
1160  // properly align the slot address:
1161  unsigned int* base_slot_address =
1162  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1163  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
1164  unsigned int old_value = *base_slot_address;
1165  unsigned int swap_value, compare_value;
1166  do {
1167  compare_value = old_value;
1168  // exit criterion: if init_val no longer exists in the slot (some other thread
1169  // has succeeded)
1170  if (static_cast<unsigned int>(init_val) !=
1171  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x3) | 0x4440)) {
1172  return false;
1173  }
1174  swap_value = __byte_perm(compare_value,
1175  static_cast<unsigned int>(new_val),
1176  byte_permutations[reinterpret_cast<size_t>(slot) & 0x3]);
1177  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1178  } while (compare_value != old_value);
1179  return true;
1180 }
1181 
1182 #include "../Utils/ChunkIter.cpp"
1183 #include "DateTruncate.cpp"
1184 #include "ExtractFromTime.cpp"
1185 #define EXECUTE_INCLUDE
1186 #include "ArrayOps.cpp"
1187 #include "DateAdd.cpp"
1188 #include "GeoOps.cpp"
1189 #include "StringFunctions.cpp"
1190 #undef EXECUTE_INCLUDE
1191 #include "../Utils/Regexp.cpp"
1192 #include "../Utils/StringLike.cpp"
1193 
1194 extern "C" __device__ uint64_t string_decode(int8_t* chunk_iter_, int64_t pos) {
1195  // TODO(alex): de-dup, the x64 version is basically identical
1196  ChunkIter* chunk_iter = reinterpret_cast<ChunkIter*>(chunk_iter_);
1197  VarlenDatum vd;
1198  bool is_end;
1199  ChunkIter_get_nth(chunk_iter, pos, false, &vd, &is_end);
1200  return vd.is_null ? 0
1201  : (reinterpret_cast<uint64_t>(vd.pointer) & 0xffffffffffff) |
1202  (static_cast<uint64_t>(vd.length) << 48);
1203 }
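/*
 * Sketch of how the packed value returned above would be consumed (helper
 * names are hypothetical): the payload pointer lives in the low 48 bits, the
 * string length in the high 16 bits, and 0 stands for NULL.
 */
__device__ const char* example_string_ptr(const uint64_t packed) {
  return reinterpret_cast<const char*>(packed & 0xffffffffffffULL);
}

__device__ uint32_t example_string_len(const uint64_t packed) {
  return static_cast<uint32_t>(packed >> 48);
}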
1204 
1205 extern "C" __device__ void linear_probabilistic_count(uint8_t* bitmap,
1206  const uint32_t bitmap_bytes,
1207  const uint8_t* key_bytes,
1208  const uint32_t key_len) {
1209  const uint32_t bit_pos = MurmurHash1(key_bytes, key_len, 0) % (bitmap_bytes * 8);
1210  const uint32_t word_idx = bit_pos / 32;
1211  const uint32_t bit_idx = bit_pos % 32;
1212  atomicOr(((uint32_t*)bitmap) + word_idx, 1 << bit_idx);
1213 }
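/*
 * Worked example for the bit addressing above (numbers hypothetical): with
 * bitmap_bytes = 1024 the bitmap holds 8192 bits. A key hashing to bit_pos =
 * 5000 gives word_idx = 156 and bit_idx = 8, so the atomicOr sets bit 8 of the
 * 32-bit word at index 156.
 */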
1214 
1215 extern "C" __device__ void agg_count_distinct_bitmap_gpu(int64_t* agg,
1216  const int64_t val,
1217  const int64_t min_val,
1218  const int64_t base_dev_addr,
1219  const int64_t base_host_addr,
1220  const uint64_t sub_bitmap_count,
1221  const uint64_t bitmap_bytes) {
1222  const uint64_t bitmap_idx = val - min_val;
1223  const uint32_t byte_idx = bitmap_idx >> 3;
1224  const uint32_t word_idx = byte_idx >> 2;
1225  const uint32_t byte_word_idx = byte_idx & 3;
1226  const int64_t host_addr = *agg;
1227  uint32_t* bitmap = (uint32_t*)(base_dev_addr + host_addr - base_host_addr +
1228  (threadIdx.x & (sub_bitmap_count - 1)) * bitmap_bytes);
1229  switch (byte_word_idx) {
1230  case 0:
1231  atomicOr(&bitmap[word_idx], 1 << (bitmap_idx & 7));
1232  break;
1233  case 1:
1234  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 8));
1235  break;
1236  case 2:
1237  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 16));
1238  break;
1239  case 3:
1240  atomicOr(&bitmap[word_idx], 1 << ((bitmap_idx & 7) + 24));
1241  break;
1242  default:
1243  break;
1244  }
1245 }
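/*
 * Worked example for the indexing above (numbers hypothetical): for
 * val - min_val = 13 we get byte_idx = 1, word_idx = 0 and byte_word_idx = 1,
 * so the switch ORs 1 << ((13 & 7) + 8) = 1 << 13 into word 0, i.e. bit 5 of
 * byte 1, which is bit 13 of the little-endian bitmap. Each thread also picks
 * one of sub_bitmap_count private sub-bitmaps (threadIdx.x masked with
 * sub_bitmap_count - 1, which assumes a power of two) to spread out atomic
 * traffic; the sub-bitmaps are combined later when the distinct count is
 * actually computed.
 */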
1246 
1247 extern "C" __device__ void agg_count_distinct_bitmap_skip_val_gpu(
1248  int64_t* agg,
1249  const int64_t val,
1250  const int64_t min_val,
1251  const int64_t skip_val,
1252  const int64_t base_dev_addr,
1253  const int64_t base_host_addr,
1254  const uint64_t sub_bitmap_count,
1255  const uint64_t bitmap_bytes) {
1256  if (val != skip_val) {
1257  agg_count_distinct_bitmap_gpu(
1258  agg, val, min_val, base_dev_addr, base_host_addr, sub_bitmap_count, bitmap_bytes);
1259  }
1260 }
1261 
1262 extern "C" __device__ void agg_approximate_count_distinct_gpu(
1263  int64_t* agg,
1264  const int64_t key,
1265  const uint32_t b,
1266  const int64_t base_dev_addr,
1267  const int64_t base_host_addr) {
1268  const uint64_t hash = MurmurHash64A(&key, sizeof(key), 0);
1269  const uint32_t index = hash >> (64 - b);
1270  const int32_t rank = get_rank(hash << b, 64 - b);
1271  const int64_t host_addr = *agg;
1272  int32_t* M = (int32_t*)(base_dev_addr + host_addr - base_host_addr);
1273  atomicMax(&M[index], rank);
1274 }
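/*
 * Worked example for the HyperLogLog update above (numbers hypothetical): with
 * b = 11 the top 11 bits of the 64-bit MurmurHash select one of 2048 registers
 * and get_rank(hash << b, 64 - b) is the 1-based position of the leftmost set
 * bit in the remaining 53 bits. atomicMax keeps the largest rank observed per
 * register; the registers are combined into the cardinality estimate when the
 * result is reduced.
 */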
1275 
1276 extern "C" __device__ void force_sync() {
1277  __threadfence_block();
1278 }
1279 
1280 extern "C" __device__ void sync_warp() {
1281  __syncwarp();
1282 }
1283 
1291 extern "C" __device__ void sync_warp_protected(int64_t thread_pos, int64_t row_count) {
1292  // only syncing if NOT within the same warp as those threads experiencing the critical
1293  // edge
1294  if ((((row_count - 1) | 0x1F) - thread_pos) >= 32) {
1295  __syncwarp();
1296  }
1297 }
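/*
 * Worked example for the guard above (numbers hypothetical): with
 * row_count = 70, ((row_count - 1) | 0x1F) is 95, the last lane position of
 * the warp that contains the final row. Threads with thread_pos <= 63 see a
 * difference of at least 32 and synchronize normally, while threads 64..95
 * share a warp with lanes that already exited at the row boundary, so they
 * skip __syncwarp() instead of waiting on lanes that will never arrive.
 */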
1298 
1299 extern "C" __device__ void sync_threadblock() {
1300  __syncthreads();
1301 }
1302 
1303 /*
1304  * Currently, we just use this function for handling non-grouped aggregates
1305  * with COUNT queries (with GPU shared memory used). Later, we should generate code for
1306  * this depending on the type of aggregate functions.
1307  * TODO: we should use one contiguous global memory buffer, rather than the current
1308  * default behaviour of multiple buffers, each for one aggregate. Once that's resolved,
1309  * this can be done much more cleanly.
1310  */
1311 extern "C" __device__ void write_back_non_grouped_agg(int64_t* input_buffer,
1312  int64_t* output_buffer,
1313  const int32_t agg_idx) {
1314  if (threadIdx.x == agg_idx) {
1315  agg_sum_shared(output_buffer, input_buffer[agg_idx]);
1316  }
1317 }