OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TopKSort.cu
Go to the documentation of this file.
1 /* copyright 2017 MapD Technologies, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /*
17  * @file TopKSort.cu
18  * @author Minggang Yu <miyu@mapd.com>
19  * @brief Top-k sorting on streaming top-k heaps on VRAM
20  *
21  * Copyright (c) 2017 MapD Technologies, Inc. All rights reserved.
22  */
23 #include "BufferEntryUtils.h"
24 #include "GpuMemUtils.h"
26 #include "SortUtils.cuh"
27 #include "StreamingTopN.h"
28 #include "TopKSort.h"
29 
30 #include <thrust/copy.h>
31 #include <thrust/execution_policy.h>
32 #include <thrust/functional.h>
33 #include <thrust/partition.h>
34 #include <thrust/sort.h>
35 
36 #include <cuda.h>
38 
39 #define checkCudaErrors(err) CHECK_EQ(err, CUDA_SUCCESS)
40 
41 #include <iostream>
42 
43 template <class K, class I = int32_t>
45  is_taken_entry(const int8_t* buff, const size_t stride)
46  : buff_ptr(buff), key_stride(stride) {}
47  __host__ __device__ bool operator()(const I index) {
48  return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
49  }
50  const int8_t* buff_ptr;
51  const size_t key_stride;
52 };
53 
54 template <class K, class I = int32_t>
56  typedef I argument_type;
57  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
58  : oe_base(base), oe_stride(stride), null_val(nul) {}
59  __host__ __device__ bool operator()(const I index) {
60  const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
61  switch (sizeof(K)) {
62  case 4:
63  return *reinterpret_cast<const int32_t*>(&oe_val) ==
64  static_cast<int32_t>(null_val);
65  case 8:
66  return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
67  default:
68  return false;
69  }
70  }
71  const int8_t* oe_base;
72  const size_t oe_stride;
73  const int64_t null_val;
74 };
75 
76 template <typename ForwardIterator>
77 ForwardIterator partition_by_null(ForwardIterator first,
78  ForwardIterator last,
79  const int64_t null_val,
80  const bool nulls_first,
81  const int8_t* rows_ptr,
82  const GroupByBufferLayoutInfo& layout) {
83  if (nulls_first) {
84  return (layout.col_bytes == 4)
85  ? thrust::partition(
86  first,
87  last,
89  rows_ptr + layout.col_off, layout.row_bytes, null_val))
90  : thrust::partition(
91  first,
92  last,
94  rows_ptr + layout.col_off, layout.row_bytes, null_val));
95  } else {
96  return (layout.col_bytes == 4)
97  ? thrust::partition(
98  first,
99  last,
100  thrust::not1(is_null_order_entry<int32_t>(
101  rows_ptr + layout.col_off, layout.row_bytes, null_val)))
102  : thrust::partition(
103  first,
104  last,
105  thrust::not1(is_null_order_entry<int64_t>(
106  rows_ptr + layout.col_off, layout.row_bytes, null_val)));
107  }
108 }
109 
110 template <class K, class I>
111 struct KeyFetcher {
112  KeyFetcher(K* out_base,
113  const int8_t* src_oe_base,
114  const size_t stride,
115  const I* indices)
116  : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}
117  __host__ __device__ void operator()(const I index) {
118  key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
119  }
120 
122  const int8_t* oe_base;
123  const size_t oe_stride;
124  const I* idx_base;
125 };
126 
127 template <class K>
128 struct KeyReseter {
129  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
130  : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}
131  __host__ __device__ void operator()(const size_t index) {
132  K* key_ptr = reinterpret_cast<K*>(rows_base + index * key_stride);
133  *key_ptr = empty_key;
134  }
135 
136  int8_t* rows_base;
137  const size_t key_stride;
138  const K empty_key;
139 };
140 
141 // TODO(miyu) : switch to shared version in ResultSetSortImpl.cu.
142 template <class K, class I>
143 void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
144  const int8_t* d_src_buffer,
145  const thrust::device_ptr<I>& d_idx_first,
146  const size_t idx_count,
147  const size_t oe_offset,
148  const size_t oe_stride,
149  ThrustAllocator& allocator,
150  const int device_id) {
151  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
152  thrust::for_each(thrust::cuda::par(allocator).on(qe_cuda_stream),
153  thrust::make_counting_iterator(size_t(0)),
154  thrust::make_counting_iterator(idx_count),
155  KeyFetcher<K, I>(thrust::raw_pointer_cast(d_oe_col_buffer),
156  d_src_buffer + oe_offset,
157  oe_stride,
158  thrust::raw_pointer_cast(d_idx_first)));
159  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
160 }
161 
162 template <class K, class I>
163 void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
164  const size_t idx_count,
165  const thrust::device_ptr<K>& d_key_buffer,
166  const bool desc,
167  ThrustAllocator& allocator,
168  const int device_id) {
169  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
170  if (desc) {
171  thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
172  d_key_buffer,
173  d_key_buffer + idx_count,
174  d_idx_first,
175  thrust::greater<K>());
176  } else {
177  thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
178  d_key_buffer,
179  d_key_buffer + idx_count,
180  d_idx_first);
181  }
182  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
183 }
184 
185 template <class I = int32_t>
186 void do_radix_sort(thrust::device_ptr<I> d_idx_first,
187  const size_t idx_count,
188  const int8_t* d_src_buffer,
189  const PodOrderEntry& oe,
190  const GroupByBufferLayoutInfo& layout,
191  ThrustAllocator& allocator,
192  const int device_id) {
193  const auto& oe_type = layout.oe_target_info.sql_type;
194  if (oe_type.is_fp()) {
195  switch (layout.col_bytes) {
196  case 4: {
197  auto d_oe_buffer = get_device_ptr<float>(idx_count, allocator);
198  collect_order_entry_column(d_oe_buffer,
199  d_src_buffer,
200  d_idx_first,
201  idx_count,
202  layout.col_off,
203  layout.row_bytes,
204  allocator,
205  device_id);
207  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
208  break;
209  }
210  case 8: {
211  auto d_oe_buffer = get_device_ptr<double>(idx_count, allocator);
212  collect_order_entry_column(d_oe_buffer,
213  d_src_buffer,
214  d_idx_first,
215  idx_count,
216  layout.col_off,
217  layout.row_bytes,
218  allocator,
219  device_id);
221  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
222  break;
223  }
224  default:
225  CHECK(false);
226  }
227  return;
228  }
229  CHECK(oe_type.is_number() || oe_type.is_time());
230  switch (layout.col_bytes) {
231  case 4: {
232  auto d_oe_buffer = get_device_ptr<int32_t>(idx_count, allocator);
233  collect_order_entry_column(d_oe_buffer,
234  d_src_buffer,
235  d_idx_first,
236  idx_count,
237  layout.col_off,
238  layout.row_bytes,
239  allocator,
240  device_id);
242  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
243  break;
244  }
245  case 8: {
246  auto d_oe_buffer = get_device_ptr<int64_t>(idx_count, allocator);
247  collect_order_entry_column(d_oe_buffer,
248  d_src_buffer,
249  d_idx_first,
250  idx_count,
251  layout.col_off,
252  layout.row_bytes,
253  allocator,
254  device_id);
256  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
257  break;
258  }
259  default:
260  CHECK(false);
261  }
262 }
263 
264 template <class I>
265 struct RowFetcher {
266  RowFetcher(int8_t* out_base,
267  const int8_t* in_base,
268  const I* indices,
269  const size_t row_sz)
270  : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}
271  __host__ __device__ void operator()(const I index) {
272  memcpy(dst_base + index * row_size, src_base + idx_base[index] * row_size, row_size);
273  }
274 
275  int8_t* dst_base;
276  const int8_t* src_base;
277  const I* idx_base;
278  const size_t row_size;
279 };
280 
281 template <typename DerivedPolicy>
283  const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
284  int8_t* row_buffer,
285  const size_t key_width,
286  const size_t row_size,
287  const size_t first,
288  const size_t last) {
289  switch (key_width) {
290  case 4:
291  thrust::for_each(
292  exec,
293  thrust::make_counting_iterator(first),
294  thrust::make_counting_iterator(last),
295  KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
296  break;
297  case 8:
298  thrust::for_each(
299  exec,
300  thrust::make_counting_iterator(first),
301  thrust::make_counting_iterator(last),
302  KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
303  break;
304  default:
305  CHECK(false);
306  }
307 }
308 
310  Data_Namespace::DataMgr* data_mgr,
311  const int64_t* dev_heaps,
312  const size_t heaps_size,
313  const size_t n,
314  const PodOrderEntry& oe,
315  const GroupByBufferLayoutInfo& layout,
316  const size_t group_key_bytes,
317  const size_t thread_count,
318  const int device_id) {
319  const auto row_size = layout.row_bytes;
320  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
321  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
323  const auto total_entry_count = n * thread_count;
324  ThrustAllocator thrust_allocator(data_mgr, device_id);
325  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
326  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
327  thrust::sequence(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
328  d_indices,
329  d_indices + total_entry_count);
330  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
331  auto separator =
332  (group_key_bytes == 4)
333  ? thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
334  d_indices,
335  d_indices + total_entry_count,
336  is_taken_entry<int32_t>(rows_ptr, row_size))
337  : thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
338  d_indices,
339  d_indices + total_entry_count,
340  is_taken_entry<int64_t>(rows_ptr, row_size));
341  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
342  const size_t actual_entry_count = separator - d_indices;
343  if (!actual_entry_count) {
344  std::vector<int8_t> top_rows(row_size * n);
346  thrust::host, &top_rows[0], layout.col_bytes, row_size, 0, n);
347  return top_rows;
348  }
349 
350  const auto& oe_type = layout.oe_target_info.sql_type;
351  if (oe_type.get_notnull()) {
353  d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator, device_id);
354  } else {
355  auto separator = partition_by_null(d_indices,
356  d_indices + actual_entry_count,
357  null_val_bit_pattern(oe_type, false),
358  oe.nulls_first,
359  rows_ptr,
360  layout);
361  if (oe.nulls_first) {
362  const size_t null_count = separator - d_indices;
363  if (null_count < actual_entry_count) {
365  actual_entry_count - null_count,
366  rows_ptr,
367  oe,
368  layout,
369  thrust_allocator,
370  device_id);
371  }
372  } else {
373  const size_t nonnull_count = separator - d_indices;
374  if (nonnull_count > 0) {
376  d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator, device_id);
377  }
378  }
379  }
380 
381  const auto final_entry_count = std::min(n, actual_entry_count);
382  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
383  thrust::for_each(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
384  thrust::make_counting_iterator(size_t(0)),
385  thrust::make_counting_iterator(final_entry_count),
386  RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
387  rows_ptr,
388  thrust::raw_pointer_cast(d_indices),
389  row_size));
390  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
391 
392  if (final_entry_count < n) {
393  reset_keys_in_row_buffer(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
394  thrust::raw_pointer_cast(d_top_rows),
395  layout.col_bytes,
396  row_size,
397  final_entry_count,
398  n);
399  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
400  }
401 
402  std::vector<int8_t> top_rows(row_size * n);
403  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
404  return top_rows;
405 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
void reset_keys_in_row_buffer(const thrust::detail::execution_policy_base< DerivedPolicy > &exec, int8_t *row_buffer, const size_t key_width, const size_t row_size, const size_t first, const size_t last)
Definition: TopKSort.cu:282
#define EMPTY_KEY_64
const I * idx_base
Definition: TopKSort.cu:124
__host__ __device__ void operator()(const size_t index)
Definition: TopKSort.cu:131
int8_t * dst_base
Definition: TopKSort.cu:275
const K empty_key
Definition: TopKSort.cu:138
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:117
is_null_order_entry(const int8_t *base, const size_t stride, const int64_t nul)
Definition: TopKSort.cu:57
const size_t row_size
Definition: TopKSort.cu:278
Utility functions for easy access to the result set buffers.
void * CUstream
Definition: nocuda.h:23
SQLTypeInfo sql_type
Definition: TargetInfo.h:52
Streaming Top N algorithm.
const int8_t * oe_base
Definition: TopKSort.cu:122
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count)
Definition: TopKSort.cu:44
I argument_type
Definition: TopKSort.cu:56
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu(Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
Definition: TopKSort.cu:309
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
bool nulls_first
const int8_t * buff_ptr
Definition: TopKSort.cu:50
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
const int8_t * src_base
Definition: TopKSort.cu:276
KeyFetcher(K *out_base, const int8_t *src_oe_base, const size_t stride, const I *indices)
Definition: TopKSort.cu:112
Utility functions for group by buffer entries.
void collect_order_entry_column(thrust::device_ptr< K > &d_oe_col_buffer, const int8_t *d_src_buffer, const thrust::device_ptr< I > &d_idx_first, const size_t idx_count, const size_t oe_offset, const size_t oe_stride, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:143
const size_t oe_stride
Definition: TopKSort.cu:123
is_taken_entry(const int8_t *buff, const size_t stride)
Definition: TopKSort.cu:45
Definition: TopKSort.cu:55
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:271
KeyReseter(int8_t *out_base, const size_t stride, const K emp_key)
Definition: TopKSort.cu:129
void do_radix_sort(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const int8_t *d_src_buffer, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:186
const int8_t * oe_base
Definition: TopKSort.cu:71
const size_t key_stride
Definition: TopKSort.cu:51
const I * idx_base
Definition: TopKSort.cu:277
int8_t * rows_base
Definition: TopKSort.cu:136
const int64_t null_val
Definition: TopKSort.cu:73
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:47
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
bool is_desc
K * key_base
Definition: TopKSort.cu:121
RowFetcher(int8_t *out_base, const int8_t *in_base, const I *indices, const size_t row_sz)
Definition: TopKSort.cu:266
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count)
const TargetInfo oe_target_info
#define CHECK(condition)
Definition: Logger.h:222
#define checkCudaErrors(err)
Definition: GpuInitGroups.cu:9
#define EMPTY_KEY_32
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:59
constexpr double n
Definition: Utm.h:38
const size_t key_stride
Definition: TopKSort.cu:137
ForwardIterator partition_by_null(ForwardIterator first, ForwardIterator last, const int64_t null_val, const bool nulls_first, const int8_t *rows_ptr, const GroupByBufferLayoutInfo &layout)
Definition: TopKSort.cu:77
void sort_indices_by_key(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const thrust::device_ptr< K > &d_key_buffer, const bool desc, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:163
const size_t oe_stride
Definition: TopKSort.cu:72