OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TopKSort.cu
Go to the documentation of this file.
1 /* Copyright 2017 MapD Technologies, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /*
17  * @file TopKSort.cu
18  * @author Minggang Yu <miyu@mapd.com>
19  * @brief Top-k sorting on streaming top-k heaps on VRAM
20  *
21  * Copyright (c) 2017 MapD Technologies, Inc. All rights reserved.
22  */
23 #include "BufferEntryUtils.h"
24 #include "GpuMemUtils.h"
26 #include "SortUtils.cuh"
27 #include "StreamingTopN.h"
28 #include "TopKSort.h"
29 
30 #include <thrust/copy.h>
31 #include <thrust/execution_policy.h>
32 #include <thrust/functional.h>
33 #include <thrust/partition.h>
34 #include <thrust/sort.h>
35 
36 #include <iostream>
37 
// Unary predicate over row indices: true when the row at `index` is NOT
// reported as empty by is_empty_entry<K>().
//
// K is the key type handed to is_empty_entry<K>(); I is the index type the
// predicate is invoked with. The index is widened to size_t before the call.
// `buff` and `stride` are forwarded untouched to is_empty_entry<K>() on every
// invocation (presumably the row-buffer base and the byte distance between
// consecutive rows -- confirm against is_empty_entry).
template <class K, class I = int32_t>
struct is_taken_entry {
  is_taken_entry(const int8_t* buff, const size_t stride)
      : buff_ptr(buff), key_stride(stride) {}

  // Const-qualified: the predicate only reads its members, so it can be
  // invoked through const copies as well.
  __host__ __device__ bool operator()(const I index) const {
    return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
  }

  const int8_t* buff_ptr;
  const size_t key_stride;
};
48 
// Unary predicate over row indices: true when the order-entry value of the
// row at `index` has the same bit pattern as the null sentinel `nul`.
//
// The value is read as a K located at `base + index * stride`.
//   - sizeof(K) == 4: the value's bits are compared against `nul` truncated
//     to int32_t.
//   - sizeof(K) == 8: the value's bits are compared against `nul` directly.
//   - any other width: the predicate always returns false.
//
// `argument_type` is exposed so the predicate can be wrapped by adaptors such
// as thrust::not1.
template <class K, class I = int32_t>
struct is_null_order_entry {
  typedef I argument_type;

  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
      : oe_base(base), oe_stride(stride), null_val(nul) {}

  // Const-qualified: the predicate only reads its members.
  __host__ __device__ bool operator()(const I index) const {
    const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
    // sizeof(K) is a compile-time constant, so only one case is ever taken
    // for a given instantiation.
    switch (sizeof(K)) {
      case 4:
        return *reinterpret_cast<const int32_t*>(&oe_val) ==
               static_cast<int32_t>(null_val);
      case 8:
        return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
      default:
        return false;
    }
  }

  const int8_t* oe_base;
  const size_t oe_stride;
  const int64_t null_val;
};
70 
// Reorders the row indices in [first, last) so that null and non-null
// order-entry values end up in two contiguous groups, and returns the
// iterator thrust::partition reports as the boundary between them.
//
//   nulls_first == true : rows whose order-entry value equals `null_val`
//                         come first.
//   nulls_first == false: rows whose order-entry value differs from
//                         `null_val` come first.
//
// The order-entry value of row i is read from
// `rows_ptr + layout.col_off + i * layout.row_bytes`. A column width of 4
// bytes is compared as int32_t; every other width is compared as int64_t.
template <typename ForwardIterator>
ForwardIterator partition_by_null(ForwardIterator first,
                                  ForwardIterator last,
                                  const int64_t null_val,
                                  const bool nulls_first,
                                  const int8_t* rows_ptr,
                                  const GroupByBufferLayoutInfo& layout) {
  // Both predicate flavours share the same column base and row stride.
  const int8_t* oe_base = rows_ptr + layout.col_off;
  const auto oe_stride = layout.row_bytes;

  if (nulls_first) {
    return (layout.col_bytes == 4)
               ? thrust::partition(
                     first,
                     last,
                     is_null_order_entry<int32_t>(oe_base, oe_stride, null_val))
               : thrust::partition(
                     first,
                     last,
                     is_null_order_entry<int64_t>(oe_base, oe_stride, null_val));
  } else {
    // Negate the predicate so that non-null rows form the leading group.
    return (layout.col_bytes == 4)
               ? thrust::partition(
                     first,
                     last,
                     thrust::not1(
                         is_null_order_entry<int32_t>(oe_base, oe_stride, null_val)))
               : thrust::partition(
                     first,
                     last,
                     thrust::not1(
                         is_null_order_entry<int64_t>(oe_base, oe_stride, null_val)));
  }
}
104 
// Functor that gathers one order-entry value per position into a dense key
// array.
//
// For position `index` it looks up the source row number `indices[index]`,
// reads a K from `src_oe_base + row * stride`, and stores it in
// `out_base[index]`.
template <class K, class I>
struct KeyFetcher {
  KeyFetcher(K* out_base,
             const int8_t* src_oe_base,
             const size_t stride,
             const I* indices)
      : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}

  __host__ __device__ void operator()(const I index) {
    key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
  }

  K* key_base;             // destination: one key per position
  const int8_t* oe_base;   // address of the order-entry value in row 0
  const size_t oe_stride;  // byte distance between consecutive rows
  const I* idx_base;       // position -> source row number
};
121 
// Functor that stamps a fixed key value onto the first sizeof(K) bytes of a
// row: invoking it with `index` writes `emp_key` at
// `out_base + index * stride`.
template <class K>
struct KeyReseter {
  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
      : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}

  __host__ __device__ void operator()(const size_t index) {
    // Locate the start of the row first, then overwrite its key slot.
    int8_t* const row_start = rows_base + index * key_stride;
    *reinterpret_cast<K*>(row_start) = empty_key;
  }

  int8_t* rows_base;        // start of the row buffer
  const size_t key_stride;  // byte distance between consecutive rows
  const K empty_key;        // value written into each key slot
};
135 
136 // TODO(miyu) : switch to shared version in ResultSetSortImpl.cu.
// Fills `d_oe_col_buffer` with the order-entry value of each indexed row.
//
// For every position p in [0, idx_count) a KeyFetcher copies the K found at
// `d_src_buffer + oe_offset + d_idx_first[p] * oe_stride` into
// `d_oe_col_buffer[p]`.
template <class K, class I>
void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
                                const int8_t* d_src_buffer,
                                const thrust::device_ptr<I>& d_idx_first,
                                const size_t idx_count,
                                const size_t oe_offset,
                                const size_t oe_stride) {
  // Unwrap the Thrust pointers once; the functor works on raw addresses.
  K* const keys_out = thrust::raw_pointer_cast(d_oe_col_buffer);
  const I* const row_indices = thrust::raw_pointer_cast(d_idx_first);
  const int8_t* const oe_column = d_src_buffer + oe_offset;

  KeyFetcher<K, I> fetch_key(keys_out, oe_column, oe_stride, row_indices);
  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(idx_count),
                   fetch_key);
}
151 
// Sorts the first `idx_count` keys of `d_key_buffer` in place and applies the
// same permutation to the indices starting at `d_idx_first`.
//
// Keys are ordered with thrust::greater<K> when `desc` is set and with
// Thrust's default comparison otherwise. Both variants run under
// thrust::device(allocator).
template <class K, class I>
void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
                         const size_t idx_count,
                         const thrust::device_ptr<K>& d_key_buffer,
                         const bool desc,
                         ThrustAllocator& allocator) {
  const auto d_key_last = d_key_buffer + idx_count;
  if (!desc) {
    thrust::sort_by_key(
        thrust::device(allocator), d_key_buffer, d_key_last, d_idx_first);
    return;
  }
  thrust::sort_by_key(thrust::device(allocator),
                      d_key_buffer,
                      d_key_last,
                      d_idx_first,
                      thrust::greater<K>());
}
169 
// Helper for do_radix_sort: gathers the order-entry column as a dense array
// of K (one value per index) and sorts the indices by those values.
template <class K, class I>
void sort_indices_by_order_entry(thrust::device_ptr<I> d_idx_first,
                                 const size_t idx_count,
                                 const int8_t* d_src_buffer,
                                 const PodOrderEntry& oe,
                                 const GroupByBufferLayoutInfo& layout,
                                 ThrustAllocator& allocator) {
  auto d_oe_buffer = get_device_ptr<K>(idx_count, allocator);
  collect_order_entry_column(d_oe_buffer,
                             d_src_buffer,
                             d_idx_first,
                             idx_count,
                             layout.col_off,
                             layout.row_bytes);
  sort_indices_by_key(d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator);
}

// Sorts the `idx_count` row indices starting at `d_idx_first` by the value of
// the order-entry column of the rows they refer to.
//
// The key type is picked from the order-entry SQL type and column width:
//   floating point : 4 bytes -> float,   8 bytes -> double
//   otherwise      : 4 bytes -> int32_t, 8 bytes -> int64_t
// Non floating-point types must satisfy is_number() or is_time(); any other
// column width trips CHECK(false). The direction comes from `oe.is_desc`.
template <class I = int32_t>
void do_radix_sort(thrust::device_ptr<I> d_idx_first,
                   const size_t idx_count,
                   const int8_t* d_src_buffer,
                   const PodOrderEntry& oe,
                   const GroupByBufferLayoutInfo& layout,
                   ThrustAllocator& allocator) {
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.is_fp()) {
    switch (layout.col_bytes) {
      case 4:
        sort_indices_by_order_entry<float>(
            d_idx_first, idx_count, d_src_buffer, oe, layout, allocator);
        break;
      case 8:
        sort_indices_by_order_entry<double>(
            d_idx_first, idx_count, d_src_buffer, oe, layout, allocator);
        break;
      default:
        CHECK(false);
    }
    return;
  }
  CHECK(oe_type.is_number() || oe_type.is_time());
  switch (layout.col_bytes) {
    case 4:
      sort_indices_by_order_entry<int32_t>(
          d_idx_first, idx_count, d_src_buffer, oe, layout, allocator);
      break;
    case 8:
      sort_indices_by_order_entry<int64_t>(
          d_idx_first, idx_count, d_src_buffer, oe, layout, allocator);
      break;
    default:
      CHECK(false);
  }
}
235 
// Functor that copies whole rows into a densely packed output buffer.
//
// Invoking it with `index` copies `row_sz` bytes from source row
// `indices[index]` of `in_base` into row `index` of `out_base`.
template <class I>
struct RowFetcher {
  RowFetcher(int8_t* out_base,
             const int8_t* in_base,
             const I* indices,
             const size_t row_sz)
      : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}

  __host__ __device__ void operator()(const I index) {
    const int8_t* const from = src_base + idx_base[index] * row_size;
    int8_t* const to = dst_base + index * row_size;
    memcpy(to, from, row_size);
  }

  int8_t* dst_base;        // start of the output rows
  const int8_t* src_base;  // start of the input rows
  const I* idx_base;       // output position -> input row number
  const size_t row_size;   // bytes per row
};
252 
// Overwrites the key slot of every row in [first, last) of `row_buffer` with
// the empty-key marker, running under the execution policy `exec`.
//
//   key_width == 4 : writes EMPTY_KEY_32 as an int32_t
//   key_width == 8 : writes EMPTY_KEY_64 as an int64_t
//   anything else  : trips CHECK(false)
//
// Row i starts at `row_buffer + i * row_size`; only the key slot at the start
// of the row is written.
template <typename DerivedPolicy>
void reset_keys_in_row_buffer(
    const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
    int8_t* row_buffer,
    const size_t key_width,
    const size_t row_size,
    const size_t first,
    const size_t last) {
  switch (key_width) {
    case 4:
      thrust::for_each(
          exec,
          thrust::make_counting_iterator(first),
          thrust::make_counting_iterator(last),
          KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
      break;
    case 8:
      thrust::for_each(
          exec,
          thrust::make_counting_iterator(first),
          thrust::make_counting_iterator(last),
          KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
      break;
    default:
      CHECK(false);
  }
}
280 
// Extracts up to `n` rows, ordered by the order-entry column, from the
// per-thread heaps stored in `dev_heaps` and returns them as a host buffer of
// exactly `row_size * n` bytes.
//
// Steps:
//   1. Check `heaps_size` against streaming_top_n::get_heap_size() and locate
//      the row area inside the heap buffer.
//   2. Build the index sequence [0, n * thread_count) and move the indices of
//      taken (non-empty) rows to the front; key width is `group_key_bytes`
//      (4 -> int32_t, otherwise int64_t).
//   3. If no row is taken, return a host buffer whose key slots are all reset
//      to the empty key.
//   4. Sort the taken indices by the order-entry column. When the column is
//      nullable, nulls are first grouped at the front or back according to
//      `oe.nulls_first`, and only the non-null group is sorted.
//   5. Copy the leading min(n, taken) rows into a device buffer, reset the
//      key slots of any remaining rows, and copy the buffer to the host.
//
// `data_mgr` and `device_id` are only used to construct the ThrustAllocator
// that backs the temporary device buffers.
std::vector<int8_t> pop_n_rows_from_merged_heaps_gpu(
    Data_Namespace::DataMgr* data_mgr,
    const int64_t* dev_heaps,
    const size_t heaps_size,
    const size_t n,
    const PodOrderEntry& oe,
    const GroupByBufferLayoutInfo& layout,
    const size_t group_key_bytes,
    const size_t thread_count,
    const int device_id) {
  const auto row_size = layout.row_bytes;
  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
                           streaming_top_n::get_rows_offset_of_heaps(n, thread_count);
  const auto total_entry_count = n * thread_count;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
  thrust::sequence(d_indices, d_indices + total_entry_count);
  // Taken rows first; `separator` points at the first empty one.
  auto separator = (group_key_bytes == 4)
                       ? thrust::partition(d_indices,
                                           d_indices + total_entry_count,
                                           is_taken_entry<int32_t>(rows_ptr, row_size))
                       : thrust::partition(d_indices,
                                           d_indices + total_entry_count,
                                           is_taken_entry<int64_t>(rows_ptr, row_size));
  const size_t actual_entry_count = separator - d_indices;
  if (!actual_entry_count) {
    std::vector<int8_t> top_rows(row_size * n);
    // data() instead of &top_rows[0]: indexing an empty vector (n == 0) is
    // undefined behavior, data() is always valid to call.
    // NOTE(review): the key width passed here is layout.col_bytes, while the
    // taken-entry test above uses group_key_bytes -- confirm the two agree.
    reset_keys_in_row_buffer(
        thrust::host, top_rows.data(), layout.col_bytes, row_size, 0, n);
    return top_rows;
  }

  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.get_notnull()) {
    do_radix_sort(d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator);
  } else {
    // Distinct name so the taken/empty boundary above is not shadowed.
    auto null_separator = partition_by_null(d_indices,
                                            d_indices + actual_entry_count,
                                            null_val_bit_pattern(oe_type, false),
                                            oe.nulls_first,
                                            rows_ptr,
                                            layout);
    if (oe.nulls_first) {
      // Nulls occupy the front; sort only what follows them.
      const size_t null_count = null_separator - d_indices;
      if (null_count < actual_entry_count) {
        do_radix_sort(null_separator,
                      actual_entry_count - null_count,
                      rows_ptr,
                      oe,
                      layout,
                      thrust_allocator);
      }
    } else {
      // Non-null rows occupy the front; sort only those.
      const size_t nonnull_count = null_separator - d_indices;
      if (nonnull_count > 0) {
        do_radix_sort(d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator);
      }
    }
  }

  const auto final_entry_count = std::min(n, actual_entry_count);
  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
  thrust::for_each(thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(final_entry_count),
                   RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
                                       rows_ptr,
                                       thrust::raw_pointer_cast(d_indices),
                                       row_size));

  if (final_entry_count < n) {
    // Fewer than n rows were available: mark the unused tail rows as empty.
    reset_keys_in_row_buffer(thrust::device,
                             thrust::raw_pointer_cast(d_top_rows),
                             layout.col_bytes,
                             row_size,
                             final_entry_count,
                             n);
  }

  std::vector<int8_t> top_rows(row_size * n);
  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
  return top_rows;
}
#define CHECK_EQ(x, y)
Definition: Logger.h:198
void reset_keys_in_row_buffer(const thrust::detail::execution_policy_base< DerivedPolicy > &exec, int8_t *row_buffer, const size_t key_width, const size_t row_size, const size_t first, const size_t last)
Definition: TopKSort.cu:254
#define EMPTY_KEY_64
const I * idx_base
Definition: TopKSort.cu:119
__host__ __device__ void operator()(const size_t index)
Definition: TopKSort.cu:126
int8_t * dst_base
Definition: TopKSort.cu:247
const K empty_key
Definition: TopKSort.cu:133
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:112
is_null_order_entry(const int8_t *base, const size_t stride, const int64_t nul)
Definition: TopKSort.cu:52
const size_t row_size
Definition: TopKSort.cu:250
Utility functions for easy access to the result set buffers.
SQLTypeInfo sql_type
Definition: TargetInfo.h:42
Streaming Top N algorithm.
const int8_t * oe_base
Definition: TopKSort.cu:117
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count)
Definition: TopKSort.cu:39
I argument_type
Definition: TopKSort.cu:51
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu(Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
Definition: TopKSort.cu:281
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
bool nulls_first
CHECK(cgen_state)
const int8_t * buff_ptr
Definition: TopKSort.cu:45
const int8_t * src_base
Definition: TopKSort.cu:248
KeyFetcher(K *out_base, const int8_t *src_oe_base, const size_t stride, const I *indices)
Definition: TopKSort.cu:107
const size_t oe_stride
Definition: TopKSort.cu:118
is_taken_entry(const int8_t *buff, const size_t stride)
Definition: TopKSort.cu:40
Definition: TopKSort.cu:50
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:243
KeyReseter(int8_t *out_base, const size_t stride, const K emp_key)
Definition: TopKSort.cu:124
const int8_t * oe_base
Definition: TopKSort.cu:66
const size_t key_stride
Definition: TopKSort.cu:46
const I * idx_base
Definition: TopKSort.cu:249
int8_t * rows_base
Definition: TopKSort.cu:131
const int64_t null_val
Definition: TopKSort.cu:68
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:42
bool is_desc
K * key_base
Definition: TopKSort.cu:116
RowFetcher(int8_t *out_base, const int8_t *in_base, const I *indices, const size_t row_sz)
Definition: TopKSort.cu:238
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count)
const TargetInfo oe_target_info
#define EMPTY_KEY_32
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:54
const size_t key_stride
Definition: TopKSort.cu:132
ForwardIterator partition_by_null(ForwardIterator first, ForwardIterator last, const int64_t null_val, const bool nulls_first, const int8_t *rows_ptr, const GroupByBufferLayoutInfo &layout)
Definition: TopKSort.cu:72
void collect_order_entry_column(thrust::device_ptr< K > &d_oe_col_buffer, const int8_t *d_src_buffer, const thrust::device_ptr< I > &d_idx_first, const size_t idx_count, const size_t oe_offset, const size_t oe_stride)
Definition: TopKSort.cu:138
void do_radix_sort(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const int8_t *d_src_buffer, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, ThrustAllocator &allocator)
Definition: TopKSort.cu:171
void sort_indices_by_key(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const thrust::device_ptr< K > &d_key_buffer, const bool desc, ThrustAllocator &allocator)
Definition: TopKSort.cu:153
const size_t oe_stride
Definition: TopKSort.cu:67