ProfileTest.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /**
18  * @file    ProfileTest.cpp
19  * @brief   Unit tests for microbenchmark.
20  */
24 #include "ProfileTest.h"
25 #include "../QueryEngine/Descriptors/RowSetMemoryOwner.h"
26 #include "../QueryEngine/ResultSet.h"
27 #include "Shared/measure.h"
28 #include "TestHelpers.h"
29 
30 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
31 #include <thrust/system_error.h>
32 #endif
33 
34 #include <gtest/gtest.h>
35 #include <boost/make_unique.hpp>
36 
37 #include <algorithm>
38 #include <future>
39 #include <random>
40 #include <unordered_map>
41 #include <unordered_set>
42 
43 bool g_gpus_present = false;
44 
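// Over-allocation factor for group buffers: the tests below size their group
// buffers at (number of distinct groups) * c_space_usage.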
45 const float c_space_usage = 2.0f;
46 
47 namespace {
48 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
49 void check_error(CUresult status) {
50  if (status != CUDA_SUCCESS) {
51  const char* errorString{nullptr};
52  cuGetErrorString(status, &errorString);
53  throw std::runtime_error(errorString ? errorString : "Unknown error");
54  }
55 }
56 #endif
57 
58 inline size_t get_gpu_count() {
59 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
60  int num_gpus = 0;
61  try {
62  check_error(cuInit(0));
63  check_error(cuDeviceGetCount(&num_gpus));
64  } catch (std::runtime_error&) {
65  return 0;
66  }
67  return num_gpus;
68 #else
69  return 0;
70 #endif
71 }
72 
73 inline bool is_gpu_present() {
74 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
75  return (get_gpu_count() > 0);
76 #else
77  return false;
78 #endif // HAVE_CUDA
79 }
80 
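// Fills num_random_numbers values of type T into random_numbers at the given
// byte stride, drawing from the requested distribution and clamping results
// to [min_number, max_number].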
81 template <typename T = int64_t>
82 bool generate_numbers(int8_t* random_numbers,
83  const unsigned num_random_numbers,
84  const T min_number,
85  const T max_number,
86  const DIST_KIND dist,
87  const size_t stride = sizeof(T)) {
88  if (random_numbers == nullptr) {
89  return false;
90  }
91 
92  std::random_device rd;
93  std::mt19937 gen(rd());
94 
95  // values near the mean are the most likely
96  // standard deviation affects the dispersion of generated values from the mean
97  switch (dist) {
98  case NRM: {
99  std::normal_distribution<> d((max_number + min_number) / 2, 1);
100  for (unsigned i = 0; i < num_random_numbers; ++i) {
101  *reinterpret_cast<T*>(random_numbers + i * stride) =
102  std::max<T>(min_number, std::min<T>(max_number, std::round(d(gen))));
103  }
104  break;
105  }
106  case EXP1: {
107  std::exponential_distribution<> d(1);
108  for (unsigned i = 0; i < num_random_numbers; ++i) {
109  *reinterpret_cast<T*>(random_numbers + i * stride) =
110  std::max<T>(min_number, std::min<T>(max_number, std::round(d(gen))));
111  }
112  break;
113  }
114  case EXP2: {
115  std::exponential_distribution<> d(2);
116  for (unsigned i = 0; i < num_random_numbers; ++i) {
117  *reinterpret_cast<T*>(random_numbers + i * stride) =
118  std::max<T>(min_number, std::min<T>(max_number, std::round(d(gen))));
119  }
120  break;
121  }
122  case UNI: {
123  std::uniform_int_distribution<T> d(min_number, max_number);
124  for (unsigned i = 0; i < num_random_numbers; ++i) {
125  *reinterpret_cast<T*>(random_numbers + i * stride) = d(gen);
126  }
127  break;
128  }
129  case POI: {
130  std::poisson_distribution<T> d(4);
131  for (unsigned i = 0; i < num_random_numbers; ++i) {
132  *reinterpret_cast<T*>(random_numbers + i * stride) =
133  std::max<T>(min_number, std::min(max_number, d(gen)));
134  }
135  break;
136  }
137  default:
138  CHECK(false);
139  }
140 
141  return true;
142 }
143 
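// Generates one column per async task. For columnar layout each column occupies
// a contiguous row_count-element block; for row-wise layout values are written
// at a stride of row_size bytes.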
144 bool generate_columns_on_host(int8_t* buffers,
145  const size_t row_count,
146  const size_t col_count,
147  const std::vector<size_t>& col_widths,
148  const std::vector<std::pair<int64_t, int64_t>>& ranges,
149  const bool is_columnar,
150  const std::vector<DIST_KIND>& dists) {
151  if (buffers == nullptr) {
152  return false;
153  }
154  CHECK_EQ(col_widths.size(), col_count);
155  CHECK_EQ(ranges.size(), col_count);
156  size_t row_size = 0;
157  for (auto& wid : col_widths) {
158  row_size += wid;
159  }
160  std::vector<std::future<bool>> child_threads;
161  for (size_t i = 0; i < col_count;
162  buffers += (is_columnar ? row_count : 1) * col_widths[i++]) {
163  if (dists[i] == DIST_KIND::INVALID) {
164  continue;
165  }
166  CHECK_LE(ranges[i].first, ranges[i].second);
167  switch (col_widths[i]) {
168  case 4:
169  child_threads.push_back(std::async(std::launch::async,
170  generate_numbers<int32_t>,
171  buffers,
172  row_count,
173  static_cast<int32_t>(ranges[i].first),
174  static_cast<int32_t>(ranges[i].second),
175  dists[i],
176  (is_columnar ? 4 : row_size)));
177  break;
178  case 8:
179  child_threads.push_back(std::async(std::launch::async,
180  generate_numbers<int64_t>,
181  buffers,
182  row_count,
183  ranges[i].first,
184  ranges[i].second,
185  dists[i],
186  (is_columnar ? 8 : row_size)));
187  break;
188  default:
189  CHECK(false);
190  }
191  }
192  for (auto& child : child_threads) {
193  child.get();
194  }
195  return true;
196 }
197 
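// Writes each column's init value into every group slot (EMPTY_KEY_64 for key
// columns, the aggregate identity for value columns in the tests below),
// splitting the group range across cpu_threads() async tasks.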
198 inline void init_groups_on_host(int8_t* groups,
199  const size_t group_count,
200  const size_t col_count,
201  const std::vector<size_t>& col_widths,
202  const std::vector<size_t>& init_vals,
203  const bool is_columnar) {
204  CHECK_EQ(col_count, col_widths.size());
205  CHECK_EQ(col_count, init_vals.size());
206  std::vector<std::future<void>> child_threads;
207  const size_t cpu_count = cpu_threads();
208  const auto stride = (group_count + cpu_count - 1) / cpu_count;
209  size_t row_size = 0;
210  for (auto wid : col_widths) {
211  row_size += wid;
212  }
213 
214  for (size_t start_group = 0; start_group < group_count; start_group += stride) {
215  const auto end_group = std::min(group_count, start_group + stride);
216  if (is_columnar) {
217  child_threads.push_back(
218  std::async(std::launch::async, [&, start_group, end_group]() {
219  auto col_base = groups;
220  for (size_t j = 0; j < col_count; col_base += col_widths[j++] * group_count) {
221  for (size_t i = start_group; i < end_group; ++i) {
222  switch (col_widths[j]) {
223  case 4: {
224  auto col_ptr = reinterpret_cast<uint32_t*>(col_base);
225  std::fill(col_ptr,
226  col_ptr + group_count,
227  static_cast<uint32_t>(init_vals[j]));
228  break;
229  }
230  case 8: {
231  auto col_ptr = reinterpret_cast<size_t*>(col_base);
232  std::fill(col_ptr, col_ptr + group_count, init_vals[j]);
233  break;
234  }
235  default:
236  CHECK(false);
237  }
238  }
239  }
240  }));
241  } else {
242  child_threads.push_back(
243  std::async(std::launch::async, [&, start_group, end_group]() {
244  for (size_t i = start_group; i < end_group; ++i) {
245  auto row_base = groups + i * row_size;
246  for (size_t j = 0; j < col_count; row_base += col_widths[j++]) {
247  switch (col_widths[j]) {
248  case 4:
249  *reinterpret_cast<uint32_t*>(row_base) =
250  static_cast<uint32_t>(init_vals[j]);
251  break;
252  case 8:
253  *reinterpret_cast<size_t*>(row_base) = init_vals[j];
254  break;
255  default:
256  CHECK(false);
257  }
258  }
259  }
260  }));
261  }
262  }
263  for (auto& child : child_threads) {
264  child.get();
265  }
266 }
267 
268 #if defined(TRY_COLUMNAR) || defined(TRY_MASH_COLUMNAR)
269 void columnarize_groups_on_host(int8_t* columnar_buffer,
270  const int8_t* rowwise_buffer,
271  const size_t row_count,
272  const std::vector<size_t>& col_widths) {
273  std::vector<std::future<void>> child_threads;
274  const size_t cpu_count = cpu_threads();
275  const auto stride = (row_count + cpu_count - 1) / cpu_count;
276  size_t row_size = 0;
277  for (auto wid : col_widths) {
278  row_size += wid;
279  }
280 
281  for (size_t start_row = 0; start_row < row_count; start_row += stride) {
282  const auto end_row = std::min(row_count, start_row + stride);
283  child_threads.push_back(std::async(std::launch::async, [&, start_row, end_row]() {
284  for (size_t i = start_row; i < end_row; ++i) {
285  auto read_ptr = rowwise_buffer + i * row_size;
286  auto write_base = columnar_buffer;
287  for (size_t j = 0; j < col_widths.size(); ++j) {
288  auto write_ptr = write_base + i * col_widths[j];
289  switch (col_widths[j]) {
290  case 4:
291  *reinterpret_cast<uint32_t*>(write_ptr) =
292  *reinterpret_cast<const uint32_t*>(read_ptr);
293  break;
294  case 8:
295  *reinterpret_cast<size_t*>(write_ptr) =
296  *reinterpret_cast<const size_t*>(read_ptr);
297  break;
298  default:
299  CHECK(false);
300  }
301  read_ptr += col_widths[j];
302  write_base += row_count * col_widths[j];
303  }
304  }
305  }));
306  }
307  for (auto& child : child_threads) {
308  child.get();
309  }
310 }
311 #endif
312 
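// Initial slot value for each aggregate op: 0 for COUNT/SUM, numeric max for
// MIN, numeric min for MAX.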
313 template <typename ValT = int64_t>
314 ValT get_default_value(OP_KIND op) {
315  switch (op) {
316  case OP_COUNT:
317  case OP_SUM:
318  return ValT(0);
319  case OP_MIN:
320  return std::numeric_limits<ValT>::max();
321  case OP_MAX:
322  return std::numeric_limits<ValT>::min();
323  default:
324  CHECK(false);
325  }
326  return ValT(0);
327 }
328 
329 DIST_KIND get_default_dist(OP_KIND op) {
330  switch (op) {
331  case OP_COUNT:
332  return DIST_KIND::INVALID;
333  case OP_SUM:
334  case OP_MIN:
335  case OP_MAX:
336  return DIST_KIND::UNI;
337  default:
338  CHECK(false);
339  }
340  return DIST_KIND::INVALID;
341 }
342 
343 template <typename ValT = int64_t>
344 std::pair<ValT, ValT> get_default_range(OP_KIND op) {
345  switch (op) {
346  case OP_COUNT:
347  return {ValT(0), ValT(0)};
348  case OP_SUM:
349  case OP_MIN:
350  case OP_MAX:
351  return {std::numeric_limits<ValT>::min(), std::numeric_limits<ValT>::max()};
352  default:
353  CHECK(false);
354  }
355  CHECK(false);
356  return {ValT(0), ValT(0)};
357 }
358 
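// Boost-style hash_combine: folds the hash of v into seed, used by the
// std::hash<std::vector<T>> specialization below to hash key tuples.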
359 template <class T>
360 inline void hash_combine(std::size_t& seed, T const& v) {
361  seed ^= std::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
362 }
363 
364 } // namespace
365 
366 namespace std {
367 
368 template <typename T>
369 struct hash<vector<T>> {
370  size_t operator()(const vector<T>& vint) const {
371  size_t seed = 0;
372  for (auto i : vint) {
373  // Combine the hash of the current vector with the hashes of the previous
374  // ones
375  hash_combine(seed, i);
376  }
377  return seed;
378  }
379 };
380 
381 } // namespace std
382 
383 namespace {
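// A slot is considered empty when its first key equals EMPTY_KEY_64.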
384 template <typename KeyT = int64_t>
385 inline bool is_empty_slot(const KeyT k) {
386  static_assert(std::is_same<KeyT, int64_t>::value,
387  "Unsupported template parameter other than int64_t for now");
388  return k == EMPTY_KEY_64;
389 }
390 
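// Host-side reference implementation of the hash aggregation. run() builds
// per-thread std::unordered_map results keyed by the key tuple, reduce()
// merges the partial maps, and compare() checks a device-produced group
// buffer against a reference result.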
391 template <typename KeyT = int64_t, typename ValT = int64_t>
392 class AggregateEmulator {
393  public:
394  using ResultType = std::unordered_map<std::vector<KeyT>, std::vector<ValT>>;
395 
396  explicit AggregateEmulator(const std::vector<OP_KIND>& ops) : agg_ops_(ops) {}
397 
398  ResultType run(const int8_t* buffers,
399  const size_t key_count,
400  const size_t val_count,
401  const size_t row_count,
402  const bool is_columnar) {
403  std::vector<std::future<void>> child_threads;
404  const size_t cpu_count = cpu_threads();
405  const size_t stride = (row_count + cpu_count - 1) / cpu_count;
406  std::vector<ResultType> partial_results(cpu_count);
407  for (size_t start_row = 0, i = 0; start_row < row_count; start_row += stride, ++i) {
408  const auto end_row = std::min(row_count, start_row + stride);
409  child_threads.push_back(std::async(std::launch::async,
410  &AggregateEmulator::runDispatch,
411  this,
412  std::ref(partial_results[i]),
413  buffers,
414  key_count,
415  val_count,
416  row_count,
417  start_row,
418  end_row,
419  is_columnar));
420  }
421 
422  for (auto& child : child_threads) {
423  child.get();
424  }
425 
426  return reduce(partial_results);
427  }
428 
429  bool compare(const int8_t* buffers,
430  const size_t key_count,
431  const size_t val_count,
432  const size_t group_count,
433  const bool is_columnar,
434  const ResultType& ref_result) {
435  std::vector<std::future<size_t>> child_threads;
436  const size_t cpu_count = cpu_threads();
437  const auto stride = (group_count + cpu_count - 1) / cpu_count;
438  for (size_t start_group = 0; start_group < group_count; start_group += stride) {
439  const auto end_group = std::min(group_count, start_group + stride);
440  child_threads.push_back(std::async(std::launch::async,
441  &AggregateEmulator::compareDispatch,
442  this,
443  buffers,
444  key_count,
445  val_count,
446  group_count,
447  start_group,
448  end_group,
449  is_columnar,
450  ref_result));
451  }
452  size_t matches = 0;
453  for (auto& child : child_threads) {
454  matches += child.get();
455  }
456 
457  return matches == ref_result.size();
458  }
459 
460  ResultType reduce(const std::vector<ResultType>& partial_results) {
461  ResultType final_result;
462  if (partial_results.size() == 1) {
463  final_result = partial_results[0];
464  return final_result;
465  }
466  for (auto& groups : partial_results) {
467  for (auto& grp : groups) {
468  auto& keys = grp.first;
469  if (is_empty_slot(keys[0])) {
470  continue;
471  }
472  if (!final_result.count(keys)) {
473  final_result.insert(std::make_pair(keys, grp.second));
474  continue;
475  }
476  const auto val_count = agg_ops_.size();
477  CHECK_EQ(val_count, final_result[keys].size());
478  CHECK_EQ(val_count, grp.second.size());
479  for (size_t v = 0; v < val_count; ++v) {
480  const ValT value = grp.second[v];
481  switch (agg_ops_[v]) {
482  case OP_COUNT:
483  case OP_SUM:
484  final_result[keys][v] += value;
485  break;
486  case OP_MIN:
487  final_result[keys][v] = std::min(final_result[keys][v], value);
488  break;
489  case OP_MAX:
490  final_result[keys][v] = std::max(final_result[keys][v], value);
491  break;
492  default:
493  CHECK(false);
494  }
495  }
496  }
497  }
498  return final_result;
499  }
500 
501  private:
502  void runDispatch(ResultType& partial_res,
503  const int8_t* buffers,
504  const size_t key_count,
505  const size_t val_count,
506  const size_t row_count,
507  const size_t start_row,
508  const size_t end_row,
509  const bool is_columnar) {
510  CHECK_EQ(agg_ops_.size(), val_count);
511  const size_t row_size = sizeof(KeyT) * key_count + sizeof(ValT) * val_count;
512  for (size_t i = start_row; i < end_row; ++i) {
513  std::vector<KeyT> keys(key_count);
514  auto key_buffers = reinterpret_cast<const KeyT*>(buffers);
515  if (is_columnar) {
516  for (size_t k = 0; k < key_count; ++k) {
517  keys[k] = key_buffers[i + k * row_count];
518  }
519  } else {
520  for (size_t k = 0; k < key_count; ++k) {
521  keys[k] = reinterpret_cast<const KeyT*>(buffers + i * row_size)[k];
522  }
523  }
524  CHECK_EQ(keys.size(), key_count);
525  if (is_empty_slot(keys[0])) {
526  continue;
527  }
528 
529  const bool inserted = partial_res.count(keys) != 0;
530  if (inserted) {
531  CHECK_EQ(partial_res[keys].size(), val_count);
532  } else {
533  partial_res[keys] = std::vector<ValT>(val_count);
534  }
535 
536  for (size_t v = 0; v < val_count; ++v) {
537  ValT value;
538  if (is_columnar) {
539  auto val_buffer =
540  reinterpret_cast<const ValT*>(key_buffers + key_count * row_count);
541  value = val_buffer[i + v * row_count];
542  } else {
543  auto val_buffer = reinterpret_cast<const ValT*>(buffers + row_size * i +
544  sizeof(KeyT) * key_count);
545  value = val_buffer[v];
546  }
547 
548  switch (agg_ops_[v]) {
549  case OP_COUNT:
550  if (inserted) {
551  ++partial_res[keys][v];
552  } else {
553  partial_res[keys][v] = 1;
554  }
555  break;
556  case OP_SUM:
557  if (inserted) {
558  partial_res[keys][v] += value;
559  } else {
560  partial_res[keys][v] = value;
561  }
562  break;
563  case OP_MIN:
564  if (inserted) {
565  partial_res[keys][v] = std::min(partial_res[keys][v], value);
566  } else {
567  partial_res[keys][v] = value;
568  }
569  break;
570  case OP_MAX:
571  if (inserted) {
572  partial_res[keys][v] = std::max(partial_res[keys][v], value);
573  } else {
574  partial_res[keys][v] = value;
575  }
576  break;
577  default:
578  CHECK(false);
579  }
580  }
581  }
582  }
583 
584  size_t compareDispatch(const int8_t* buffers,
585  const size_t key_count,
586  const size_t val_count,
587  const size_t group_count,
588  const size_t start_group,
589  const size_t end_group,
590  const bool is_columnar,
591  const ResultType& ref_result) {
592  CHECK_LT(size_t(0), key_count);
593  size_t matches = 0;
594  const size_t row_size = sizeof(KeyT) * key_count + sizeof(ValT) * val_count;
595  for (size_t i = start_group; i < end_group; ++i) {
596  std::vector<KeyT> keys(key_count);
597  const auto key_buffers = reinterpret_cast<const KeyT*>(buffers);
598  if (is_columnar) {
599  for (size_t k = 0; k < key_count; ++k) {
600  keys[k] = key_buffers[i + k * group_count];
601  }
602  } else {
603  for (size_t k = 0; k < key_count; ++k) {
604  keys[k] = reinterpret_cast<const KeyT*>(buffers + i * row_size)[k];
605  }
606  }
607  if (is_empty_slot(keys[0])) {
608  continue;
609  }
610  auto row_it = ref_result.find(keys);
611  if (row_it == ref_result.end()) {
612  return 0;
613  }
614  auto& ref_vals = row_it->second;
615  CHECK_EQ(val_count, ref_vals.size());
616  std::vector<ValT> actual_vals(val_count);
617  for (size_t v = 0; v < val_count; ++v) {
618  if (is_columnar) {
619  auto val_buffers =
620  reinterpret_cast<const ValT*>(key_buffers + key_count * group_count);
621  actual_vals[v] = val_buffers[i + v * group_count];
622  } else {
623  auto val_buffers = reinterpret_cast<const ValT*>(buffers + row_size * i +
624  sizeof(KeyT) * key_count);
625  actual_vals[v] = val_buffers[v];
626  }
627  }
628  for (size_t v = 0; v < val_count; ++v) {
629  if (actual_vals[v] != ref_vals[v]) {
630  return 0;
631  }
632  }
633  ++matches;
634  }
635  return matches;
636  }
637 
638  std::vector<OP_KIND> agg_ops_;
639 };
640 
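// With SAVE_MASH_BUF, each MASH group entry stores a 64-bit row index instead
// of the full key tuple; these helpers restore the keys on the host by copying
// them back from the original input buffer at that index.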
641 #ifdef SAVE_MASH_BUF
642 template <bool isColumnar, typename KeyT = int64_t, typename ValT = int64_t>
643 void mash_restore_dispatch(int8_t* output_buffer,
644  const int8_t* groups_buffer,
645  const size_t group_count,
646  const size_t entry_size,
647  const int8_t* input_buffer,
648  const size_t row_count,
649  const size_t key_count,
650  const size_t row_size,
651  const std::vector<size_t>& col_widths,
652  const size_t empty_key,
653  const size_t start_group,
654  const size_t end_group) {
655  const auto val_count = col_widths.size() - key_count;
656  const auto read_step = isColumnar ? row_count : 1;
657  const auto write_step = isColumnar ? group_count : 1;
658  for (size_t i = start_group; i < end_group; ++i) {
659  const auto group_ptr =
660  groups_buffer + i * (isColumnar ? sizeof(int64_t) : entry_size);
661  const auto key_idx = *reinterpret_cast<const int64_t*>(group_ptr);
662  auto read_ptr = input_buffer + key_idx * (isColumnar ? sizeof(KeyT) : row_size);
663  auto write_ptr = output_buffer + i * (isColumnar ? sizeof(KeyT) : row_size);
664  if (is_empty_slot(key_idx)) {
665  *reinterpret_cast<KeyT*>(write_ptr) = static_cast<KeyT>(empty_key);
666  continue;
667  }
668  for (size_t k = 0; k < key_count; ++k,
669  write_ptr += write_step * sizeof(KeyT),
670  read_ptr += read_step * sizeof(KeyT)) {
671  *reinterpret_cast<KeyT*>(write_ptr) = *reinterpret_cast<const KeyT*>(read_ptr);
672  }
673  if (isColumnar) {
674  write_ptr =
675  output_buffer + key_count * sizeof(KeyT) * group_count + sizeof(ValT) * i;
676  read_ptr = groups_buffer + sizeof(int64_t) * group_count + sizeof(ValT) * i;
677  for (size_t v = 0; v < val_count; ++v,
678  write_ptr += write_step * sizeof(ValT),
679  read_ptr += write_step * sizeof(ValT)) {
680  *reinterpret_cast<ValT*>(write_ptr) = *reinterpret_cast<const ValT*>(read_ptr);
681  }
682  } else {
683  memcpy(write_ptr, group_ptr + sizeof(int64_t), entry_size - sizeof(int64_t));
684  }
685  }
686 }
687 
688 template <bool isColumnar = false, typename KeyT = int64_t, typename ValT = int64_t>
689 void mash_restore_keys(int8_t* output_buffer,
690  const int8_t* groups_buffer,
691  const size_t group_count,
692  const int8_t* input_buffer,
693  const size_t row_count,
694  const size_t key_count,
695  const std::vector<size_t>& col_widths,
696  const std::vector<size_t>& init_vals) {
697  size_t entry_size = sizeof(int64_t);
698  for (size_t i = key_count; i < col_widths.size(); ++i) {
699  entry_size += col_widths[i];
700  }
701  size_t row_size = 0;
702  for (size_t i = 0; i < col_widths.size(); ++i) {
703  row_size += col_widths[i];
704  }
705  std::vector<std::future<void>> child_threads;
706  const size_t cpu_count = cpu_threads();
707  const auto stride = (group_count + cpu_count - 1) / cpu_count;
708  for (size_t start_group = 0; start_group < group_count; start_group += stride) {
709  const auto end_group = std::min(group_count, start_group + stride);
710  child_threads.push_back(std::async(std::launch::async,
711  mash_restore_dispatch<isColumnar, KeyT, ValT>,
712  output_buffer,
713  groups_buffer,
714  group_count,
715  entry_size,
716  input_buffer,
717  row_count,
718  key_count,
719  row_size,
720  std::ref(col_widths),
721  init_vals[0],
722  start_group,
723  end_group));
724  }
725  for (auto& child : child_threads) {
726  child.get();
727  }
728 }
729 #endif
730 
731 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
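// RAII timer: records CUDA events at construction and destruction and prints
// the elapsed device time (plus the buffer size in MB when one was supplied)
// when it goes out of scope.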
732 class CudaTimer {
733  public:
734  CudaTimer(size_t buf_sz) : used_size(buf_sz) {
735  cudaEventCreate(&start_);
736  cudaEventCreate(&stop_);
737  cudaEventRecord(start_, 0);
738  }
739 
740  CudaTimer() : used_size(size_t(-1)) {
741  cudaEventCreate(&start_);
742  cudaEventCreate(&stop_);
743  cudaEventRecord(start_, 0);
744  }
745 
746  ~CudaTimer() {
747  cudaEventRecord(stop_, 0);
748  cudaEventSynchronize(stop_);
749  float elapsedTime;
750  cudaEventElapsedTime(&elapsedTime, start_, stop_);
751  if (used_size == size_t(-1)) {
752  std::cout << "Current query took " << elapsedTime << " ms on device.\n";
753  } else {
754  std::cout << "Current query took " << elapsedTime << " ms on device using "
755  << used_size / (1024 * 1024.f) << " MB VRAM.\n";
756  }
757  cudaEventDestroy(start_);
758  cudaEventDestroy(stop_);
759  }
760 
761  private:
762  const size_t used_size;
763  cudaEvent_t start_;
764  cudaEvent_t stop_;
765 };
766 #endif
767 } // namespace
768 
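// Generates 20M random rows for several key-distribution combinations, runs
// the baseline hash aggregation on the device (plus the optional columnar and
// MASH variants when enabled), and validates each group buffer against the
// AggregateEmulator reference.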
769 TEST(Hash, Baseline) {
770  // Config
771  const std::vector<OP_KIND> agg_ops{OP_COUNT, OP_MAX};
772  const size_t key_count = 3;
773  const size_t val_count = 2;
774  const size_t row_count = 20000000;
775  const bool is_columnar = false;
776 
777  CHECK_EQ(agg_ops.size(), val_count);
778  std::vector<size_t> col_widths(key_count, sizeof(int64_t));
779  std::vector<size_t> init_vals(key_count, EMPTY_KEY_64);
780  for (size_t i = 0; i < val_count; ++i) {
781  col_widths.push_back(sizeof(uint64_t));
782  init_vals.push_back(get_default_value(agg_ops[i]));
783  }
784 
785  std::vector<DIST_KIND> dist_tries{DIST_KIND::UNI,
786  DIST_KIND::UNI,
787  DIST_KIND::NRM,
788  DIST_KIND::EXP1,
789  DIST_KIND::EXP2,
790  DIST_KIND::POI};
791  std::vector<std::string> dist_names{"uniform(-100, 100)",
792  "uniform(-100000, 100000)",
793  "normal(0, 1)",
794  "exp(1)",
795  "exp(2)",
796  "poisson(4)"};
797  std::vector<std::pair<int64_t, int64_t>> range_tries{
798  {-100, 100},
799  {-100000, 100000},
800  {-1000, 1000},
801  {std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max()},
802  {std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max()},
803  {std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max()}};
804  CHECK_EQ(dist_tries.size(), dist_names.size());
805  CHECK_EQ(dist_tries.size(), range_tries.size());
806 
807  std::vector<std::vector<size_t>> selected_tries;
808  for (size_t i = 0; i < dist_tries.size(); ++i) {
809  selected_tries.emplace_back(key_count, i);
810  }
811  selected_tries.push_back({3, 2, 4, 3, 4, 5, 1});
812  for (size_t i = 0; i < selected_tries.size(); ++i) {
813  std::cout << "Try distributions of " << key_count << " keys: ";
814  std::vector<DIST_KIND> distributions;
815  for (size_t j = 0; j < key_count; ++j) {
816  std::cout << dist_names[selected_tries[i][j]] << (j == key_count - 1 ? "" : ", ");
817  distributions.push_back(dist_tries[selected_tries[i][j]]);
818  }
819  std::cout << std::endl;
820  for (size_t v = 0; v < val_count; ++v) {
821  distributions.push_back(get_default_dist(agg_ops[v]));
822  }
823 
824  const auto col_count = key_count + val_count;
825  std::vector<std::pair<int64_t, int64_t>> ranges;
826  for (size_t j = 0; j < key_count; ++j) {
827  ranges.push_back(range_tries[selected_tries[i][j]]);
828  }
829  for (size_t v = 0; v < val_count; ++v) {
830  ranges.push_back(get_default_range(agg_ops[v]));
831  }
832 
833  // Generate random data.
834  std::vector<int64_t> input_buffer(row_count * col_count);
835 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
836  int8_t* dev_input_buffer = nullptr;
837  cudaMalloc(&dev_input_buffer, input_buffer.size() * sizeof(int64_t));
838  if (generate_columns_on_device(dev_input_buffer,
839  row_count,
840  col_count,
841  col_widths,
842  ranges,
843  is_columnar,
844  distributions)) {
845  cudaMemcpy(&input_buffer[0],
846  dev_input_buffer,
847  input_buffer.size() * sizeof(int64_t),
848  cudaMemcpyDeviceToHost);
849  } else
850 #endif
851  {
852  generate_columns_on_host(reinterpret_cast<int8_t*>(&input_buffer[0]),
853  row_count,
854  col_count,
855  col_widths,
856  ranges,
857  is_columnar,
858  distributions);
859 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
860  cudaMemcpy(dev_input_buffer,
861  &input_buffer[0],
862  input_buffer.size() * sizeof(int64_t),
863  cudaMemcpyHostToDevice);
864 #endif
865  }
866  AggregateEmulator<int64_t, int64_t> emulator(agg_ops);
867  auto ref_result = emulator.run(reinterpret_cast<int8_t*>(&input_buffer[0]),
868  key_count,
869  val_count,
870  row_count,
871  is_columnar);
872  std::cout << " Generated " << row_count / 1000000.f << "M rows aggregated into "
873  << ref_result.size() << " groups.\n";
874  const auto actual_group_count =
875  static_cast<size_t>(ref_result.size() * c_space_usage);
876  std::vector<int64_t> groups_buffer(actual_group_count * col_count, 0);
877 #ifdef TRY_COLUMNAR
878  std::vector<int64_t> columnar_groups_buffer(actual_group_count * col_count, 0);
879 #endif
880 #if defined(TRY_MASH) || defined(TRY_MASH_COLUMNAR)
881 #ifdef SAVE_MASH_BUF
882  const auto actual_col_count = 1 + val_count;
883 #else
884  const auto actual_col_count = col_count;
885 #endif
886 #endif
887 #ifdef TRY_MASH
888  std::vector<int64_t> mash_groups_buffer(actual_group_count * actual_col_count, 0);
889 #endif
890 #ifdef TRY_MASH_COLUMNAR
891  std::vector<int64_t> mash_columnar_groups_buffer(
892  actual_group_count * actual_col_count, 0);
893 #endif
894 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
895  const auto device_type = DEV_KIND::GPU;
896  if (device_type == DEV_KIND::GPU) {
897  std::cout << " Baseline: ";
898  try {
899  int8_t* dev_groups_buffer = nullptr;
900  cudaMalloc(&dev_groups_buffer, groups_buffer.size() * sizeof(int64_t));
901  init_groups_on_device(dev_groups_buffer,
902  actual_group_count,
903  col_count,
904  col_widths,
905  init_vals,
906  is_columnar);
907  {
908  CudaTimer timer(groups_buffer.size() * sizeof(int64_t));
909  run_query_on_device(dev_groups_buffer,
910  actual_group_count,
911  dev_input_buffer,
912  row_count,
913  key_count,
914  val_count,
915  col_widths,
916  agg_ops,
917  is_columnar);
918  }
919  cudaMemcpy(&groups_buffer[0],
920  dev_groups_buffer,
921  groups_buffer.size() * sizeof(int64_t),
922  cudaMemcpyDeviceToHost);
923  cudaFree(dev_groups_buffer);
924  } catch (const thrust::system_error& e) {
925  std::cout << e.what() << std::endl;
926  }
927 #if defined(TRY_MASH) || defined(TRY_MASH_COLUMNAR)
928 #ifdef SAVE_MASH_BUF
929  std::vector<size_t> actual_col_widths(1, sizeof(int64_t));
930  std::vector<size_t> actual_init_vals(1, EMPTY_KEY_64);
931  for (size_t i = 0; i < val_count; ++i) {
932  actual_col_widths.push_back(col_widths[key_count + i]);
933  actual_init_vals.push_back(init_vals[key_count + i]);
934  }
935 #else
936  const auto& actual_col_widths = col_widths;
937  const auto& actual_init_vals = init_vals;
938 #endif
939 #endif // TRY_MASH || TRY_MASH_COLUMNAR
940 #ifdef TRY_MASH
941  std::cout << " MASH: ";
942  try {
943  int8_t* dev_mash_groups_buffer = nullptr;
944  cudaMalloc(&dev_mash_groups_buffer, mash_groups_buffer.size() * sizeof(int64_t));
945  init_groups_on_device(dev_mash_groups_buffer,
946  actual_group_count,
947  actual_col_count,
948  actual_col_widths,
949  actual_init_vals,
950  is_columnar);
951  {
952  CudaTimer timer;
953  mash_run_query_on_device(dev_mash_groups_buffer,
954  actual_group_count,
955  dev_input_buffer,
956  row_count,
957  key_count,
958  val_count,
959  col_widths,
960  agg_ops,
961  is_columnar);
962  }
963  cudaMemcpy(&mash_groups_buffer[0],
964  dev_mash_groups_buffer,
965  mash_groups_buffer.size() * sizeof(int64_t),
966  cudaMemcpyDeviceToHost);
967 #ifdef SAVE_MASH_BUF
968  if (key_count > 1) {
969  std::vector<int64_t> temp_groups_buffer(actual_group_count * col_count, 0);
970  auto elapsedTime = measure<>::execution([&]() {
971  mash_restore_keys(reinterpret_cast<int8_t*>(&temp_groups_buffer[0]),
972  reinterpret_cast<int8_t*>(&mash_groups_buffer[0]),
973  actual_group_count,
974  reinterpret_cast<int8_t*>(&input_buffer[0]),
975  row_count,
976  key_count,
977  col_widths,
978  init_vals);
979  });
980  std::cout << " \tAnd optional " << elapsedTime << " ms on host if using "
981  << mash_groups_buffer.size() * sizeof(int64_t) / (1024 * 1024.f)
982  << " MB VRAM instead.\n";
983  mash_groups_buffer.swap(temp_groups_buffer);
984  }
985 #endif
986  cudaFree(dev_mash_groups_buffer);
987  } catch (const thrust::system_error& e) {
988  std::cout << e.what() << std::endl;
989  }
990 #endif // TRY_MASH
991 #ifdef TRY_COLUMNAR
992  std::cout << " Baseline Columnar: ";
993  try {
994  const bool is_columnar = true;
995  int8_t* dev_groups_buffer = nullptr;
996  cudaMalloc(&dev_groups_buffer, columnar_groups_buffer.size() * sizeof(int64_t));
997 #if 0
998  int8_t* dev_columnar_input_buffer = nullptr;
999  cudaMalloc(&dev_columnar_input_buffer, input_buffer.size() * sizeof(int64_t));
1000  columnarize_groups_on_device(dev_columnar_input_buffer, dev_input_buffer, row_count, col_widths);
1001 #else
1002  std::vector<int64_t> columnar_input_buffer(input_buffer.size());
1003  columnarize_groups_on_host(reinterpret_cast<int8_t*>(&columnar_input_buffer[0]),
1004  reinterpret_cast<const int8_t*>(&input_buffer[0]),
1005  row_count,
1006  col_widths);
1007  cudaMemcpy(dev_input_buffer,
1008  &columnar_input_buffer[0],
1009  columnar_input_buffer.size() * sizeof(int64_t),
1010  cudaMemcpyHostToDevice);
1011 #endif
1012  init_groups_on_device(dev_groups_buffer,
1013  actual_group_count,
1014  col_count,
1015  col_widths,
1016  init_vals,
1017  is_columnar);
1018  {
1019  CudaTimer timer(columnar_groups_buffer.size() * sizeof(int64_t));
1020  run_query_on_device(dev_groups_buffer,
1021  actual_group_count,
1022  dev_input_buffer,
1023  row_count,
1024  key_count,
1025  val_count,
1026  col_widths,
1027  agg_ops,
1028  is_columnar);
1029  }
1030  cudaMemcpy(&columnar_groups_buffer[0],
1031  dev_groups_buffer,
1032  columnar_groups_buffer.size() * sizeof(int64_t),
1033  cudaMemcpyDeviceToHost);
1034  cudaFree(dev_groups_buffer);
1035  } catch (const thrust::system_error& e) {
1036  std::cout << e.what() << std::endl;
1037  }
1038 #endif // TRY_COLUMNAR
1039 #ifdef TRY_MASH_COLUMNAR
1040  std::cout << " MASH Columnar: ";
1041  try {
1042  const bool is_columnar = true;
1043  int8_t* dev_mash_groups_buffer = nullptr;
1044  cudaMalloc(&dev_mash_groups_buffer,
1045  mash_columnar_groups_buffer.size() * sizeof(int64_t));
1046  std::vector<int64_t> columnar_input_buffer(input_buffer.size());
1047  columnarize_groups_on_host(reinterpret_cast<int8_t*>(&columnar_input_buffer[0]),
1048  reinterpret_cast<const int8_t*>(&input_buffer[0]),
1049  row_count,
1050  col_widths);
1051  cudaMemcpy(dev_input_buffer,
1052  &columnar_input_buffer[0],
1053  columnar_input_buffer.size() * sizeof(int64_t),
1054  cudaMemcpyHostToDevice);
1055  init_groups_on_device(dev_mash_groups_buffer,
1056  actual_group_count,
1057  actual_col_count,
1058  actual_col_widths,
1059  actual_init_vals,
1060  is_columnar);
1061  {
1062  CudaTimer timer;
1063  mash_run_query_on_device(dev_mash_groups_buffer,
1064  actual_group_count,
1065  dev_input_buffer,
1066  row_count,
1067  key_count,
1068  val_count,
1069  col_widths,
1070  agg_ops,
1071  is_columnar);
1072  }
1073  cudaMemcpy(&mash_columnar_groups_buffer[0],
1074  dev_mash_groups_buffer,
1075  mash_columnar_groups_buffer.size() * sizeof(int64_t),
1076  cudaMemcpyDeviceToHost);
1077 #ifdef SAVE_MASH_BUF
1078  if (key_count > 1) {
1079  std::vector<int64_t> temp_groups_buffer(actual_group_count * col_count, 0);
1080  auto elapsedTime = measure<>::execution([&]() {
1081  mash_restore_keys<true>(
1082  reinterpret_cast<int8_t*>(&temp_groups_buffer[0]),
1083  reinterpret_cast<int8_t*>(&mash_columnar_groups_buffer[0]),
1084  actual_group_count,
1085  reinterpret_cast<int8_t*>(&columnar_input_buffer[0]),
1086  row_count,
1087  key_count,
1088  col_widths,
1089  init_vals);
1090  });
1091  std::cout << " \t\t And optional " << elapsedTime << " ms on host if using "
1092  << mash_columnar_groups_buffer.size() * sizeof(int64_t) /
1093  (1024 * 1024.f)
1094  << " MB VRAM instead.\n";
1095  mash_columnar_groups_buffer.swap(temp_groups_buffer);
1096  }
1097 #endif
1098  cudaFree(dev_mash_groups_buffer);
1099  } catch (const thrust::system_error& e) {
1100  std::cout << e.what() << std::endl;
1101  }
1102 #endif // TRY_MASH_COLUMNAR
1103  } else
1104 #endif // HAVE_CUDA
1105  {
1106  init_groups_on_host(reinterpret_cast<int8_t*>(&groups_buffer[0]),
1107  actual_group_count,
1108  col_count,
1109  col_widths,
1110  init_vals,
1111  is_columnar);
1112  auto elapsedTime = measure<>::execution([&]() {
1113  // Do calculation on host
1114  });
1115  std::cout << " Current query took " << elapsedTime << " ms on host\n";
1116  }
1117 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1118  CHECK(dev_input_buffer);
1119  cudaFree(dev_input_buffer);
1120  // TODO(miyu): enable this after profiling aggregation on host is added.
1121  ASSERT_TRUE(emulator.compare(reinterpret_cast<int8_t*>(&groups_buffer[0]),
1122  key_count,
1123  val_count,
1124  actual_group_count,
1125  is_columnar,
1126  ref_result));
1127 #endif
1128 #ifdef TRY_COLUMNAR
1129  ASSERT_TRUE(emulator.compare(reinterpret_cast<int8_t*>(&columnar_groups_buffer[0]),
1130  key_count,
1131  val_count,
1132  actual_group_count,
1133  true,
1134  ref_result));
1135 #endif
1136 #ifdef TRY_MASH
1137  ASSERT_TRUE(emulator.compare(reinterpret_cast<int8_t*>(&mash_groups_buffer[0]),
1138  key_count,
1139  val_count,
1140  actual_group_count,
1141  is_columnar,
1142  ref_result));
1143 #endif
1144 #ifdef TRY_MASH_COLUMNAR
1145  ASSERT_TRUE(
1146  emulator.compare(reinterpret_cast<int8_t*>(&mash_columnar_groups_buffer[0]),
1147  key_count,
1148  val_count,
1149  actual_group_count,
1150  true,
1151  ref_result));
1152 #endif
1153  }
1154 }
1155 
1156 namespace {
1157 
1158 template <typename KeyT = int64_t>
1159 void reset_entry(KeyT* entry_ptr) {
1160  static_assert(std::is_same<KeyT, int64_t>::value,
1161  "Unsupported template parameter other than int64_t for now");
1162  *entry_ptr = static_cast<KeyT>(EMPTY_KEY_64);
1163 }
1164 
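// Removes duplicate key tuples from a generated buffer: key tuples are sharded
// by hash across cpu_count sets, each guarded by its own mutex, and any row
// whose keys were already seen has its keys reset to EMPTY_KEY_64. run()
// returns the number of distinct rows that remain.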
1165 template <bool isColumnar, typename KeyT = int64_t>
1166 class Deduplicater {
1167  public:
1168  Deduplicater(int8_t* row_buff,
1169  const size_t row_size,
1170  const size_t row_count,
1171  const size_t key_count)
1172  : buff_(row_buff)
1173  , entry_sz_(row_size)
1174  , entry_cnt_(row_count)
1175  , key_cnt_(key_count) {}
1176  size_t run() {
1177  std::vector<std::future<void>> child_threads;
1178  const size_t cpu_count = cpu_threads();
1179  const size_t stride = (entry_cnt_ + cpu_count - 1) / cpu_count;
1180 
1181  std::vector<std::unordered_set<std::vector<KeyT>>> mask_set(
1182  cpu_count, std::unordered_set<std::vector<KeyT>>());
1183  std::vector<std::mutex> mutex_set(cpu_count);
1184  for (size_t start_entry = 0, i = 0; start_entry < entry_cnt_;
1185  start_entry += stride, ++i) {
1186  const auto end_entry = std::min(entry_cnt_, start_entry + stride);
1187  child_threads.push_back(std::async(std::launch::async,
1188  &Deduplicater::runDispatch,
1189  this,
1190  std::ref(mask_set),
1191  std::ref(mutex_set),
1192  start_entry,
1193  end_entry));
1194  }
1195 
1196  for (auto& child : child_threads) {
1197  child.get();
1198  }
1199 
1200  size_t row_count = 0;
1201  for (auto& mask : mask_set) {
1202  row_count += mask.size();
1203  }
1204  CHECK_GE(entry_cnt_, row_count);
1205  return row_count;
1206  }
1207 
1208  private:
1209  int8_t* buff_;
1210  const size_t entry_sz_;
1211  const size_t entry_cnt_;
1212  const size_t key_cnt_;
1213 
1214  void runDispatch(std::vector<std::unordered_set<std::vector<KeyT>>>& mask_set,
1215  std::vector<std::mutex>& mutex_set,
1216  const size_t start_entry,
1217  const size_t end_entry) {
1218  CHECK_EQ(mask_set.size(), mutex_set.size());
1219  const size_t set_size = mask_set.size();
1220  for (size_t i = start_entry; i < end_entry; ++i) {
1221  std::vector<KeyT> keys(key_cnt_);
1222  auto key_buffers = reinterpret_cast<KeyT*>(buff_);
1223  if (isColumnar) {
1224  for (size_t k = 0; k < key_cnt_; ++k) {
1225  keys[k] = key_buffers[i + k * entry_cnt_];
1226  }
1227  } else {
1228  for (size_t k = 0; k < key_cnt_; ++k) {
1229  keys[k] = reinterpret_cast<const KeyT*>(buff_ + i * entry_sz_)[k];
1230  }
1231  }
1232  CHECK_EQ(keys.size(), key_cnt_);
1233  const size_t mask_idx = std::hash<decltype(keys)>()(keys) % set_size;
1234  const bool inserted = [&]() {
1235  std::lock_guard<std::mutex> mask_lock(mutex_set[mask_idx]);
1236  auto it_ok = mask_set[mask_idx].insert(keys);
1237  return it_ok.second;
1238  }();
1239  if (!inserted) {
1240  if (isColumnar) {
1241  for (size_t k = 0; k < key_cnt_; ++k) {
1242  reset_entry(key_buffers + i + k * entry_cnt_);
1243  }
1244  } else {
1245  for (size_t k = 0; k < key_cnt_; ++k) {
1246  reset_entry(reinterpret_cast<KeyT*>(buff_ + i * entry_sz_) + k);
1247  }
1248  }
1249  }
1250  }
1251  }
1252 };
1253 
1254 } // namespace
1255 
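// Builds result_count baseline-hash ResultSets filled with deduplicated random
// rows, reduces them on the CPU through ResultSetManager and (when CUDA is
// available) pairwise across the GPUs, then checks both reduced buffers
// against the emulator's reference reduction.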
1256 TEST(Reduction, Baseline) {
1257  // Config
1258  std::vector<OP_KIND> agg_ops{OP_SUM, OP_MAX};
1259  const size_t key_count = 2;
1260  const size_t val_count = 2;
1261  const size_t entry_count = 20000000;
1262  const bool is_columnar = false;
1263  const size_t result_count = std::max(size_t(2), get_gpu_count());
1264 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1265  const float fill_rate = 0.5f;
1266 #endif
1267 
1268  const size_t col_count = key_count + val_count;
1269  const std::vector<size_t> col_widths(col_count, sizeof(int64_t));
1270  std::vector<size_t> init_vals(key_count, EMPTY_KEY_64);
1271  for (size_t i = 0; i < val_count; ++i) {
1272  init_vals.push_back(get_default_value(agg_ops[i]));
1273  }
1274  std::vector<TargetInfo> target_infos;
1275  const SQLTypeInfo bigint_ti(kBIGINT, true);
1276  switch (val_count) {
1277  case 3:
1278  target_infos.push_back(TargetInfo{true, kMIN, bigint_ti, bigint_ti, true, false});
1279  case 2:
1280  target_infos.push_back(TargetInfo{true, kMAX, bigint_ti, bigint_ti, true, false});
1281  case 1:
1282  target_infos.push_back(TargetInfo{true, kSUM, bigint_ti, bigint_ti, true, false});
1283  break;
1284  default:
1285  CHECK(false);
1286  }
1287  std::reverse(target_infos.begin(), target_infos.end());
1288 
1289  const auto device_type = ExecutorDeviceType::CPU;
1290  CHECK_GT(key_count, 1u);
1291  size_t row_size = key_count * sizeof(int64_t);
1292  std::vector<int8_t> group_col_widths(key_count, sizeof(int64_t));
1293  QueryMemoryDescriptor query_mem_desc(
1294  QueryDescriptionType::GroupByBaselineHash, 0, 0, false, group_col_widths);
1295  query_mem_desc.setHasKeylessHash(false);
1296  query_mem_desc.setOutputColumnar(is_columnar);
1297  query_mem_desc.setEntryCount(entry_count);
1298  for (const auto& target_info : target_infos) {
1299  const auto slot_bytes =
1300  std::max(int8_t(8), static_cast<int8_t>(target_info.sql_type.get_size()));
1301  query_mem_desc.addColSlotInfo({std::make_tuple(slot_bytes, slot_bytes)});
1302  row_size += slot_bytes;
1303  }
1304 
1305 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1306  const bool has_multi_gpus = get_gpu_count() > 1;
1307  const auto input_size = query_mem_desc.getBufferSizeBytes(device_type);
1308 #else
1309  const bool has_multi_gpus = false;
1310 #endif // HAVE_CUDA
1311  const auto row_set_mem_owner = std::make_shared<RowSetMemoryOwner>();
1312  std::vector<std::unique_ptr<ResultSet>> results;
1313  for (size_t i = 0; i < result_count; ++i) {
1314  auto rs = boost::make_unique<ResultSet>(
1315  target_infos, device_type, query_mem_desc, row_set_mem_owner, nullptr);
1316  rs->allocateStorage();
1317  results.push_back(std::move(rs));
1318  }
1319 
1320  std::vector<std::pair<int64_t, int64_t>> ranges;
1321  for (size_t k = 0; k < key_count; ++k) {
1322  ranges.emplace_back(-(entry_count / 2), (entry_count / 2));
1323  }
1324 
1325  for (size_t v = 0; v < val_count; ++v) {
1326  ranges.push_back(get_default_range(agg_ops[v]));
1327  }
1328  std::vector<DIST_KIND> distributions(col_count, DIST_KIND::UNI);
1329 
1330  std::cout << "ResultSet Count: " << results.size() << std::endl;
1331  std::vector<size_t> rs_row_counts(results.size(), entry_count);
1332  // Generate random data.
1333  auto gen_func = [&](int8_t* input_buffer, const size_t device_id) -> size_t {
1334  auto actual_row_count = entry_count;
1335 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1336  if (has_multi_gpus) {
1337  cudaSetDevice(device_id);
1338  }
1339  int8_t* dev_input_buffer = nullptr;
1340  cudaMalloc(&dev_input_buffer, input_size);
1341  if (generate_columns_on_device(dev_input_buffer,
1342  entry_count,
1343  col_count,
1344  col_widths,
1345  ranges,
1346  is_columnar,
1347  distributions)) {
1348  actual_row_count = deduplicate_rows_on_device(
1349  dev_input_buffer, entry_count, key_count, col_widths, is_columnar);
1350  auto dev_input_copy = get_hashed_copy(dev_input_buffer,
1351  entry_count,
1352  entry_count,
1353  col_widths,
1354  agg_ops,
1355  init_vals,
1356  is_columnar);
1357  cudaFree(dev_input_buffer);
1358  actual_row_count = drop_rows(dev_input_copy,
1359  entry_count,
1360  row_size,
1361  actual_row_count,
1362  fill_rate,
1363  is_columnar);
1364  cudaMemcpy(input_buffer, dev_input_copy, input_size, cudaMemcpyDeviceToHost);
1365  } else
1366 #endif
1367  {
1368  generate_columns_on_host(input_buffer,
1369  entry_count,
1370  col_count,
1371  col_widths,
1372  ranges,
1373  is_columnar,
1374  distributions);
1375  actual_row_count =
1376  Deduplicater<false>(input_buffer, row_size, entry_count, key_count).run();
1377  }
1378 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1379  if (dev_input_buffer) {
1380  cudaFree(dev_input_buffer);
1381  }
1382 #endif
1383  return actual_row_count;
1384  };
1385  if (has_multi_gpus) {
1386  std::vector<std::future<size_t>> gener_threads;
1387  for (size_t i = 0; i < results.size(); ++i) {
1388  gener_threads.push_back(std::async(std::launch::async,
1389  gen_func,
1390  results[i]->getStorage()->getUnderlyingBuffer(),
1391  i));
1392  }
1393 
1394  for (size_t i = 0; i < gener_threads.size(); ++i) {
1395  rs_row_counts[i] = gener_threads[i].get();
1396  }
1397  } else {
1398  for (size_t i = 0; i < results.size(); ++i) {
1399  rs_row_counts[i] = gen_func(results[i]->getStorage()->getUnderlyingBuffer(), i);
1400  }
1401  }
1402 
1403  for (size_t i = 0; i < rs_row_counts.size(); ++i) {
1404  std::cout << "ResultSet " << i << " has " << rs_row_counts[i] << " rows and "
1405  << entry_count - rs_row_counts[i] << " empty buckets\n";
1406  }
1407  AggregateEmulator<int64_t, int64_t> emulator(agg_ops);
1408  std::vector<decltype(emulator)::ResultType> ref_results;
1409  for (auto& rs : results) {
1410  auto ref_rs = emulator.run(rs->getStorage()->getUnderlyingBuffer(),
1411  key_count,
1412  val_count,
1413  entry_count,
1414  is_columnar);
1415  ref_results.push_back(std::move(ref_rs));
1416  }
1417  auto ref_reduced_result = emulator.reduce(ref_results);
1418  ResultSetManager rs_manager;
1419  std::vector<ResultSet*> storage_set;
1420  for (auto& rs : results) {
1421  storage_set.push_back(rs.get());
1422  }
1423 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1424  CHECK_GT(results.size(), 0);
1425  std::vector<int64_t> gpu_reduced_result(input_size / sizeof(int64_t), 0);
1426  memcpy(&gpu_reduced_result[0],
1427  results[0]->getStorage()->getUnderlyingBuffer(),
1428  input_size);
1429 #endif
1430  ResultSet* reduced_result = nullptr;
1431  std::cout << "CPU reduction: ";
1432  auto elapsedTime = measure<>::execution([&]() {
1433  // Do calculation on host
1434  reduced_result = rs_manager.reduce(storage_set);
1435  });
1436  CHECK(reduced_result != nullptr);
1437  std::cout << "Current reduction took " << elapsedTime << " ms and got reduced "
1438  << reduced_result->rowCount() << " rows\n";
1439 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1440  std::vector<int8_t*> host_reduced_buffers(result_count, nullptr);
1441  host_reduced_buffers[0] = reinterpret_cast<int8_t*>(&gpu_reduced_result[0]);
1442  for (size_t i = 1; i < storage_set.size(); ++i) {
1443  host_reduced_buffers[i] = storage_set[i]->getStorage()->getUnderlyingBuffer();
1444  }
1445  std::vector<int8_t*> dev_reduced_buffers(result_count, nullptr);
1446  std::vector<size_t> rs_entry_count(result_count, entry_count);
1447  std::cout << "GPU reduction: ";
1448  elapsedTime = measure<>::execution([&]() {
1449  for (size_t device_id = 0; device_id < result_count; ++device_id) {
1450  if (has_multi_gpus) {
1451  cudaSetDevice(device_id);
1452  }
1453  int8_t* dev_reduced_buffer = nullptr;
1454  cudaMalloc(&dev_reduced_buffer, input_size);
1455  cudaMemcpy(dev_reduced_buffer,
1456  host_reduced_buffers[device_id],
1457  input_size,
1458  cudaMemcpyHostToDevice);
1459  dev_reduced_buffers[device_id] = dev_reduced_buffer;
1460  }
1461  for (size_t stride = 1, end = (result_count + 1) / 2; stride <= end; stride <<= 1) {
1462  std::vector<std::future<void>> reducer_threads;
1463  for (size_t device_id = 0; device_id + stride < result_count;
1464  device_id += stride * 2) {
1465  reducer_threads.push_back(std::async(
1466  std::launch::async,
1467  [&](const size_t dev_id) {
1468  if (has_multi_gpus) {
1469  cudaSetDevice(dev_id);
1470  }
1471  reduce_on_device(dev_reduced_buffers[dev_id],
1472  dev_id,
1473  rs_entry_count[dev_id],
1474  dev_reduced_buffers[dev_id + stride],
1475  has_multi_gpus ? dev_id + stride : dev_id,
1476  rs_entry_count[dev_id + stride],
1477  rs_row_counts[dev_id + stride],
1478  col_widths,
1479  agg_ops,
1480  init_vals,
1481  is_columnar);
1482  },
1483  device_id));
1484  }
1485  for (auto& child : reducer_threads) {
1486  child.get();
1487  }
1488  }
1489  });
1490  std::cout << "Current reduction took " << elapsedTime << " ms\n";
1491  {
1492  std::vector<int64_t> temp_buffer(rs_entry_count[0] * col_count, 0);
1493  cudaMemcpy(&temp_buffer[0],
1494  dev_reduced_buffers[0],
1495  temp_buffer.size() * sizeof(int64_t),
1496  cudaMemcpyDeviceToHost);
1497  for (size_t i = 0; i < dev_reduced_buffers.size(); ++i) {
1498  if (has_multi_gpus) {
1499  cudaSetDevice(i);
1500  }
1501  cudaFree(dev_reduced_buffers[i]);
1502  dev_reduced_buffers[i] = nullptr;
1503  }
1504  gpu_reduced_result.swap(temp_buffer);
1505  }
1506 #endif
1507  ASSERT_TRUE(emulator.compare(reduced_result->getStorage()->getUnderlyingBuffer(),
1508  key_count,
1509  val_count,
1510  reduced_result->getQueryMemDesc().getEntryCount(),
1511  is_columnar,
1512  ref_reduced_result));
1513 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1514  ASSERT_TRUE(emulator.compare(reinterpret_cast<int8_t*>(&gpu_reduced_result[0]),
1515  key_count,
1516  val_count,
1517  gpu_reduced_result.size() / col_count,
1518  is_columnar,
1519  ref_reduced_result));
1520 
1521 #endif
1522 }
1523 
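// Reduction benchmark for the perfect-hash layout: keys are drawn from a dense
// range, each device reduces a segment of the entry range after fetching the
// peer segments, and the stitched GPU result is compared against the emulator.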
1524 TEST(Reduction, PerfectHash) {
1525  // Config
1526  std::vector<OP_KIND> agg_ops{OP_SUM, OP_MAX};
1527  const size_t key_count = 1;
1528  const size_t val_count = 2;
1529  const size_t entry_count = 2000000;
1530  const bool is_columnar = false;
1531  const size_t result_count = std::max(size_t(2), get_gpu_count());
1532 
1533  const size_t col_count = key_count + val_count;
1534  const std::vector<size_t> col_widths(col_count, sizeof(int64_t));
1535  std::vector<size_t> init_vals(key_count, EMPTY_KEY_64);
1536  for (size_t i = 0; i < val_count; ++i) {
1537  init_vals.push_back(get_default_value(agg_ops[i]));
1538  }
1539  std::vector<TargetInfo> target_infos;
1540  const SQLTypeInfo bigint_ti(kBIGINT, true);
1541  switch (val_count) {
1542  case 3:
1543  target_infos.push_back(TargetInfo{true, kMIN, bigint_ti, bigint_ti, true, false});
1544  case 2:
1545  target_infos.push_back(TargetInfo{true, kMAX, bigint_ti, bigint_ti, true, false});
1546  case 1:
1547  target_infos.push_back(TargetInfo{true, kSUM, bigint_ti, bigint_ti, true, false});
1548  break;
1549  default:
1550  CHECK(false);
1551  }
1552  std::reverse(target_infos.begin(), target_infos.end());
1553 
1554  const auto device_type = ExecutorDeviceType::CPU;
1555  size_t row_size = key_count * sizeof(int64_t);
1556  std::vector<int8_t> group_col_widths(key_count, sizeof(int64_t));
1557  const auto hash_type = QueryDescriptionType::GroupByPerfectHash;
1558  QueryMemoryDescriptor query_mem_desc(hash_type, 0, 0, false, group_col_widths);
1559  query_mem_desc.setHasKeylessHash(false);
1560  query_mem_desc.setOutputColumnar(is_columnar);
1561  query_mem_desc.setEntryCount(entry_count);
1562 
1563  for (const auto& target_info : target_infos) {
1564  const auto slot_bytes =
1565  std::max(int8_t(8), static_cast<int8_t>(target_info.sql_type.get_size()));
1566  query_mem_desc.addColSlotInfo({std::make_tuple(slot_bytes, slot_bytes)});
1567  row_size += slot_bytes;
1568  }
1569 
1570 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1571  const bool has_multi_gpus = get_gpu_count() > 1;
1572  const auto input_size = query_mem_desc.getBufferSizeBytes(device_type);
1573 #else
1574  const bool has_multi_gpus = false;
1575 #endif // HAVE_CUDA
1576  const auto row_set_mem_owner = std::make_shared<RowSetMemoryOwner>();
1577  std::vector<std::unique_ptr<ResultSet>> results;
1578  for (size_t i = 0; i < result_count; ++i) {
1579  auto rs = boost::make_unique<ResultSet>(
1580  target_infos, device_type, query_mem_desc, row_set_mem_owner, nullptr);
1581  rs->allocateStorage();
1582  results.push_back(std::move(rs));
1583  }
1584 
1585  std::vector<std::pair<int64_t, int64_t>> ranges(
1586  key_count,
1587  {0, (static_cast<int64_t>(std::exp((std::log(entry_count) / key_count))) - 1)});
1588 
1589  for (size_t v = 0; v < val_count; ++v) {
1590  ranges.push_back(get_default_range(agg_ops[v]));
1591  }
1592  std::vector<DIST_KIND> distributions(col_count, DIST_KIND::UNI);
1593 
1594  std::cout << "ResultSet Count: " << results.size() << std::endl;
1595  std::vector<size_t> rs_row_counts(results.size(), entry_count);
1596  // Generate random data.
1597  auto gen_func = [&](int8_t* input_buffer, const size_t device_id) -> size_t {
1598  auto actual_row_count = entry_count;
1599 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1600  if (has_multi_gpus) {
1601  cudaSetDevice(device_id);
1602  }
1603  int8_t* dev_input_buffer = nullptr;
1604  cudaMalloc(&dev_input_buffer, input_size);
1605  if (generate_columns_on_device(dev_input_buffer,
1606  entry_count,
1607  col_count,
1608  col_widths,
1609  ranges,
1610  is_columnar,
1611  distributions)) {
1612  int8_t* dev_input_copy = nullptr;
1613  std::tie(dev_input_copy, actual_row_count) =
1614  get_perfect_hashed_copy(dev_input_buffer,
1615  entry_count,
1616  col_widths,
1617  ranges,
1618  agg_ops,
1619  init_vals,
1620  is_columnar);
1621  cudaFree(dev_input_buffer);
1622  cudaMemcpy(input_buffer, dev_input_copy, input_size, cudaMemcpyDeviceToHost);
1623  } else
1624 #endif
1625  {
1626  generate_columns_on_host(input_buffer,
1627  entry_count,
1628  col_count,
1629  col_widths,
1630  ranges,
1631  is_columnar,
1632  distributions);
1633  actual_row_count =
1634  Deduplicater<false>(input_buffer, row_size, entry_count, key_count).run();
1635  }
1636 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1637  if (dev_input_buffer) {
1638  cudaFree(dev_input_buffer);
1639  }
1640 #endif
1641  return actual_row_count;
1642  };
1643 
1644  if (has_multi_gpus) {
1645  std::vector<std::future<size_t>> gener_threads;
1646  for (size_t i = 0; i < results.size(); ++i) {
1647  gener_threads.push_back(std::async(std::launch::async,
1648  gen_func,
1649  results[i]->getStorage()->getUnderlyingBuffer(),
1650  i));
1651  }
1652 
1653  for (size_t i = 0; i < gener_threads.size(); ++i) {
1654  rs_row_counts[i] = gener_threads[i].get();
1655  }
1656  } else {
1657  for (size_t i = 0; i < results.size(); ++i) {
1658  rs_row_counts[i] = gen_func(results[i]->getStorage()->getUnderlyingBuffer(), i);
1659  }
1660  }
1661 
1662  for (size_t i = 0; i < rs_row_counts.size(); ++i) {
1663  std::cout << "ResultSet " << i << " has " << rs_row_counts[i] << " rows and "
1664  << entry_count - rs_row_counts[i] << " empty buckets\n";
1665  }
1666  AggregateEmulator<int64_t, int64_t> emulator(agg_ops);
1667  std::vector<decltype(emulator)::ResultType> ref_results;
1668  for (auto& rs : results) {
1669  auto ref_rs = emulator.run(rs->getStorage()->getUnderlyingBuffer(),
1670  key_count,
1671  val_count,
1672  entry_count,
1673  is_columnar);
1674  ref_results.push_back(std::move(ref_rs));
1675  }
1676  auto ref_reduced_result = emulator.reduce(ref_results);
1677  ResultSetManager rs_manager;
1678  std::vector<ResultSet*> storage_set;
1679  for (auto& rs : results) {
1680  storage_set.push_back(rs.get());
1681  }
1682 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1683  CHECK_GT(results.size(), 0);
1684  std::vector<int64_t> gpu_reduced_result(input_size / sizeof(int64_t), 0);
1685  memcpy(&gpu_reduced_result[0],
1686  results[0]->getStorage()->getUnderlyingBuffer(),
1687  input_size);
1688 #endif
1689  ResultSet* reduced_result = nullptr;
1690  std::cout << "CPU reduction: ";
1691  auto elapsedTime = measure<>::execution([&]() {
1692  // Do calculation on host
1693  reduced_result = rs_manager.reduce(storage_set);
1694  });
1695  CHECK(reduced_result != nullptr);
1696  std::cout << "Current reduction took " << elapsedTime << " ms and got reduced "
1697  << reduced_result->rowCount() << " rows\n";
1698 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1699  std::vector<int8_t*> host_reduced_buffers(result_count, nullptr);
1700  host_reduced_buffers[0] = reinterpret_cast<int8_t*>(&gpu_reduced_result[0]);
1701  for (size_t i = 1; i < storage_set.size(); ++i) {
1702  host_reduced_buffers[i] = storage_set[i]->getStorage()->getUnderlyingBuffer();
1703  }
1704  std::vector<int8_t*> dev_reduced_buffers(result_count, nullptr);
1705  std::vector<int8_t*> dev_seg_copies(result_count, nullptr);
1706  const auto seg_count = has_multi_gpus ? result_count : size_t(1);
1707  const auto stride = (entry_count + (seg_count - 1)) / seg_count;
1708 
1709  std::cout << "GPU reduction: ";
1710  elapsedTime = measure<>::execution([&]() {
1711  std::vector<std::future<void>> uploader_threads;
1712  for (size_t device_id = 0; device_id < result_count; ++device_id) {
1713  uploader_threads.push_back(std::async(
1714  std::launch::async,
1715  [&](const size_t dev_id) {
1716  if (has_multi_gpus) {
1717  cudaSetDevice(dev_id);
1718  }
1719  int8_t* dev_reduced_buffer = nullptr;
1720  cudaMalloc(&dev_reduced_buffer, input_size);
1721  cudaMemcpy(dev_reduced_buffer,
1722  host_reduced_buffers[dev_id],
1723  input_size,
1724  cudaMemcpyHostToDevice);
1725  dev_reduced_buffers[dev_id] = dev_reduced_buffer;
1726  },
1727  device_id));
1728  }
1729  for (auto& child : uploader_threads) {
1730  child.get();
1731  }
1732  });
1733  std::cout << "Current reduction took " << elapsedTime << " ms to upload to VRAM and ";
1734 
1735  elapsedTime = measure<>::execution([&]() {
1736  // Redistribute across devices
1737  if (has_multi_gpus) {
1738  std::vector<std::future<void>> redis_threads;
1739  for (size_t device_id = 0, start_entry = 0; device_id < result_count;
1740  ++device_id, start_entry += stride) {
1741  const auto end_entry = std::min(start_entry + stride, entry_count);
1742  redis_threads.push_back(std::async(
1743  std::launch::async,
1744  [&](const size_t dev_id, const size_t start, const size_t end) {
1745  cudaSetDevice(dev_id);
1746  dev_seg_copies[dev_id] = fetch_segs_from_others(dev_reduced_buffers,
1747  entry_count,
1748  dev_id,
1749  result_count,
1750  col_widths,
1751  is_columnar,
1752  start,
1753  end);
1754  },
1755  device_id,
1756  start_entry,
1757  end_entry));
1758  }
1759  for (auto& child : redis_threads) {
1760  child.get();
1761  }
1762  } else {
1763  CHECK_EQ(dev_reduced_buffers.size(), size_t(2));
1764  dev_seg_copies[0] = dev_reduced_buffers[1];
1765  }
1766  // Reduce
1767  std::vector<std::future<void>> reducer_threads;
1768  for (size_t device_id = 0, start_entry = 0; device_id < seg_count;
1769  ++device_id, start_entry += stride) {
1770  const auto end_entry = std::min(start_entry + stride, entry_count);
1771  reducer_threads.push_back(std::async(
1772  std::launch::async,
1773  [&](const size_t dev_id, const size_t start, const size_t end) {
1774  if (has_multi_gpus) {
1775  cudaSetDevice(dev_id);
1776  }
1777  reduce_segment_on_device(dev_reduced_buffers[dev_id],
1778  dev_seg_copies[dev_id],
1779  entry_count,
1780  seg_count,
1781  col_widths,
1782  agg_ops,
1783  is_columnar,
1784  start,
1785  end);
1786  },
1787  device_id,
1788  start_entry,
1789  end_entry));
1790  }
1791  for (auto& child : reducer_threads) {
1792  child.get();
1793  }
1794  });
1795  std::cout << elapsedTime << " ms to reduce.\n";
1796  {
1797  for (size_t device_id = 0, start = 0; device_id < seg_count;
1798  ++device_id, start += stride) {
1799  const auto end = std::min(start + stride, entry_count);
1800  if (has_multi_gpus) {
1801  cudaSetDevice(device_id);
1802  cudaFree(dev_seg_copies[device_id]);
1803  dev_seg_copies[device_id] = nullptr;
1804  }
1805  if (is_columnar) {
1806  for (size_t c = 0, col_base = start; c < col_count;
1807  ++c, col_base += entry_count) {
1808  cudaMemcpy(&gpu_reduced_result[col_base],
1809  dev_reduced_buffers[device_id] + col_base * sizeof(int64_t),
1810  (end - start) * sizeof(int64_t),
1811  cudaMemcpyDeviceToHost);
1812  }
1813  } else {
1814  cudaMemcpy(&gpu_reduced_result[start * col_count],
1815  dev_reduced_buffers[device_id] + start * row_size,
1816  (end - start) * row_size,
1817  cudaMemcpyDeviceToHost);
1818  }
1819  cudaFree(dev_reduced_buffers[device_id]);
1820  dev_reduced_buffers[device_id] = nullptr;
1821  }
1822  }
1823 #endif
1824  ASSERT_TRUE(emulator.compare(reduced_result->getStorage()->getUnderlyingBuffer(),
1825  key_count,
1826  val_count,
1827  reduced_result->getQueryMemDesc().getEntryCount(),
1828  is_columnar,
1829  ref_reduced_result));
1830 #if defined(HAVE_CUDA) && CUDA_VERSION >= 8000
1831  ASSERT_TRUE(emulator.compare(reinterpret_cast<int8_t*>(&gpu_reduced_result[0]),
1832  key_count,
1833  val_count,
1834  gpu_reduced_result.size() / col_count,
1835  is_columnar,
1836  ref_reduced_result));
1837 
1838 #endif
1839 }
1840 
1841 int main(int argc, char** argv) {
1842  TestHelpers::init_logger_stderr_only(argc, argv);
1843  testing::InitGoogleTest(&argc, argv);
1844  g_gpus_present = is_gpu_present();
1845 #ifndef HAVE_CUDA
1846  testing::GTEST_FLAG(filter) = "-Hash.Baseline";
1847 #endif
1848 
1849  int err{0};
1850  try {
1851  err = RUN_ALL_TESTS();
1852  } catch (const std::exception& e) {
1853  LOG(ERROR) << e.what();
1854  }
1855  return err;
1856 }