OmniSciDB  5ade3759e0
ProfileUtils.cu
1 /**
2  * @file ProfileUtils.cu
3  * @author Minggang Yu <miyu@mapd.com>
4  * @brief Device-side utilities for the group-by microbenchmark (see ProfileTest.h).
5  *
6  * Copyright (c) 2016 MapD Technologies, Inc. All rights reserved.
7  */
8 #include "ProfileTest.h"
9 
10 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
11 #include <stdio.h>
12 #include <thrust/device_ptr.h>
13 #include <thrust/device_vector.h>
14 #include <thrust/execution_policy.h>
15 #include <thrust/random.h>
16 
17 #ifdef __clang__
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wunused-function"
20 #else
21 #pragma GCC diagnostic push
22 #pragma GCC diagnostic ignored "-Wunused-function"
23 #endif
24 #include "../QueryEngine/cuda_mapd_rt.cu"
25 #ifdef __clang__
26 #pragma clang diagnostic pop
27 #else
28 #pragma GCC diagnostic pop
29 #endif
30 
31 namespace {
32 // Number of threads to put in a thread block.
33 const unsigned c_block_size = 512;
34 
35 // Maximum number of blocks along the x-axis of the grid; compute_grid_dim spills any excess into the y-axis.
36 const unsigned c_grid_size = 16384;
37 
38 dim3 compute_grid_dim(unsigned n) {
39  dim3 grid((n + c_block_size - 1) / c_block_size);
40  if (grid.x > c_grid_size) {
41  grid.y = (grid.x + c_grid_size - 1) / c_grid_size;
42  grid.x = c_grid_size;
43  }
44  return grid;
45 }
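// Example: with c_block_size = 512 and c_grid_size = 16384, a request for
// n = 10,000,000 threads gives ceil(n / 512) = 19,532 blocks, which exceeds
// 16,384, so compute_grid_dim returns dim3(16384, 2). The kernels below then
// recover a flat thread id as
//   threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x
// and return early once that id reaches the requested element count.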
46 
47 template <typename KeyT = int64_t>
48 __device__ inline bool is_empty_slot(const KeyT k) {
49  static_assert(thrust::detail::is_same<KeyT, int64_t>::value,
50  "Unsupported template parameter other than int64_t for now");
51  return k == EMPTY_KEY_64;
52 }
53 
54 template <typename KeyT = int64_t>
55 __device__ inline void reset_entry(KeyT* entry_ptr) {
56  static_assert(thrust::detail::is_same<KeyT, int64_t>::value,
57  "Unsupported template parameter other than int64_t for now");
58  *entry_ptr = static_cast<KeyT>(EMPTY_KEY_64);
59 }
60 
61 template <typename T = int64_t>
62 struct DeviceIntGenerator {
63  static_assert(thrust::detail::is_integral<T>::value, "Template type is not integral");
64  DeviceIntGenerator(int8_t* ptr, size_t gap, T min, T max, T seed = 0)
65  : buff_ptr(ptr), stride(gap), engine(seed), uni_dist(min, max) {}
66 
67  __device__ void operator()(const int index) {
68  engine.discard(index);
69  *reinterpret_cast<T*>(buff_ptr + index * stride) = uni_dist(engine);
70  }
71 
72  int8_t* buff_ptr;
73  size_t stride;
74  thrust::default_random_engine engine;
75  thrust::uniform_int_distribution<T> uni_dist;
76 };
77 
78 template <typename T = int64_t>
79 bool generate_numbers(int8_t* random_numbers,
80  const size_t num_random_numbers,
81  const T min_number,
82  const T max_number,
83  const DIST_KIND dist,
84  const size_t stride = 1) {
85  if (dist != DIST_KIND::UNI) {
86  return false;
87  }
88  static T seed = 0;
89  thrust::for_each(
90  thrust::make_counting_iterator(size_t(0)),
91  thrust::make_counting_iterator(num_random_numbers),
92  DeviceIntGenerator<T>(random_numbers, stride, min_number, max_number, seed++));
93  return true;
94 }
95 
96 } // namespace
97 
98 bool generate_columns_on_device(int8_t* buffers,
99  const size_t row_count,
100  const size_t col_count,
101  const std::vector<size_t>& col_widths,
102  const std::vector<std::pair<int64_t, int64_t>>& ranges,
103  const bool is_columnar,
104  const std::vector<DIST_KIND>& dists) {
105  if (buffers == nullptr) {
106  return false;
107  }
108  CHECK_EQ(col_widths.size(), col_count);
109  CHECK_EQ(ranges.size(), col_count);
110  size_t row_size = 0;
111  for (auto& wid : col_widths) {
112  row_size += wid;
113  }
114  for (size_t i = 0; i < col_count;
115  buffers += (is_columnar ? row_count : 1) * col_widths[i++]) {
116  if (dists[i] == DIST_KIND::INVALID) {
117  continue;
118  }
119  CHECK(ranges[i].first <= ranges[i].second);
120  switch (col_widths[i]) {
121  case 4:
122  if (!generate_numbers(buffers,
123  row_count,
124  static_cast<int32_t>(ranges[i].first),
125  static_cast<int32_t>(ranges[i].second),
126  dists[i],
127  (is_columnar ? 4 : row_size))) {
128  return false;
129  }
130  break;
131  case 8:
132  if (!generate_numbers(buffers,
133  row_count,
134  ranges[i].first,
135  ranges[i].second,
136  dists[i],
137  (is_columnar ? 8 : row_size))) {
138  return false;
139  }
140  break;
141  default:
142  CHECK(false);
143  }
144  }
145  return true;
146 }
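// Illustrative call (a sketch with a hypothetical device buffer, not an excerpt
// from the benchmark): filling `dev_buf` of row_count * (8 + 4) bytes with one
// int64 column in [0, 999] and one int32 column in [0, 99], columnar layout:
//   std::vector<size_t> widths{8, 4};
//   std::vector<std::pair<int64_t, int64_t>> ranges{{0, 999}, {0, 99}};
//   std::vector<DIST_KIND> dists{DIST_KIND::UNI, DIST_KIND::UNI};
//   generate_columns_on_device(dev_buf, row_count, 2, widths, ranges,
//                              /*is_columnar=*/true, dists);
// For row-wise layout the same call is made with is_columnar = false; the
// stride passed down to generate_numbers then becomes the full row size.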
147 
148 namespace {
149 
150 template <bool isColumnar = true>
151 __global__ void init_group(int8_t* groups,
152  const size_t group_count,
153  const size_t col_count,
154  const size_t* col_widths,
155  const size_t* init_vals) {
156  const auto thread_index =
157  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
158  if (thread_index >= group_count) {
159  return;
160  }
161  for (size_t i = 0; i < col_count; groups += col_widths[i++] * group_count) {
162  switch (col_widths[i]) {
163  case 4:
164  reinterpret_cast<uint32_t*>(groups)[thread_index] =
165  *reinterpret_cast<const uint32_t*>(init_vals + i);
166  break;
167  case 8:
168  reinterpret_cast<size_t*>(groups)[thread_index] = init_vals[i];
169  break;
170  default:;
171  }
172  }
173 }
174 
175 template <>
176 __global__ void init_group<false>(int8_t* groups,
177  const size_t group_count,
178  const size_t col_count,
179  const size_t* col_widths,
180  const size_t* init_vals) {
181  const auto thread_index =
182  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
183  if (thread_index >= group_count) {
184  return;
185  }
186  size_t row_size = 0;
187  for (size_t i = 0; i < col_count; ++i) {
188  row_size += col_widths[i];
189  }
190  int8_t* group_base = groups + row_size * thread_index;
191  for (size_t i = 0; i < col_count; group_base += col_widths[i++]) {
192  switch (col_widths[i]) {
193  case 4:
194  *reinterpret_cast<uint32_t*>(group_base) =
195  *reinterpret_cast<const uint32_t*>(init_vals + i);
196  break;
197  case 8:
198  *reinterpret_cast<size_t*>(group_base) = init_vals[i];
199  break;
200  default:;
201  }
202  }
203 }
204 
205 } // namespace
206 
207 void init_groups_on_device(int8_t* groups,
208  const size_t group_count,
209  const size_t col_count,
210  const std::vector<size_t>& col_widths,
211  const std::vector<size_t>& init_vals,
212  const bool is_columnar) {
213  thrust::device_vector<size_t> dev_col_widths(col_widths);
214  thrust::device_vector<size_t> dev_init_vals(init_vals);
215  if (is_columnar) {
216  init_group<true><<<compute_grid_dim(group_count), c_block_size>>>(
217  groups,
218  group_count,
219  col_count,
220  thrust::raw_pointer_cast(dev_col_widths.data()),
221  thrust::raw_pointer_cast(dev_init_vals.data()));
222  } else {
223  init_group<false><<<compute_grid_dim(group_count), c_block_size>>>(
224  groups,
225  group_count,
226  col_count,
227  thrust::raw_pointer_cast(dev_col_widths.data()),
228  thrust::raw_pointer_cast(dev_init_vals.data()));
229  }
230 }
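// Note: init_vals supplies one 64-bit pattern per column; for 4-byte columns
// the kernels above store only the first four bytes of that pattern (the low
// half on a little-endian GPU). Key columns are typically seeded with
// EMPTY_KEY_64 so that unwritten slots read back as empty.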
231 
232 #ifdef TRY_COLUMNAR
233 namespace {
234 __global__ void columnarize_groups(int8_t* columnar_buffer,
235  const int8_t* rowwise_buffer,
236  const size_t row_count,
237  const size_t col_count,
238  const size_t* col_widths,
239  const size_t row_size) {
240  const auto thread_index =
241  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
242  if (thread_index >= row_count) {
243  return;
244  }
245  auto read_ptr = rowwise_buffer + thread_index * row_size;
246  auto col_base = columnar_buffer;
247  for (size_t i = 0; i < col_count; ++i) {
248  switch (col_widths[i]) {
249  case 8: {
250  int64_t* write_ptr = reinterpret_cast<int64_t*>(col_base) + thread_index;
251  *write_ptr = *reinterpret_cast<const int64_t*>(read_ptr);
252  break;
253  }
254  case 4: {
255  int32_t* write_ptr = reinterpret_cast<int32_t*>(col_base) + thread_index;
256  *write_ptr = *reinterpret_cast<const int32_t*>(read_ptr);
257  break;
258  }
259  default:;
260  }
261  col_base += col_widths[i] * row_count;
262  read_ptr += col_widths[i]; // WARN(miyu): No padding!!
263  }
264 }
265 } // namespace
266 
267 void columnarize_groups_on_device(int8_t* columnar_buffer,
268  const int8_t* rowwise_buffer,
269  const size_t row_count,
270  const std::vector<size_t>& col_widths) {
271  size_t row_size = 0;
272  for (size_t i = 0; i < col_widths.size(); ++i) {
273  row_size += col_widths[i];
274  }
275  thrust::device_vector<size_t> dev_col_widths(col_widths);
276  columnarize_groups<<<compute_grid_dim(row_count), c_block_size>>>(
277  columnar_buffer,
278  rowwise_buffer,
279  row_count,
280  col_widths.size(),
281  thrust::raw_pointer_cast(dev_col_widths.data()),
282  row_size);
283 }
284 #endif
285 
286 namespace {
287 
288 __device__ inline void row_func(int8_t* write_base,
289  const size_t write_stride,
290  const int8_t* read_base,
291  const size_t read_stride,
292  const size_t val_count,
293  const size_t* val_widths,
294  const OP_KIND* agg_ops) {
295  for (size_t i = 0; i < val_count; ++i) {
296  switch (val_widths[i]) {
297  case 4: {
298  auto write_ptr = reinterpret_cast<int32_t*>(write_base);
299  const auto value = *reinterpret_cast<const int32_t*>(read_base);
300  switch (agg_ops[i]) {
301  case OP_COUNT:
302  agg_count_int32_shared(reinterpret_cast<uint32_t*>(write_ptr), value);
303  break;
304  case OP_SUM:
305  agg_sum_int32_shared(write_ptr, value);
306  break;
307  case OP_MIN:
308  agg_min_int32_shared(write_ptr, value);
309  break;
310  case OP_MAX:
311  agg_max_int32_shared(write_ptr, value);
312  break;
313  default:;
314  }
315  break;
316  }
317  case 8: {
318  auto write_ptr = reinterpret_cast<int64_t*>(write_base);
319  const auto value = *reinterpret_cast<const int64_t*>(read_base);
320  switch (agg_ops[i]) {
321  case OP_COUNT:
322  agg_count_shared(reinterpret_cast<uint64_t*>(write_ptr), value);
323  break;
324  case OP_SUM:
325  agg_sum_shared(write_ptr, value);
326  break;
327  case OP_MIN:
328  agg_min_shared(write_ptr, value);
329  break;
330  case OP_MAX:
331  agg_max_shared(write_ptr, value);
332  break;
333  default:;
334  }
335  break;
336  }
337  default:;
338  }
339  write_base += val_widths[i] * write_stride;
340  read_base += val_widths[i] * read_stride;
341  }
342 }
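// Note: write_stride and read_stride are counted in entries, not bytes; each
// iteration advances the pointers by val_widths[i] * stride bytes. The
// row-wise runners pass 1 for both strides, while the columnar runner passes
// group_count / row_count so successive aggregates land in the next column
// slot. The agg_*_shared primitives come from the included cuda_mapd_rt.cu.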
343 
344 __device__ inline uint64_t rotl(uint64_t x, int8_t r) {
345  return (x << r) | (x >> (64 - r));
346 }
347 
348 __device__ inline uint64_t fmix(uint64_t k) {
349  k ^= k >> 33;
350  k *= 0xff51afd7ed558ccdULL;
351  k ^= k >> 33;
352  k *= 0xc4ceb9fe1a85ec53ULL;
353  k ^= k >> 33;
354 
355  return k;
356 }
357 
358 __device__ inline uint64_t murmur_hash3(const int64_t* key,
359  const size_t key_count,
360  const size_t qw_stride = 1) {
361  if (key_count == 1) {
362  return key[0];
363  }
364  uint64_t h = 0;
365  const uint64_t c1 = 0x87c37b91114253d5ULL;
366  const uint64_t c2 = 0x4cf5ad432745937fULL;
367 
368  for (int i = 0; i < key_count; i++) {
369  uint64_t k = static_cast<uint64_t>(key[i * qw_stride]);
370 
371  k *= c1;
372  k = rotl(k, 31);
373  k *= c2;
374  h ^= k;
375  h = rotl(h, 27);
376  h = h * 5 + 0x52dce729;
377  }
378 
379  h ^= key_count * sizeof(int64_t);
380  h = fmix(h);
381 
382  return h;
383 }
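// Note: this is a trimmed-down MurmurHash3-style mix that consumes whole
// 64-bit lanes (keys here are always multiples of 8 bytes, so no tail handling
// is needed) and applies the standard fmix finalizer; single-column keys skip
// hashing entirely and are used as-is.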
384 
385 __device__ inline uint64_t key_hash_strided(const int64_t* key,
386  const size_t key_count,
387  const size_t qw_stride = 1) {
388  return murmur_hash3(key, key_count, qw_stride);
389 }
390 
391 __device__ int64_t* get_matching_group_value_strided(int64_t* groups_buffer,
392  const size_t groups_count,
393  const uint64_t h,
394  const int64_t* key,
395  const size_t key_count,
396  const size_t key_qw_stride = 1) {
397  const auto off = h;
398  const auto gb_qw_stride = groups_count;
399  {
400  const uint64_t old = atomicCAS(
401  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, *key);
402  if (EMPTY_KEY_64 == old) {
403  for (size_t i = 1; i < key_count; ++i) {
404  atomicExch(
405  reinterpret_cast<unsigned long long*>(groups_buffer + i * gb_qw_stride + off),
406  key[i * key_qw_stride]);
407  }
408  }
409  }
410  if (key_count > 1) {
411  while (atomicAdd(reinterpret_cast<unsigned long long*>(
412  groups_buffer + (key_count - 1) * gb_qw_stride + off),
413  0) == EMPTY_KEY_64) {
414  // spin until the winning thread has finished writing the entire key
415  // (no init values are written in this variant)
416  }
417  }
418  bool match = true;
419  for (uint32_t i = 0; i < key_count; ++i) {
420  if (groups_buffer[off + i * gb_qw_stride] != key[i * key_qw_stride]) {
421  match = false;
422  break;
423  }
424  }
425  return match ? groups_buffer + key_count * gb_qw_stride + off : NULL;
426 }
427 
428 __device__ int64_t* get_group_value_columnar(int64_t* groups_buffer,
429  const size_t groups_count,
430  const int64_t* key,
431  const size_t key_count,
432  const size_t key_qw_stride) {
433  const auto h = key_hash_strided(key, key_count, key_qw_stride) % groups_count;
434  auto matching_group = get_matching_group_value_strided(
435  groups_buffer, groups_count, h, key, key_count, key_qw_stride);
436  if (matching_group) {
437  return matching_group;
438  }
439  auto h_probe = (h + 1) % groups_count;
440  while (h_probe != h) {
441  matching_group = get_matching_group_value_strided(
442  groups_buffer, groups_count, h_probe, key, key_count, key_qw_stride);
443  if (matching_group) {
444  return matching_group;
445  }
446  h_probe = (h_probe + 1) % groups_count;
447  }
448  return NULL;
449 }
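// Layout assumed by the two routines above: the columnar group buffer stores
// key column i of slot `off` at groups_buffer[i * groups_count + off], and the
// aggregate payload for that slot starts at key_count * groups_count + off.
// Collisions are resolved by linear probing from h = hash % groups_count,
// wrapping around and returning NULL only if every slot is taken.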
450 
451 template <typename KeyT = int64_t, typename ValT = int64_t>
452 __global__ void column_runner(int8_t* groups_buffer,
453  const size_t group_count,
454  const int8_t* row_buffer,
455  const size_t row_size,
456  const size_t row_count,
457  const size_t key_count,
458  const size_t val_count,
459  const size_t* val_widths,
460  const OP_KIND* agg_ops) {
461  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
462  thrust::detail::is_same<ValT, int64_t>::value,
463  "Unsupported template parameter other than int64_t for now");
464  const auto thread_index =
465  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
466  if (thread_index >= row_count) {
467  return;
468  }
469  auto keys_base = row_buffer + sizeof(KeyT) * thread_index;
470  auto read_base =
471  row_buffer + sizeof(KeyT) * row_count * key_count + sizeof(ValT) * thread_index;
472  auto write_base = reinterpret_cast<int8_t*>(
473  get_group_value_columnar(reinterpret_cast<int64_t*>(groups_buffer),
474  group_count,
475  reinterpret_cast<const int64_t*>(keys_base),
476  key_count,
477  row_count));
478  if (write_base) {
479  row_func(
480  write_base, group_count, read_base, row_count, val_count, val_widths, agg_ops);
481  }
482 }
483 
484 template <typename KeyT = int64_t, typename ValT = int64_t>
485 __global__ void row_runner(int8_t* groups_buffer,
486  const size_t group_count,
487  const int8_t* row_buffer,
488  const size_t row_size,
489  const size_t row_count,
490  const size_t key_count,
491  const size_t val_count,
492  const size_t* val_widths,
493  const OP_KIND* agg_ops) {
494  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
495  thrust::detail::is_same<ValT, int64_t>::value,
496  "Unsupported template parameter other than int64_t for now");
497  const auto thread_index =
498  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
499  if (thread_index >= row_count) {
500  return;
501  }
502  auto keys_base = row_buffer + row_size * thread_index;
503  auto read_base = keys_base + sizeof(KeyT) * key_count;
504  auto write_base = reinterpret_cast<int8_t*>(
505  get_group_value(reinterpret_cast<int64_t*>(groups_buffer),
506  static_cast<uint32_t>(group_count),
507  reinterpret_cast<const int64_t*>(keys_base),
508  static_cast<uint32_t>(key_count),
509  static_cast<uint32_t>(row_size / sizeof(int64_t)),
510  NULL));
511  if (write_base) {
512  row_func(write_base, 1, read_base, 1, val_count, val_widths, agg_ops);
513  }
514 }
515 
516 } // namespace
517 
518 void run_query_on_device(int8_t* groups_buffer,
519  const size_t group_count,
520  const int8_t* row_buffer,
521  const size_t row_count,
522  const size_t key_count,
523  const size_t val_count,
524  const std::vector<size_t>& col_widths,
525  const std::vector<OP_KIND>& agg_ops,
526  const bool is_columnar) {
527  CHECK_EQ(val_count, agg_ops.size());
528  CHECK_EQ(key_count + val_count, col_widths.size());
529  size_t row_size = 0;
530  for (size_t i = 0; i < col_widths.size(); ++i) {
531  row_size += col_widths[i];
532  }
533  thrust::device_vector<size_t> dev_col_widths(col_widths);
534  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
535  if (is_columnar) {
536  column_runner<<<compute_grid_dim(row_count), c_block_size>>>(
537  groups_buffer,
538  group_count,
539  row_buffer,
540  row_size,
541  row_count,
542  key_count,
543  val_count,
544  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
545  thrust::raw_pointer_cast(dev_agg_ops.data()));
546  } else {
547  row_runner<<<compute_grid_dim(row_count), c_block_size>>>(
548  groups_buffer,
549  group_count,
550  row_buffer,
551  row_size,
552  row_count,
553  key_count,
554  val_count,
555  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
556  thrust::raw_pointer_cast(dev_agg_ops.data()));
557  }
558 }
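// Illustrative host-side use (a sketch with hypothetical buffers and sizes,
// not a verbatim excerpt from ProfileTest): a row-wise query with one int64
// key and one int64 SUM aggregate might look like
//   std::vector<size_t> widths{8, 8};
//   std::vector<size_t> inits{static_cast<size_t>(EMPTY_KEY_64), 0};
//   std::vector<OP_KIND> ops{OP_SUM};
//   init_groups_on_device(dev_groups, group_count, 2, widths, inits, false);
//   run_query_on_device(dev_groups, group_count, dev_rows, row_count,
//                       /*key_count=*/1, /*val_count=*/1, widths, ops,
//                       /*is_columnar=*/false);
// where dev_groups and dev_rows are device allocations of group_count * 16 and
// row_count * 16 bytes, respectively.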
559 
560 #if defined(TRY_MASH) || defined(TRY_MASH_COLUMNAR)
561 namespace {
562 
563 __device__ int64_t* mash_get_matching_group_value(int64_t* groups_buffer,
564  const uint32_t h,
565  const int64_t* key,
566  const uint32_t key_qw_count,
567  const uint32_t entry_size_quad,
568  const int64_t* init_vals) {
569 #ifdef SAVE_MASH_BUF
570  const uint32_t keys_size_quad = 1;
571 #else
572  const auto keys_size_quad = key_qw_count;
573 #endif
574  const uint32_t off = h * entry_size_quad;
575  const uint64_t value = key_qw_count == 1 ? key[0] : reinterpret_cast<uint64_t>(key);
576  const uint64_t old = atomicCAS(
577  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, value);
578  if (EMPTY_KEY_64 == old) {
579  return groups_buffer + off + keys_size_quad;
580  }
581  if (key_qw_count == 1) {
582  return groups_buffer[off] == static_cast<int64_t>(value)
583  ? groups_buffer + off + keys_size_quad
584  : NULL;
585  }
586  bool match = true;
587  const auto curr_key = reinterpret_cast<int64_t*>(groups_buffer[off]);
588  for (uint32_t i = 0; i < key_qw_count; ++i) {
589  if (curr_key[i] != key[i]) {
590  match = false;
591  break;
592  }
593  }
594  return match ? groups_buffer + off + keys_size_quad : NULL;
595 }
596 
597 __device__ int64_t* mash_get_group_value(int64_t* groups_buffer,
598  const uint32_t groups_buffer_entry_count,
599  const int64_t* key,
600  const uint32_t key_qw_count,
601  const uint32_t entry_size_quad,
602  const int64_t* init_vals) {
603  const uint32_t h = key_hash(key, key_qw_count) % groups_buffer_entry_count;
604  int64_t* matching_group = mash_get_matching_group_value(
605  groups_buffer, h, key, key_qw_count, entry_size_quad, init_vals);
606  if (matching_group) {
607  return matching_group;
608  }
609  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
610  while (h_probe != h) {
611  matching_group = mash_get_matching_group_value(
612  groups_buffer, h_probe, key, key_qw_count, entry_size_quad, init_vals);
613  if (matching_group) {
614  return matching_group;
615  }
616  h_probe = (h_probe + 1) % groups_buffer_entry_count;
617  }
618  return NULL;
619 }
620 
621 __device__ int64_t* mash_get_matching_group_value_strided(
622  int64_t* groups_buffer,
623  const size_t groups_count,
624  const uint64_t h,
625  const int64_t* key,
626  const size_t key_count,
627  const size_t key_qw_stride = 1) {
628 #ifdef SAVE_MASH_BUF
629  const uint32_t actual_key_count = 1;
630 #else
631  const auto actual_key_count = key_count;
632 #endif
633  const auto off = h;
634  const auto gb_qw_stride = groups_count;
635  const uint64_t value = key_count == 1 ? key[0] : reinterpret_cast<uint64_t>(key);
636  const uint64_t old = atomicCAS(
637  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, value);
638  if (EMPTY_KEY_64 == old) {
639  return groups_buffer + actual_key_count * gb_qw_stride + off;
640  }
641  if (key_count == 1) {
642  return groups_buffer[off] == static_cast<int64_t>(value)
643  ? groups_buffer + actual_key_count * gb_qw_stride + off
644  : NULL;
645  }
646  bool match = true;
647  const auto curr_key = reinterpret_cast<int64_t*>(groups_buffer[off]);
648  for (uint32_t i = 0; i < key_count; ++i) {
649  if (curr_key[i * key_qw_stride] != key[i * key_qw_stride]) {
650  match = false;
651  break;
652  }
653  }
654  return match ? groups_buffer + actual_key_count * gb_qw_stride + off : NULL;
655 }
656 
657 __device__ int64_t* mash_get_group_value_columnar(int64_t* groups_buffer,
658  const size_t groups_count,
659  const int64_t* key,
660  const size_t key_count,
661  const size_t key_qw_stride) {
662  const auto h = key_hash_strided(key, key_count, key_qw_stride) % groups_count;
663  int64_t* matching_group = mash_get_matching_group_value_strided(
664  groups_buffer, groups_count, h, key, key_count, key_qw_stride);
665  if (matching_group) {
666  return matching_group;
667  }
668  auto h_probe = (h + 1) % groups_count;
669  while (h_probe != h) {
670  matching_group = mash_get_matching_group_value_strided(
671  groups_buffer, groups_count, h_probe, key, key_count, key_qw_stride);
672  if (matching_group) {
673  return matching_group;
674  }
675  h_probe = (h_probe + 1) % groups_count;
676  }
677  return NULL;
678 }
679 
680 template <typename KeyT = int64_t, typename ValT = int64_t>
681 __global__ void mash_column_runner(int8_t* groups_buffer,
682  const size_t group_count,
683  const int8_t* row_buffer,
684  const size_t row_size,
685  const size_t row_count,
686  const size_t key_count,
687  const size_t val_count,
688  const size_t* val_widths,
689  const OP_KIND* agg_ops) {
690  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
691  thrust::detail::is_same<ValT, int64_t>::value,
692  "Unsupported template parameter other than int64_t for now");
693  const auto thread_index =
694  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
695  if (thread_index >= row_count) {
696  return;
697  }
698  auto keys_base = row_buffer + sizeof(KeyT) * thread_index;
699  auto read_base =
700  row_buffer + sizeof(KeyT) * row_count * key_count + sizeof(ValT) * thread_index;
701  auto write_base = reinterpret_cast<int8_t*>(
702  mash_get_group_value_columnar(reinterpret_cast<int64_t*>(groups_buffer),
703  group_count,
704  reinterpret_cast<const int64_t*>(keys_base),
705  key_count,
706  row_count));
707  if (write_base) {
708  row_func(
709  write_base, group_count, read_base, row_count, val_count, val_widths, agg_ops);
710  }
711 }
712 
713 template <typename KeyT = int64_t, typename ValT = int64_t>
714 __global__ void mash_row_runner(int8_t* groups_buffer,
715  const size_t group_count,
716  const size_t entry_size,
717  const int8_t* row_buffer,
718  const size_t row_size,
719  const size_t row_count,
720  const size_t key_count,
721  const size_t val_count,
722  const size_t* val_widths,
723  const OP_KIND* agg_ops) {
724  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
725  thrust::detail::is_same<ValT, int64_t>::value,
726  "Unsupported template parameter other than int64_t for now");
727  const auto thread_index =
728  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
729  if (thread_index >= row_count) {
730  return;
731  }
732  auto keys_base = row_buffer + row_size * thread_index;
733  auto write_base = reinterpret_cast<int8_t*>(
734  mash_get_group_value(reinterpret_cast<int64_t*>(groups_buffer),
735  static_cast<uint32_t>(group_count),
736  reinterpret_cast<const int64_t*>(keys_base),
737  static_cast<uint32_t>(key_count),
738  static_cast<uint32_t>(entry_size / sizeof(int64_t)),
739  NULL));
740  if (write_base) {
741  auto read_base = keys_base + sizeof(KeyT) * key_count;
742  row_func(write_base, 1, read_base, 1, val_count, val_widths, agg_ops);
743  }
744 }
745 
746 template <typename T = int64_t, bool isColumnar = false>
747 struct PtrRestorer {
748  static_assert(thrust::detail::is_same<T, int64_t>::value,
749  "Unsupported template parameter other than int64_t for now");
750  PtrRestorer(int8_t* buff,
751  const size_t key_num,
752  const size_t entry_num,
753  const size_t row_size,
754  const size_t row_num)
755  : buff_ptr(buff)
756  , key_count(key_num)
757  , entry_count(entry_num)
758  , entry_size(row_size)
759  , row_count(row_num) {}
760  __device__ void operator()(const int index) {
761  const auto value =
762  *reinterpret_cast<T*>(buff_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
763  if (is_empty_slot(value)) {
764  return;
765  }
766  auto dst =
767  reinterpret_cast<T*>(buff_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
768  const size_t key_stride = isColumnar ? row_count : 1;
769  const size_t dst_stride = isColumnar ? entry_count : 1;
770 
771  auto keys_ptr = reinterpret_cast<T*>(value);
772  for (size_t i = 0; i < key_count; ++i) {
773  dst[i * dst_stride] = keys_ptr[i * key_stride];
774  }
775  }
776  int8_t* buff_ptr;
777  const size_t key_count;
778  const size_t entry_count;
779  const size_t entry_size;
780  const size_t row_count;
781 };
782 
783 #ifdef SAVE_MASH_BUF
784 template <typename T = int64_t, bool isColumnar = false>
785 struct IdxRestorer {
786  static_assert(thrust::detail::is_same<T, int64_t>::value,
787  "Unsupported template parameter other than int64_t for now");
788  IdxRestorer(int8_t* output,
789  const size_t entry_sz,
790  const int8_t* input,
791  const size_t row_sz,
792  const size_t row_num)
793  : output_ptr(output)
794  , entry_size(entry_sz)
795  , input_ptr(input)
796  , row_size(row_sz)
797  , row_count(row_num) {}
798  __device__ void operator()(const int index) {
799  const auto value =
800  *reinterpret_cast<T*>(output_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
801  if (is_empty_slot(value)) {
802  return;
803  }
804  auto dst_ptr =
805  reinterpret_cast<T*>(output_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
806  auto row_ptr = reinterpret_cast<int8_t*>(value);
807  *dst_ptr = (row_ptr - input_ptr) / (isColumnar ? sizeof(T) : row_size);
808  }
809  int8_t* output_ptr;
810  const size_t entry_size;
811  const int8_t* input_ptr;
812  const size_t row_size;
813  const size_t row_count;
814 };
815 #endif
816 
817 template <bool isColumnar = false>
818 void mash_restore_keys(int8_t* groups_buffer,
819  const size_t group_count,
820  const size_t entry_size,
821  const int8_t* input_buffer,
822  const size_t row_count,
823  const size_t key_count,
824  const size_t row_size) {
825 #ifdef SAVE_MASH_BUF
826  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
827  thrust::make_counting_iterator(group_count),
828  IdxRestorer<int64_t, isColumnar>(
829  groups_buffer, entry_size, input_buffer, row_size, row_count));
830 #else
831  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
832  thrust::make_counting_iterator(group_count),
833  PtrRestorer<int64_t, isColumnar>(
834  groups_buffer, key_count, group_count, row_size, row_count));
835 #endif
836 }
837 
838 } // namespace
839 
840 void mash_run_query_on_device(int8_t* groups_buffer,
841  const size_t group_count,
842  const int8_t* row_buffer,
843  const size_t row_count,
844  const size_t key_count,
845  const size_t val_count,
846  const std::vector<size_t>& col_widths,
847  const std::vector<OP_KIND>& agg_ops,
848  const bool is_columnar) {
849  CHECK_EQ(val_count, agg_ops.size());
850  CHECK_EQ(key_count + val_count, col_widths.size());
851  size_t row_size = 0;
852  for (size_t i = 0; i < col_widths.size(); ++i) {
853  row_size += col_widths[i];
854  }
855 #ifdef SAVE_MASH_BUF
856  size_t entry_size = sizeof(int64_t);
857  for (size_t i = key_count; i < col_widths.size(); ++i) {
858  entry_size += col_widths[i];
859  }
860 #else
861  const auto entry_size = row_size;
862 #endif
863  thrust::device_vector<size_t> dev_col_widths(col_widths);
864  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
865  if (is_columnar) {
866  mash_column_runner<<<compute_grid_dim(row_count), c_block_size>>>(
867  groups_buffer,
868  group_count,
869  row_buffer,
870  row_size,
871  row_count,
872  key_count,
873  val_count,
874  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
875  thrust::raw_pointer_cast(dev_agg_ops.data()));
876  if (key_count > 1) {
877  mash_restore_keys<true>(groups_buffer,
878  group_count,
879  entry_size,
880  row_buffer,
881  row_count,
882  key_count,
883  row_size);
884  }
885  } else {
886  mash_row_runner<<<compute_grid_dim(row_count), c_block_size>>>(
887  groups_buffer,
888  group_count,
889  entry_size,
890  row_buffer,
891  row_size,
892  row_count,
893  key_count,
894  val_count,
895  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
896  thrust::raw_pointer_cast(dev_agg_ops.data()));
897  if (key_count > 1) {
898  mash_restore_keys<false>(groups_buffer,
899  group_count,
900  entry_size,
901  row_buffer,
902  row_count,
903  key_count,
904  row_size);
905  }
906  }
907 }
908 #endif // TRY_MASH || TRY_MASH_COLUMNAR
909 
910 namespace {
911 
912 __device__ int32_t* get_matching_bucket(int32_t* hash_table,
913  const uint32_t h,
914  const int64_t* key,
915  const int8_t* row_buffer,
916  const size_t key_count,
917  const size_t row_size,
918  bool* is_owner) {
919  const auto value =
920  static_cast<int>(reinterpret_cast<const int8_t*>(key) - row_buffer) / row_size;
921  const auto old =
922  atomicCAS(reinterpret_cast<int*>(hash_table + h), int(EMPTY_KEY_32), value);
923  if (EMPTY_KEY_32 == old) {
924  *is_owner = true;
925  return hash_table + h;
926  }
927  bool match = true;
928  const auto curr_key =
929  reinterpret_cast<const int64_t*>(row_buffer + hash_table[h] * row_size);
930  for (uint32_t i = 0; i < key_count; ++i) {
931  if (curr_key[i] != key[i]) {
932  match = false;
933  break;
934  }
935  }
936  if (match) {
937  *is_owner = false;
938  }
939  return match ? hash_table + h : NULL;
940 }
941 
942 __device__ bool acquire_bucket(int32_t* hash_table,
943  const uint32_t bucket_count,
944  const int64_t* key,
945  const int8_t* row_buffer,
946  const size_t key_count,
947  const size_t row_size) {
948  const auto h = key_hash(key, key_count) % bucket_count;
949  bool is_owner = false;
950  auto matching_bucket =
951  get_matching_bucket(hash_table, h, key, row_buffer, key_count, row_size, &is_owner);
952  if (matching_bucket) {
953  return is_owner;
954  }
955  uint32_t h_probe = (h + 1) % bucket_count;
956  while (h_probe != h) {
957  matching_bucket = get_matching_bucket(
958  hash_table, h_probe, key, row_buffer, key_count, row_size, &is_owner);
959  if (matching_bucket) {
960  return is_owner;
961  }
962  h_probe = (h_probe + 1) % bucket_count;
963  }
964  return false;
965 }
966 
967 template <typename KeyT = int64_t>
968 __global__ void column_deduplicater(int8_t* row_buffer,
969  const size_t row_count,
970  const size_t row_size,
971  const size_t key_count,
972  int32_t* hash_table,
973  const size_t bucket_count) {
974  static_assert(thrust::detail::is_same<KeyT, int64_t>::value,
975  "Unsupported template parameter other than int64_t for now");
976 }
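// Note: column_deduplicater is a stub in this file (its body is just the
// static_assert), so deduplicate_rows_on_device leaves columnar buffers
// untouched; only the row-wise kernel below actually removes duplicate keys.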
977 
978 template <typename KeyT = int64_t>
979 __global__ void row_deduplicater(int8_t* row_buffer,
980  const size_t row_count,
981  const size_t row_size,
982  const size_t key_count,
983  int32_t* hash_table,
984  const size_t bucket_count) {
985  static_assert(thrust::detail::is_same<KeyT, int64_t>::value,
986  "Unsupported template parameter other than int64_t for now");
987  const auto thread_index =
988  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
989 
990  if (thread_index >= row_count) {
991  return;
992  }
993 
994  auto keys_base = row_buffer + row_size * thread_index;
995  auto keys_i64 = reinterpret_cast<KeyT*>(keys_base);
996  bool is_owner = acquire_bucket(hash_table,
997  static_cast<uint32_t>(bucket_count),
998  keys_i64,
999  row_buffer,
1000  key_count,
1001  row_size);
1002  if (!is_owner) {
1003  reset_entry(keys_i64);
1004  }
1005 }
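// Deduplication scheme: every row hashes its key into a table of int32 row
// indices; acquire_bucket lets exactly one row claim each distinct key via
// atomicCAS, and every other row with the same key resets its own key slot to
// EMPTY_KEY_64, so count_rows afterwards reports the distinct-key count.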
1006 
1007 template <bool isColumnar = false, typename T = int64_t>
1008 struct RowCounter {
1009  static_assert(thrust::detail::is_same<T, int64_t>::value,
1010  "Unsupported template parameter other than int64_t for now");
1011  RowCounter(uint32_t* row_num, const int8_t* buff, const size_t entry_sz)
1012  : row_count(row_num), buff_ptr(buff), entry_size(entry_sz) {}
1013  __device__ void operator()(const int index) {
1014  const auto value = *reinterpret_cast<const T*>(
1015  buff_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
1016  if (is_empty_slot(value)) {
1017  return;
1018  }
1019  atomicAdd(row_count, 1UL);
1020  }
1021 
1022  uint32_t* row_count;
1023  const int8_t* buff_ptr;
1024  const size_t entry_size;
1025 };
1026 
1027 template <bool isColumnar = false>
1028 inline size_t count_rows(const int8_t* input_buffer,
1029  const size_t entry_count,
1030  const size_t row_size) {
1031  thrust::device_vector<uint32_t> row_count(1, 0);
1032  thrust::for_each(
1033  thrust::make_counting_iterator(size_t(0)),
1034  thrust::make_counting_iterator(entry_count),
1035  RowCounter<isColumnar>(
1036  thrust::raw_pointer_cast(row_count.data()), input_buffer, row_size));
1037  return static_cast<size_t>(row_count[0]);
1038 }
1039 
1040 template <bool isColumnar = false, typename T = int64_t>
1041 struct InplaceCompactor {
1042  static_assert(thrust::detail::is_same<T, int64_t>::value,
1043  "Unsupported template parameter other than int64_t for now");
1044  InplaceCompactor(uint32_t* w,
1045  const uint32_t e,
1046  int8_t* b,
1047  const size_t e_cnt,
1048  const size_t c_cnt,
1049  const size_t* c_wids,
1050  const size_t e_sz)
1051  : walker(w)
1052  , end(e)
1053  , buff_ptr(b)
1054  , entry_count(e_cnt)
1055  , col_count(c_cnt)
1056  , col_widths(c_wids)
1057  , entry_size(e_sz) {}
1058  __device__ void operator()(const int index) {
1059  const auto stride = isColumnar ? sizeof(T) : entry_size;
1060  const auto value = *reinterpret_cast<T*>(buff_ptr + stride * index);
1061  if (!is_empty_slot(static_cast<T>(value))) {
1062  return;
1063  }
1064  bool found = false;
1065  T* key_ptr = nullptr;
1066  for (auto curr_row = atomicSub(walker, 1UL); curr_row > end;
1067  curr_row = atomicSub(walker, 1UL)) {
1068  key_ptr = reinterpret_cast<T*>(buff_ptr + stride * curr_row);
1069  if (!is_empty_slot(*key_ptr)) {
1070  found = true;
1071  break;
1072  }
1073  }
1074  if (found) {
1075  auto dst_ptr = buff_ptr + stride * index;
1076  if (isColumnar) {
1077  auto src_ptr = reinterpret_cast<int8_t*>(key_ptr);
1078  for (size_t i = 0; i < col_count; ++i) {
1079  switch (col_widths[i]) {
1080  case 4:
1081  *reinterpret_cast<int32_t*>(dst_ptr) = *reinterpret_cast<int32_t*>(src_ptr);
1082  break;
1083  case 8:
1084  *reinterpret_cast<int64_t*>(dst_ptr) = *reinterpret_cast<int64_t*>(src_ptr);
1085  break;
1086  default:;
1087  }
1088  dst_ptr += col_widths[i] * entry_count;
1089  src_ptr += col_widths[i] * entry_count;
1090  }
1091  } else {
1092  memcpy(dst_ptr, key_ptr, entry_size);
1093  }
1094  reset_entry(key_ptr);
1095  }
1096  }
1097 
1098  uint32_t* walker;
1099  uint32_t end;
1100  int8_t* buff_ptr;
1101  const size_t entry_count;
1102  const size_t col_count;
1103  const size_t* col_widths;
1104  const size_t entry_size;
1105 };
1106 
1107 template <bool isColumnar = false>
1108 inline size_t compact_buffer(int8_t* input_buffer,
1109  const size_t entry_count,
1110  const std::vector<size_t>& col_widths) {
1111  const auto col_count = col_widths.size();
1112  size_t entry_size = 0;
1113  for (size_t i = 0; i < col_count; ++i) {
1114  entry_size += col_widths[i];
1115  }
1116  const auto actual_row_count =
1117  count_rows<isColumnar>(input_buffer, entry_count, entry_size);
1118  if (actual_row_count > static_cast<size_t>(entry_count * 0.4f)) {
1119  return entry_count;
1120  }
1121  thrust::device_vector<size_t> dev_col_widths(col_widths);
1122  thrust::device_vector<uint32_t> walker(1, entry_count - 1);
1123  thrust::for_each(
1124  thrust::make_counting_iterator(size_t(0)),
1125  thrust::make_counting_iterator(actual_row_count),
1126  InplaceCompactor<isColumnar>(thrust::raw_pointer_cast(walker.data()),
1127  static_cast<uint32_t>(actual_row_count - 1),
1128  input_buffer,
1129  entry_count,
1130  col_count,
1131  thrust::raw_pointer_cast(dev_col_widths.data()),
1132  entry_size));
1133  return actual_row_count;
1134 }
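// Compaction strategy: compact_buffer only rewrites the buffer when fewer than
// 40% of the entries are live. InplaceCompactor gives each empty slot in the
// leading region a turn at a shared atomic cursor that walks backwards from
// the tail, pulling the next live tail entry forward and resetting its old
// slot, so the survivors end up packed at the front of the buffer.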
1135 
1136 template <bool isColumnar = false, typename T = int64_t>
1137 struct Checker {
1138  static_assert(thrust::detail::is_same<T, int64_t>::value,
1139  "Unsupported template parameter other than int64_t for now");
1140  Checker(uint32_t* cnt, const int8_t* buff, const size_t entry_sz)
1141  : count(cnt), buff_ptr(buff), entry_size(entry_sz) {}
1142  __device__ void operator()(const int index) {
1143  const auto value = *reinterpret_cast<const T*>(
1144  buff_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
1145  const auto next_value = *reinterpret_cast<const T*>(
1146  buff_ptr + (isColumnar ? sizeof(T) : entry_size) * (index + 1));
1147  if ((!is_empty_slot(value) && is_empty_slot(next_value)) ||
1148  (is_empty_slot(value) && !is_empty_slot(next_value))) {
1149  atomicAdd(count, 1UL);
1150  }
1151  }
1152 
1153  uint32_t* count;
1154  const int8_t* buff_ptr;
1155  const size_t entry_size;
1156 };
1157 
1158 template <bool isColumnar = false>
1159 inline bool is_compacted(const int8_t* input_buffer,
1160  const size_t entry_count,
1161  const size_t row_size) {
1162  thrust::device_vector<uint32_t> count(1, 0);
1163  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
1164  thrust::make_counting_iterator(entry_count - 1),
1165  Checker<isColumnar>(
1166  thrust::raw_pointer_cast(count.data()), input_buffer, row_size));
1167  return count[0] == 1;
1168 }
1169 
1170 } // namespace
1171 
1172 size_t deduplicate_rows_on_device(int8_t* row_buffer,
1173  const size_t row_count,
1174  const size_t key_count,
1175  const std::vector<size_t>& col_widths,
1176  const bool is_columnar) {
1177  CHECK_GT(col_widths.size(), key_count);
1178  size_t row_size = 0;
1179  for (auto wid : col_widths) {
1180  row_size += wid;
1181  }
1182  const auto bucket_count = static_cast<size_t>(row_count * 1.3f);
1183  thrust::device_vector<int32_t> hash_table(bucket_count, int32_t(EMPTY_KEY_32));
1184  if (is_columnar) {
1185  column_deduplicater<<<compute_grid_dim(row_count), c_block_size>>>(
1186  row_buffer,
1187  row_count,
1188  row_size,
1189  key_count,
1190  thrust::raw_pointer_cast(hash_table.data()),
1191  bucket_count);
1192  } else {
1193  row_deduplicater<<<compute_grid_dim(row_count), c_block_size>>>(
1194  row_buffer,
1195  row_count,
1196  row_size,
1197  key_count,
1198  thrust::raw_pointer_cast(hash_table.data()),
1199  bucket_count);
1200  }
1201 
1202  return (is_columnar ? count_rows<true>(row_buffer, row_count, row_size)
1203  : count_rows<false>(row_buffer, row_count, row_size));
1204 }
1205 
1206 namespace {
1207 template <bool isColumnar = false, typename T = int64_t>
1208 struct Dropper {
1209  static_assert(thrust::detail::is_same<T, int64_t>::value,
1210  "Unsupported template parameter other than int64_t for now");
1211  Dropper(int8_t* buff, uint32_t* ub, const size_t row_cnt, const size_t entry_sz)
1212  : buff_ptr(buff), upper_bound(ub), row_count(row_cnt), entry_size(entry_sz) {}
1213  __device__ void operator()(const int index) {
1214  auto key_ptr =
1215  reinterpret_cast<T*>(buff_ptr + (isColumnar ? sizeof(T) : entry_size) * index);
1216  if (is_empty_slot(*key_ptr)) {
1217  return;
1218  }
1219  if (atomicAdd(upper_bound, 1UL) <= row_count) {
1220  reset_entry(key_ptr);
1221  }
1222  }
1223 
1224  int8_t* buff_ptr;
1225  uint32_t* upper_bound;
1226  const uint32_t row_count;
1227  const size_t entry_size;
1228 };
1229 
1230 } // namespace
1231 
1232 size_t drop_rows(int8_t* row_buffer,
1233  const size_t entry_count,
1234  const size_t entry_size,
1235  const size_t row_count,
1236  const float fill_rate,
1237  const bool is_columnar) {
1238  auto limit = static_cast<size_t>(entry_count * fill_rate);
1239  if (row_count < limit) {
1240  return row_count;
1241  }
1242  thrust::device_vector<uint32_t> upper_bound(1, static_cast<uint32_t>(limit));
1243 
1244  if (is_columnar) {
1245  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
1246  thrust::make_counting_iterator(entry_count),
1247  Dropper<true>(row_buffer,
1248  thrust::raw_pointer_cast(upper_bound.data()),
1249  row_count,
1250  entry_size));
1251  } else {
1252  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
1253  thrust::make_counting_iterator(entry_count),
1254  Dropper<false>(row_buffer,
1255  thrust::raw_pointer_cast(upper_bound.data()),
1256  row_count,
1257  entry_size));
1258  }
1259  return limit;
1260 }
1261 
1262 namespace {
1263 
1264 __device__ inline void reduce_func(int8_t* write_base,
1265  const size_t write_stride,
1266  const int8_t* read_base,
1267  const size_t read_stride,
1268  const size_t val_count,
1269  const size_t* val_widths,
1270  const OP_KIND* agg_ops) {
1271  for (size_t i = 0; i < val_count; ++i) {
1272  switch (val_widths[i]) {
1273  case 4: {
1274  auto write_ptr = reinterpret_cast<int32_t*>(write_base);
1275  const auto value = *reinterpret_cast<const int32_t*>(read_base);
1276  switch (agg_ops[i]) {
1277  case OP_COUNT:
1278  case OP_SUM:
1279  agg_sum_int32_shared(write_ptr, value);
1280  break;
1281  case OP_MIN:
1282  agg_min_int32_shared(write_ptr, value);
1283  break;
1284  case OP_MAX:
1285  agg_max_int32_shared(write_ptr, value);
1286  break;
1287  default:;
1288  }
1289  break;
1290  }
1291  case 8: {
1292  auto write_ptr = reinterpret_cast<int64_t*>(write_base);
1293  const auto value = *reinterpret_cast<const int64_t*>(read_base);
1294  switch (agg_ops[i]) {
1295  case OP_COUNT:
1296  case OP_SUM:
1297  agg_sum_shared(write_ptr, value);
1298  break;
1299  case OP_MIN:
1300  agg_min_shared(write_ptr, value);
1301  break;
1302  case OP_MAX:
1303  agg_max_shared(write_ptr, value);
1304  break;
1305  default:;
1306  }
1307  break;
1308  }
1309  default:;
1310  }
1311  write_base += val_widths[i] * write_stride;
1312  read_base += val_widths[i] * read_stride;
1313  }
1314 }
1315 
1316 template <typename KeyT = int64_t, typename ValT = int64_t>
1317 __global__ void column_reducer(int8_t* this_buffer,
1318  const size_t this_entry_count,
1319  int8_t* that_buffer,
1320  const size_t that_entry_count,
1321  const size_t entry_size,
1322  const size_t key_count,
1323  const size_t val_count,
1324  const size_t* val_widths,
1325  const OP_KIND* agg_ops) {
1326  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1327  thrust::detail::is_same<ValT, int64_t>::value,
1328  "Unsupported template parameter other than int64_t for now");
1329 }
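// Note: like column_deduplicater above, column_reducer is a stub (the
// static_assert is its only statement), so the columnar branches of
// get_hashed_copy and reduce_on_device initialize the destination buffer but
// do not merge any entries; the row-wise merge is done by row_reducer below.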
1330 
1331 template <typename KeyT = int64_t, typename ValT = int64_t>
1332 __global__ void row_reducer(int8_t* this_buffer,
1333  const size_t this_entry_count,
1334  int8_t* that_buffer,
1335  const size_t that_entry_count,
1336  const size_t entry_size,
1337  const size_t key_count,
1338  const size_t val_count,
1339  const size_t* val_widths,
1340  const OP_KIND* agg_ops) {
1341  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1342  thrust::detail::is_same<ValT, int64_t>::value,
1343  "Unsupported template parameter other than int64_t for now");
1344  const auto thread_index =
1345  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
1346  auto keys_base = that_buffer + entry_size * thread_index;
1347  auto keys_i64 = reinterpret_cast<KeyT*>(keys_base);
1348 
1349  if (thread_index >= that_entry_count || is_empty_slot(*keys_i64)) {
1350  return;
1351  }
1352 
1353  auto write_base = reinterpret_cast<int8_t*>(
1354  get_group_value(reinterpret_cast<int64_t*>(this_buffer),
1355  static_cast<uint32_t>(this_entry_count),
1356  keys_i64,
1357  static_cast<uint32_t>(key_count),
1358  static_cast<uint32_t>(entry_size / sizeof(int64_t)),
1359  NULL));
1360  if (write_base) {
1361  auto read_base = keys_base + sizeof(KeyT) * key_count;
1362  reduce_func(write_base, 1, read_base, 1, val_count, val_widths, agg_ops);
1363  }
1364 }
1365 
1366 } // namespace
1367 
1368 int8_t* get_hashed_copy(int8_t* dev_buffer,
1369  const size_t entry_count,
1370  const size_t new_entry_count,
1371  const std::vector<size_t>& col_widths,
1372  const std::vector<OP_KIND>& agg_ops,
1373  const std::vector<size_t>& init_vals,
1374  const bool is_columnar) {
1375  const size_t val_count = agg_ops.size();
1376  const size_t key_count = col_widths.size() - val_count;
1377  size_t entry_size = 0;
1378  for (size_t i = 0; i < col_widths.size(); ++i) {
1379  entry_size += col_widths[i];
1380  }
1381  thrust::device_vector<size_t> dev_col_widths(col_widths);
1382  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
1383  thrust::device_vector<size_t> dev_init_vals(init_vals);
1384 
1385  int8_t* dev_copy = nullptr;
1386  cudaMalloc(&dev_copy, entry_size * new_entry_count);
1387  if (is_columnar) {
1388  init_group<true><<<compute_grid_dim(new_entry_count), c_block_size>>>(
1389  dev_copy,
1390  new_entry_count,
1391  dev_col_widths.size(),
1392  thrust::raw_pointer_cast(dev_col_widths.data()),
1393  thrust::raw_pointer_cast(dev_init_vals.data()));
1394  column_reducer<<<compute_grid_dim(entry_count), c_block_size>>>(
1395  dev_copy,
1396  new_entry_count,
1397  dev_buffer,
1398  entry_count,
1399  entry_size,
1400  key_count,
1401  val_count,
1402  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1403  thrust::raw_pointer_cast(dev_agg_ops.data()));
1404  } else {
1405  init_group<false><<<compute_grid_dim(new_entry_count), c_block_size>>>(
1406  dev_copy,
1407  new_entry_count,
1408  col_widths.size(),
1409  thrust::raw_pointer_cast(dev_col_widths.data()),
1410  thrust::raw_pointer_cast(dev_init_vals.data()));
1411  row_reducer<<<compute_grid_dim(entry_count), c_block_size>>>(
1412  dev_copy,
1413  new_entry_count,
1414  dev_buffer,
1415  entry_count,
1416  entry_size,
1417  key_count,
1418  val_count,
1419  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1420  thrust::raw_pointer_cast(dev_agg_ops.data()));
1421  }
1422  return dev_copy;
1423 }
1424 
1425 void reduce_on_device(int8_t*& this_dev_buffer,
1426  const size_t this_dev_id,
1427  size_t& this_entry_count,
1428  int8_t* that_dev_buffer,
1429  const size_t that_dev_id,
1430  const size_t that_entry_count,
1431  const size_t that_actual_row_count,
1432  const std::vector<size_t>& col_widths,
1433  const std::vector<OP_KIND>& agg_ops,
1434  const std::vector<size_t>& init_vals,
1435  const bool is_columnar) {
1436  CHECK_EQ(col_widths.size(), init_vals.size());
1437  const size_t val_count = agg_ops.size();
1438  const size_t key_count = col_widths.size() - val_count;
1439  size_t entry_size = 0;
1440  for (size_t i = 0; i < col_widths.size(); ++i) {
1441  entry_size += col_widths[i];
1442  }
1443 
1444  thrust::device_vector<size_t> dev_col_widths(col_widths);
1445  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
1446  auto total_row_count =
1447  (is_columnar ? count_rows<true>(this_dev_buffer, this_entry_count, entry_size)
1448  : count_rows<false>(this_dev_buffer, this_entry_count, entry_size)) +
1449  that_actual_row_count;
1450  const auto threshold = static_cast<size_t>(total_row_count * 1.3f);
1451  if (threshold > this_entry_count) {
1452  total_row_count = std::min(threshold, this_entry_count + that_entry_count);
1453  thrust::device_vector<size_t> dev_init_vals(init_vals);
1454  auto this_dev_copy = get_hashed_copy(this_dev_buffer,
1455  this_entry_count,
1456  total_row_count,
1457  col_widths,
1458  agg_ops,
1459  init_vals,
1460  is_columnar);
1461 
1462  cudaFree(this_dev_buffer);
1463  this_dev_buffer = this_dev_copy;
1464  this_entry_count = total_row_count;
1465  }
1466  int8_t* that_dev_copy = nullptr;
1467  if (that_dev_id != this_dev_id) {
1468  cudaMalloc(&that_dev_copy, that_entry_count * entry_size);
1469  int canAccessPeer;
1470  cudaDeviceCanAccessPeer(&canAccessPeer, this_dev_id, that_dev_id);
1471  if (canAccessPeer) {
1472  cudaDeviceEnablePeerAccess(that_dev_id, 0);
1473  }
1474  cudaMemcpyPeer(that_dev_copy,
1475  this_dev_id,
1476  that_dev_buffer,
1477  that_dev_id,
1478  that_entry_count * entry_size);
1479  } else {
1480  that_dev_copy = that_dev_buffer;
1481  }
1482 
1483  if (is_columnar) {
1484  column_reducer<<<compute_grid_dim(that_entry_count), c_block_size>>>(
1485  this_dev_buffer,
1486  this_entry_count,
1487  that_dev_copy,
1488  that_entry_count,
1489  entry_size,
1490  key_count,
1491  val_count,
1492  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1493  thrust::raw_pointer_cast(dev_agg_ops.data()));
1494  this_entry_count =
1495  compact_buffer<true>(this_dev_buffer, this_entry_count, col_widths);
1496  } else {
1497  row_reducer<<<compute_grid_dim(that_entry_count), c_block_size>>>(
1498  this_dev_buffer,
1499  this_entry_count,
1500  that_dev_copy,
1501  that_entry_count,
1502  entry_size,
1503  key_count,
1504  val_count,
1505  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1506  thrust::raw_pointer_cast(dev_agg_ops.data()));
1507  this_entry_count =
1508  compact_buffer<false>(this_dev_buffer, this_entry_count, col_widths);
1509  }
1510 
1511  cudaFree(that_dev_copy);
1512 }
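// Reduction outline: count the live rows already on this device; if holding
// both inputs at roughly 1.3x occupancy would overflow the current buffer,
// rehash into a larger copy via get_hashed_copy; if the other buffer lives on
// a different GPU, allocate a local staging copy and cudaMemcpyPeer it over
// (enabling peer access when the devices support it); merge with
// row_reducer / column_reducer; finally compact_buffer packs the surviving
// entries and updates this_entry_count when occupancy has dropped low enough.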
1513 
1514 namespace {
1515 
1516 template <typename KeyT = int64_t>
1517 __device__ size_t get_perfect_hash_index(const KeyT* key_base,
1518  const size_t key_count,
1519  const KeyT* min_keys,
1520  const KeyT* max_keys,
1521  const size_t stride) {
1522  size_t hash_index = 0;
1523  for (size_t k = 0; k < key_count; ++k) {
1524  if (k > 0) {
1525  hash_index *= static_cast<size_t>(max_keys[k] - min_keys[k] + 1);
1526  }
1527  hash_index += static_cast<size_t>(key_base[k * stride] - min_keys[k]);
1528  }
1529  return hash_index;
1530 }
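// Worked example: with two key columns whose ranges are [0, 9] and [0, 99],
// the index of key (3, 42) is (3 - 0) * (99 - 0 + 1) + (42 - 0) = 342, and a
// perfect-hash table needs 10 * 100 = 1000 entries. The placers below multiply
// this index by the key/entry width to locate the destination slot.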
1531 
1532 template <typename KeyT = int64_t, typename ValT = int64_t>
1533 __global__ void col_perfect_placer(int8_t* new_buffer,
1534  const size_t write_stride,
1535  const int8_t* old_buffer,
1536  const size_t entry_count,
1537  const size_t read_stride,
1538  const size_t key_count,
1539  const KeyT* min_keys,
1540  const KeyT* max_keys,
1541  const size_t val_count,
1542  const size_t* val_widths) {
1543  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1544  thrust::detail::is_same<ValT, int64_t>::value,
1545  "Unsupported template parameter other than int64_t for now");
1546  const auto thread_index =
1547  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
1548 
1549  auto keys_base = old_buffer + sizeof(KeyT) * thread_index;
1550  auto keys_i64 = reinterpret_cast<const KeyT*>(keys_base);
1551 
1552  if (thread_index >= entry_count || is_empty_slot(*keys_i64)) {
1553  return;
1554  }
1555 
1556  auto read_base = keys_base;
1557  auto write_base =
1558  new_buffer +
1559  sizeof(KeyT) *
1560  get_perfect_hash_index(keys_i64, key_count, min_keys, max_keys, read_stride);
1561  const auto old = atomicCAS(reinterpret_cast<unsigned long long*>(write_base),
1562  EMPTY_KEY_64,
1563  static_cast<unsigned long long>(*keys_i64));
1564 
1565  if (is_empty_slot(static_cast<KeyT>(old))) {
1566  for (size_t i = 0; i < key_count; ++i,
1567  write_base += write_stride * sizeof(KeyT),
1568  read_base += read_stride * sizeof(KeyT)) {
1569  *reinterpret_cast<KeyT*>(write_base) = *reinterpret_cast<const KeyT*>(read_base);
1570  }
1571  for (size_t i = 0; i < val_count; ++i,
1572  write_base += write_stride * sizeof(ValT),
1573  read_base += read_stride * sizeof(ValT)) {
1574  *reinterpret_cast<ValT*>(write_base) = *reinterpret_cast<const ValT*>(read_base);
1575  }
1576  }
1577 }
1578 
1579 template <typename KeyT = int64_t, typename ValT = int64_t>
1580 __global__ void row_perfect_placer(int8_t* new_buffer,
1581  const int8_t* old_buffer,
1582  const size_t entry_count,
1583  const size_t entry_size,
1584  const size_t key_count,
1585  const KeyT* min_keys,
1586  const KeyT* max_keys,
1587  const size_t val_count,
1588  const size_t* val_widths) {
1589  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1590  thrust::detail::is_same<ValT, int64_t>::value,
1591  "Unsupported template parameter other than int64_t for now");
1592  const auto thread_index =
1593  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
1594 
1595  auto keys_base = old_buffer + entry_size * thread_index;
1596  auto keys_i64 = reinterpret_cast<const KeyT*>(keys_base);
1597 
1598  if (thread_index >= entry_count || is_empty_slot(*keys_i64)) {
1599  return;
1600  }
1601 
1602  auto write_base =
1603  new_buffer +
1604  entry_size * get_perfect_hash_index(keys_i64, key_count, min_keys, max_keys, 1);
1605  const auto old = atomicCAS(reinterpret_cast<unsigned long long*>(write_base),
1606  EMPTY_KEY_64,
1607  static_cast<unsigned long long>(*keys_i64));
1608 
1609  if (is_empty_slot(static_cast<KeyT>(old))) {
1610  memcpy(write_base, keys_base, entry_size);
1611  }
1612 }
1613 
1614 } // namespace
1615 
1616 std::pair<int8_t*, size_t> get_perfect_hashed_copy(
1617  int8_t* dev_buffer,
1618  const size_t entry_count,
1619  const std::vector<size_t>& col_widths,
1620  const std::vector<std::pair<int64_t, int64_t>>& ranges,
1621  const std::vector<OP_KIND>& agg_ops,
1622  const std::vector<size_t>& init_vals,
1623  const bool is_columnar) {
1624  const size_t val_count = agg_ops.size();
1625  const size_t key_count = col_widths.size() - val_count;
1626  size_t entry_size = 0;
1627  for (size_t i = 0; i < col_widths.size(); ++i) {
1628  entry_size += col_widths[i];
1629  }
1630  thrust::device_vector<size_t> dev_col_widths(col_widths);
1631  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
1632  thrust::device_vector<size_t> dev_init_vals(init_vals);
1633  std::vector<int64_t> min_keys(key_count, 0);
1634  std::vector<int64_t> max_keys(key_count, 0);
1635  for (size_t k = 0; k < key_count; ++k) {
1636  min_keys[k] = ranges[k].first;
1637  max_keys[k] = ranges[k].second;
1638  }
1639  thrust::device_vector<int64_t> dev_min_keys(min_keys);
1640  thrust::device_vector<int64_t> dev_max_keys(max_keys);
1641  int8_t* dev_copy = nullptr;
1642  cudaMalloc(&dev_copy, entry_size * entry_count);
1643  if (is_columnar) {
1644  init_group<true><<<compute_grid_dim(entry_count), c_block_size>>>(
1645  dev_copy,
1646  entry_count,
1647  col_widths.size(),
1648  thrust::raw_pointer_cast(dev_col_widths.data()),
1649  thrust::raw_pointer_cast(dev_init_vals.data()));
1650  col_perfect_placer<<<compute_grid_dim(entry_count), c_block_size>>>(
1651  dev_copy,
1652  entry_count,
1653  dev_buffer,
1654  entry_count,
1655  entry_count,
1656  key_count,
1657  thrust::raw_pointer_cast(dev_min_keys.data()),
1658  thrust::raw_pointer_cast(dev_max_keys.data()),
1659  val_count,
1660  thrust::raw_pointer_cast(dev_col_widths.data() + key_count));
1661  } else {
1662  init_group<false><<<compute_grid_dim(entry_count), c_block_size>>>(
1663  dev_copy,
1664  entry_count,
1665  col_widths.size(),
1666  thrust::raw_pointer_cast(dev_col_widths.data()),
1667  thrust::raw_pointer_cast(dev_init_vals.data()));
1668 
1669  row_perfect_placer<<<compute_grid_dim(entry_count), c_block_size>>>(
1670  dev_copy,
1671  dev_buffer,
1672  entry_count,
1673  entry_size,
1674  key_count,
1675  thrust::raw_pointer_cast(dev_min_keys.data()),
1676  thrust::raw_pointer_cast(dev_max_keys.data()),
1677  val_count,
1678  thrust::raw_pointer_cast(dev_col_widths.data() + key_count));
1679  }
1680 
1681  const auto actual_entry_count =
1682  (is_columnar ? count_rows<true>(dev_copy, entry_count, entry_size)
1683  : count_rows<false>(dev_copy, entry_count, entry_size));
1684  return {dev_copy, actual_entry_count};
1685 }
1686 
1687 int8_t* fetch_segs_from_others(std::vector<int8_t*>& dev_reduced_buffers,
1688  const size_t entry_count,
1689  const size_t dev_id,
1690  const size_t dev_count,
1691  const std::vector<size_t>& col_widths,
1692  const bool is_columnar,
1693  const size_t start,
1694  const size_t end) {
1695  const size_t col_count = col_widths.size();
1696  size_t entry_size = 0;
1697  for (size_t i = 0; i < col_widths.size(); ++i) {
1698  entry_size += col_widths[i];
1699  }
1700  const size_t seg_entry_count = end - start;
1701  const size_t read_stride = entry_count;
1702  const size_t write_stride = seg_entry_count * (dev_count - 1);
1703  int8_t* dev_segs_copy = nullptr;
1704  cudaMalloc(&dev_segs_copy, write_stride * entry_size);
1705  for (size_t i = (dev_id + 1) % dev_count, offset = 0; i != dev_id;
1706  i = (i + 1) % dev_count) {
1707  int canAccessPeer;
1708  cudaDeviceCanAccessPeer(&canAccessPeer, dev_id, i);
1709  if (canAccessPeer) {
1710  cudaDeviceEnablePeerAccess(i, 0);
1711  }
1712  if (is_columnar) {
1713  auto read_ptr = dev_reduced_buffers[i] + start * col_widths[0];
1714  auto write_ptr = dev_segs_copy + offset * col_widths[0];
1715  for (size_t j = 0; j < col_count; ++j) {
1716  cudaMemcpyPeer(write_ptr, dev_id, read_ptr, i, seg_entry_count * col_widths[j]);
1717  read_ptr += read_stride * col_widths[j];
1718  write_ptr += write_stride * col_widths[j];
1719  }
1720  offset += seg_entry_count;
1721  } else {
1722  cudaMemcpyPeer(dev_segs_copy + offset,
1723  dev_id,
1724  dev_reduced_buffers[i] + start * entry_size,
1725  i,
1726  seg_entry_count * entry_size);
1727  offset += seg_entry_count * entry_size;
1728  }
1729  }
1730  return dev_segs_copy;
1731 }
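// Resulting layout: for row-wise buffers the segments fetched from the other
// devices are simply concatenated (offset advances by seg_entry_count *
// entry_size per device). For columnar buffers each column is copied with a
// read stride of entry_count and a write stride of seg_entry_count *
// (dev_count - 1), so the gathered copy is itself columnar over the
// concatenated segments, matching the read_stride that
// reduce_segment_on_device computes for its columnar branch.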
1732 
1733 namespace {
1734 
1735 template <typename KeyT = int64_t, typename ValT = int64_t>
1736 __global__ void col_perfect_reducer(int8_t* this_seg,
1737  const size_t entry_count,
1738  const size_t write_stride,
1739  const int8_t* other_segs,
1740  const size_t seg_count,
1741  const size_t read_stride,
1742  const size_t key_count,
1743  const size_t val_count,
1744  const size_t* val_widths,
1745  const OP_KIND* agg_ops) {
1746  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1747  thrust::detail::is_same<ValT, int64_t>::value,
1748  "Unsupported template parameter other than int64_t for now");
1749  const auto thread_index =
1750  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
1751  const auto thread_count =
1752  seg_count == size_t(1) ? entry_count : entry_count * (seg_count - 1);
1753 
1754  auto keys_base = other_segs + sizeof(KeyT) * thread_index;
1755  auto keys_i64 = reinterpret_cast<const KeyT*>(keys_base);
1756 
1757  if (thread_index >= thread_count || is_empty_slot(*keys_i64)) {
1758  return;
1759  }
1760 
1761  auto read_base = keys_base;
1762  auto write_base = this_seg + sizeof(KeyT) * (thread_index % entry_count);
1763  const auto old = atomicCAS(reinterpret_cast<unsigned long long*>(write_base),
1764  EMPTY_KEY_64,
1765  static_cast<unsigned long long>(*keys_i64));
1766 
1767  if (is_empty_slot(static_cast<KeyT>(old))) {
1768  for (size_t i = 0; i < key_count; ++i,
1769  write_base += write_stride * sizeof(KeyT),
1770  read_base += read_stride * sizeof(KeyT)) {
1771  *reinterpret_cast<KeyT*>(write_base) = *reinterpret_cast<const KeyT*>(read_base);
1772  }
1773  }
1774 
1775  write_base = this_seg + sizeof(KeyT) * (thread_index % entry_count) +
1776  sizeof(KeyT) * write_stride * key_count;
1777  read_base = keys_base + sizeof(KeyT) * read_stride * key_count;
1778  reduce_func(
1779  write_base, write_stride, read_base, read_stride, val_count, val_widths, agg_ops);
1780 }
1781 
1782 template <typename KeyT = int64_t, typename ValT = int64_t>
1783 __global__ void row_perfect_reducer(int8_t* this_seg,
1784  const size_t entry_count,
1785  const int8_t* other_segs,
1786  const size_t seg_count,
1787  const size_t entry_size,
1788  const size_t key_count,
1789  const size_t val_count,
1790  const size_t* val_widths,
1791  const OP_KIND* agg_ops) {
1792  static_assert(thrust::detail::is_same<KeyT, int64_t>::value &&
1793  thrust::detail::is_same<ValT, int64_t>::value,
1794  "Unsupported template parameter other than int64_t for now");
1795  const auto thread_index =
1796  threadIdx.x + blockIdx.x * blockDim.x + blockIdx.y * blockDim.x * gridDim.x;
1797  const auto thread_count =
1798  seg_count == size_t(1) ? entry_count : entry_count * (seg_count - 1);
1799 
1800  auto keys_base = other_segs + entry_size * thread_index;
1801  auto keys_i64 = reinterpret_cast<const KeyT*>(keys_base);
1802 
1803  if (thread_index >= thread_count || is_empty_slot(*keys_i64)) {
1804  return;
1805  }
1806 
1807  auto write_base = this_seg + entry_size * (thread_index % entry_count);
1808  const auto old = atomicCAS(reinterpret_cast<unsigned long long*>(write_base),
1809  EMPTY_KEY_64,
1810  static_cast<unsigned long long>(*keys_i64));
1811 
1812  if (is_empty_slot(static_cast<KeyT>(old))) {
1813  memcpy(write_base, keys_base, sizeof(KeyT) * key_count);
1814  }
1815 
1816  write_base += sizeof(KeyT) * key_count;
1817  auto read_base = keys_base + sizeof(KeyT) * key_count;
1818  reduce_func(write_base, 1, read_base, 1, val_count, val_widths, agg_ops);
1819 }
1820 
1821 } // namespace
1822 
1823 void reduce_segment_on_device(int8_t* dev_seg_buf,
1824  const int8_t* dev_other_segs,
1825  const size_t entry_count,
1826  const size_t seg_count,
1827  const std::vector<size_t>& col_widths,
1828  const std::vector<OP_KIND>& agg_ops,
1829  const bool is_columnar,
1830  const size_t start,
1831  const size_t end) {
1832  const size_t val_count = agg_ops.size();
1833  const size_t key_count = col_widths.size() - val_count;
1834  size_t entry_size = 0;
1835  for (size_t i = 0; i < col_widths.size(); ++i) {
1836  entry_size += col_widths[i];
1837  }
1838 
1839  thrust::device_vector<size_t> dev_col_widths(col_widths);
1840  thrust::device_vector<OP_KIND> dev_agg_ops(agg_ops);
1841  const auto thread_count =
1842  seg_count == size_t(1) ? entry_count : entry_count * (seg_count - 1);
1843  if (is_columnar) {
1844  const size_t write_stride = entry_count;
1845  const size_t read_stride =
1846  (end - start) * (seg_count == size_t(1) ? 1 : (seg_count - 1));
1847  col_perfect_reducer<<<compute_grid_dim(thread_count), c_block_size>>>(
1848  dev_seg_buf + start * sizeof(int64_t),
1849  end - start,
1850  write_stride,
1851  dev_other_segs,
1852  seg_count,
1853  read_stride,
1854  key_count,
1855  val_count,
1856  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1857  thrust::raw_pointer_cast(dev_agg_ops.data()));
1858  } else {
1859  row_perfect_reducer<<<compute_grid_dim(thread_count), c_block_size>>>(
1860  dev_seg_buf + start * entry_size,
1861  end - start,
1862  dev_other_segs,
1863  seg_count,
1864  entry_size,
1865  key_count,
1866  val_count,
1867  thrust::raw_pointer_cast(dev_col_widths.data() + key_count),
1868  thrust::raw_pointer_cast(dev_agg_ops.data()));
1869  }
1870 }
1871 
1872 #endif // HAVE_CUDA