OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TopKSort.cu
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "BufferEntryUtils.h"
18 #include "GpuMemUtils.h"
20 #include "SortUtils.cuh"
21 #include "StreamingTopN.h"
22 #include "TopKSort.h"
23 
24 #include <thrust/copy.h>
25 #include <thrust/execution_policy.h>
26 #include <thrust/functional.h>
27 #include <thrust/partition.h>
28 #include <thrust/sort.h>
29 
30 #include <cuda.h>
32 
33 #define checkCudaErrors(err) CHECK_EQ(err, CUDA_SUCCESS)
34 
35 #include <iostream>
36 
37 template <class K, class I = int32_t>
39  is_taken_entry(const int8_t* buff, const size_t stride)
40  : buff_ptr(buff), key_stride(stride) {}
41  __host__ __device__ bool operator()(const I index) {
42  return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
43  }
44  const int8_t* buff_ptr;
45  const size_t key_stride;
46 };
47 
48 template <class K, class I = int32_t>
50  using argument_type = I;
51  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
52  : oe_base(base), oe_stride(stride), null_val(nul) {}
53  __host__ __device__ bool operator()(const I index) {
54  const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
55  switch (sizeof(K)) {
56  case 4:
57  return *reinterpret_cast<const int32_t*>(&oe_val) ==
58  static_cast<int32_t>(null_val);
59  case 8:
60  return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
61  default:
62  return false;
63  }
64  }
65  const int8_t* oe_base;
66  const size_t oe_stride;
67  const int64_t null_val;
68 };
69 
70 template <typename ForwardIterator>
71 ForwardIterator partition_by_null(ForwardIterator first,
72  ForwardIterator last,
73  const int64_t null_val,
74  const bool nulls_first,
75  const int8_t* rows_ptr,
76  const GroupByBufferLayoutInfo& layout) {
77  if (nulls_first) {
78  return (layout.col_bytes == 4)
79  ? thrust::partition(
80  first,
81  last,
83  rows_ptr + layout.col_off, layout.row_bytes, null_val))
84  : thrust::partition(
85  first,
86  last,
88  rows_ptr + layout.col_off, layout.row_bytes, null_val));
89  } else {
90  return (layout.col_bytes == 4)
91  ? thrust::partition(
92  first,
93  last,
94  thrust::not1(is_null_order_entry<int32_t>(
95  rows_ptr + layout.col_off, layout.row_bytes, null_val)))
96  : thrust::partition(
97  first,
98  last,
99  thrust::not1(is_null_order_entry<int64_t>(
100  rows_ptr + layout.col_off, layout.row_bytes, null_val)));
101  }
102 }
103 
104 template <class K, class I>
105 struct KeyFetcher {
106  KeyFetcher(K* out_base,
107  const int8_t* src_oe_base,
108  const size_t stride,
109  const I* indices)
110  : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}
111  __host__ __device__ void operator()(const I index) {
112  key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
113  }
114 
116  const int8_t* oe_base;
117  const size_t oe_stride;
118  const I* idx_base;
119 };
120 
121 template <class K>
122 struct KeyReseter {
123  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
124  : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}
125  __host__ __device__ void operator()(const size_t index) {
126  K* key_ptr = reinterpret_cast<K*>(rows_base + index * key_stride);
127  *key_ptr = empty_key;
128  }
129 
130  int8_t* rows_base;
131  const size_t key_stride;
132  const K empty_key;
133 };
134 
135 // TODO(miyu) : switch to shared version in ResultSetSortImpl.cu.
136 template <class K, class I>
137 void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
138  const int8_t* d_src_buffer,
139  const thrust::device_ptr<I>& d_idx_first,
140  const size_t idx_count,
141  const size_t oe_offset,
142  const size_t oe_stride,
143  ThrustAllocator& allocator,
144  const int device_id) {
145  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
146  thrust::for_each(thrust::cuda::par(allocator).on(qe_cuda_stream),
147  thrust::make_counting_iterator(size_t(0)),
148  thrust::make_counting_iterator(idx_count),
149  KeyFetcher<K, I>(thrust::raw_pointer_cast(d_oe_col_buffer),
150  d_src_buffer + oe_offset,
151  oe_stride,
152  thrust::raw_pointer_cast(d_idx_first)));
153  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
154 }
155 
156 template <class K, class I>
157 void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
158  const size_t idx_count,
159  const thrust::device_ptr<K>& d_key_buffer,
160  const bool desc,
161  ThrustAllocator& allocator,
162  const int device_id) {
163  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
164  if (desc) {
165  thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
166  d_key_buffer,
167  d_key_buffer + idx_count,
168  d_idx_first,
169  thrust::greater<K>());
170  } else {
171  thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
172  d_key_buffer,
173  d_key_buffer + idx_count,
174  d_idx_first);
175  }
176  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
177 }
178 
179 template <class I = int32_t>
180 void do_radix_sort(thrust::device_ptr<I> d_idx_first,
181  const size_t idx_count,
182  const int8_t* d_src_buffer,
183  const PodOrderEntry& oe,
184  const GroupByBufferLayoutInfo& layout,
185  ThrustAllocator& allocator,
186  const int device_id) {
187  const auto& oe_type = layout.oe_target_info.sql_type;
188  if (oe_type.is_fp()) {
189  switch (layout.col_bytes) {
190  case 4: {
191  auto d_oe_buffer = get_device_ptr<float>(idx_count, allocator);
192  collect_order_entry_column(d_oe_buffer,
193  d_src_buffer,
194  d_idx_first,
195  idx_count,
196  layout.col_off,
197  layout.row_bytes,
198  allocator,
199  device_id);
201  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
202  break;
203  }
204  case 8: {
205  auto d_oe_buffer = get_device_ptr<double>(idx_count, allocator);
206  collect_order_entry_column(d_oe_buffer,
207  d_src_buffer,
208  d_idx_first,
209  idx_count,
210  layout.col_off,
211  layout.row_bytes,
212  allocator,
213  device_id);
215  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
216  break;
217  }
218  default:
219  CHECK(false);
220  }
221  return;
222  }
223  CHECK(oe_type.is_number() || oe_type.is_time());
224  switch (layout.col_bytes) {
225  case 4: {
226  auto d_oe_buffer = get_device_ptr<int32_t>(idx_count, allocator);
227  collect_order_entry_column(d_oe_buffer,
228  d_src_buffer,
229  d_idx_first,
230  idx_count,
231  layout.col_off,
232  layout.row_bytes,
233  allocator,
234  device_id);
236  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
237  break;
238  }
239  case 8: {
240  auto d_oe_buffer = get_device_ptr<int64_t>(idx_count, allocator);
241  collect_order_entry_column(d_oe_buffer,
242  d_src_buffer,
243  d_idx_first,
244  idx_count,
245  layout.col_off,
246  layout.row_bytes,
247  allocator,
248  device_id);
250  d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
251  break;
252  }
253  default:
254  CHECK(false);
255  }
256 }
257 
258 template <class I>
259 struct RowFetcher {
260  RowFetcher(int8_t* out_base,
261  const int8_t* in_base,
262  const I* indices,
263  const size_t row_sz)
264  : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}
265  __host__ __device__ void operator()(const I index) {
266  memcpy(dst_base + index * row_size, src_base + idx_base[index] * row_size, row_size);
267  }
268 
269  int8_t* dst_base;
270  const int8_t* src_base;
271  const I* idx_base;
272  const size_t row_size;
273 };
274 
275 template <typename DerivedPolicy>
277  const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
278  int8_t* row_buffer,
279  const size_t key_width,
280  const size_t row_size,
281  const size_t first,
282  const size_t last) {
283  switch (key_width) {
284  case 4:
285  thrust::for_each(
286  exec,
287  thrust::make_counting_iterator(first),
288  thrust::make_counting_iterator(last),
289  KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
290  break;
291  case 8:
292  thrust::for_each(
293  exec,
294  thrust::make_counting_iterator(first),
295  thrust::make_counting_iterator(last),
296  KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
297  break;
298  default:
299  CHECK(false);
300  }
301 }
302 
304  Data_Namespace::DataMgr* data_mgr,
305  const int64_t* dev_heaps,
306  const size_t heaps_size,
307  const size_t n,
308  const PodOrderEntry& oe,
309  const GroupByBufferLayoutInfo& layout,
310  const size_t group_key_bytes,
311  const size_t thread_count,
312  const int device_id) {
313  const auto row_size = layout.row_bytes;
314  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
315  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
317  const auto total_entry_count = n * thread_count;
318  ThrustAllocator thrust_allocator(data_mgr, device_id);
319  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
320  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
321  thrust::sequence(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
322  d_indices,
323  d_indices + total_entry_count);
324  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
325  auto separator =
326  (group_key_bytes == 4)
327  ? thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
328  d_indices,
329  d_indices + total_entry_count,
330  is_taken_entry<int32_t>(rows_ptr, row_size))
331  : thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
332  d_indices,
333  d_indices + total_entry_count,
334  is_taken_entry<int64_t>(rows_ptr, row_size));
335  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
336  const size_t actual_entry_count = separator - d_indices;
337  if (!actual_entry_count) {
338  std::vector<int8_t> top_rows(row_size * n);
340  thrust::host, &top_rows[0], layout.col_bytes, row_size, 0, n);
341  return top_rows;
342  }
343 
344  const auto& oe_type = layout.oe_target_info.sql_type;
345  if (oe_type.get_notnull()) {
347  d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator, device_id);
348  } else {
349  auto separator = partition_by_null(d_indices,
350  d_indices + actual_entry_count,
351  null_val_bit_pattern(oe_type, false),
352  oe.nulls_first,
353  rows_ptr,
354  layout);
355  if (oe.nulls_first) {
356  const size_t null_count = separator - d_indices;
357  if (null_count < actual_entry_count) {
359  actual_entry_count - null_count,
360  rows_ptr,
361  oe,
362  layout,
363  thrust_allocator,
364  device_id);
365  }
366  } else {
367  const size_t nonnull_count = separator - d_indices;
368  if (nonnull_count > 0) {
370  d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator, device_id);
371  }
372  }
373  }
374 
375  const auto final_entry_count = std::min(n, actual_entry_count);
376  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
377  thrust::for_each(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
378  thrust::make_counting_iterator(size_t(0)),
379  thrust::make_counting_iterator(final_entry_count),
380  RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
381  rows_ptr,
382  thrust::raw_pointer_cast(d_indices),
383  row_size));
384  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
385 
386  if (final_entry_count < n) {
387  reset_keys_in_row_buffer(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
388  thrust::raw_pointer_cast(d_top_rows),
389  layout.col_bytes,
390  row_size,
391  final_entry_count,
392  n);
393  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
394  }
395 
396  std::vector<int8_t> top_rows(row_size * n);
397  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
398  return top_rows;
399 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
void reset_keys_in_row_buffer(const thrust::detail::execution_policy_base< DerivedPolicy > &exec, int8_t *row_buffer, const size_t key_width, const size_t row_size, const size_t first, const size_t last)
Definition: TopKSort.cu:276
#define EMPTY_KEY_64
const I * idx_base
Definition: TopKSort.cu:118
__host__ __device__ void operator()(const size_t index)
Definition: TopKSort.cu:125
int8_t * dst_base
Definition: TopKSort.cu:269
const K empty_key
Definition: TopKSort.cu:132
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:111
is_null_order_entry(const int8_t *base, const size_t stride, const int64_t nul)
Definition: TopKSort.cu:51
const size_t row_size
Definition: TopKSort.cu:272
Utility functions for easy access to the result set buffers.
void * CUstream
Definition: nocuda.h:23
SQLTypeInfo sql_type
Definition: TargetInfo.h:52
Streaming Top N algorithm.
const int8_t * oe_base
Definition: TopKSort.cu:116
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count)
Definition: TopKSort.cu:38
I argument_type
Definition: TopKSort.cu:50
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu(Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
Definition: TopKSort.cu:303
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
bool nulls_first
const int8_t * buff_ptr
Definition: TopKSort.cu:44
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
const int8_t * src_base
Definition: TopKSort.cu:270
KeyFetcher(K *out_base, const int8_t *src_oe_base, const size_t stride, const I *indices)
Definition: TopKSort.cu:106
Utility functions for group by buffer entries.
void collect_order_entry_column(thrust::device_ptr< K > &d_oe_col_buffer, const int8_t *d_src_buffer, const thrust::device_ptr< I > &d_idx_first, const size_t idx_count, const size_t oe_offset, const size_t oe_stride, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:137
const size_t oe_stride
Definition: TopKSort.cu:117
is_taken_entry(const int8_t *buff, const size_t stride)
Definition: TopKSort.cu:39
Definition: TopKSort.cu:49
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:265
KeyReseter(int8_t *out_base, const size_t stride, const K emp_key)
Definition: TopKSort.cu:123
void do_radix_sort(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const int8_t *d_src_buffer, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:180
const int8_t * oe_base
Definition: TopKSort.cu:65
const size_t key_stride
Definition: TopKSort.cu:45
const I * idx_base
Definition: TopKSort.cu:271
int8_t * rows_base
Definition: TopKSort.cu:130
const int64_t null_val
Definition: TopKSort.cu:67
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:41
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
bool is_desc
K * key_base
Definition: TopKSort.cu:115
RowFetcher(int8_t *out_base, const int8_t *in_base, const I *indices, const size_t row_sz)
Definition: TopKSort.cu:260
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count)
const TargetInfo oe_target_info
#define CHECK(condition)
Definition: Logger.h:291
#define checkCudaErrors(err)
Definition: GpuInitGroups.cu:9
#define EMPTY_KEY_32
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:53
constexpr double n
Definition: Utm.h:38
const size_t key_stride
Definition: TopKSort.cu:131
ForwardIterator partition_by_null(ForwardIterator first, ForwardIterator last, const int64_t null_val, const bool nulls_first, const int8_t *rows_ptr, const GroupByBufferLayoutInfo &layout)
Definition: TopKSort.cu:71
void sort_indices_by_key(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const thrust::device_ptr< K > &d_key_buffer, const bool desc, ThrustAllocator &allocator, const int device_id)
Definition: TopKSort.cu:157
const size_t oe_stride
Definition: TopKSort.cu:66