OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TopKSort.cu
Go to the documentation of this file.
1 /* copyright 2017 MapD Technologies, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /*
17  * @file TopKSort.cu
18  * @author Minggang Yu <miyu@mapd.com>
19  * @brief Top-k sorting on streaming top-k heaps on VRAM
20  *
21  * Copyright (c) 2017 MapD Technologies, Inc. All rights reserved.
22  */
#include "BufferEntryUtils.h"
#include "GpuMemUtils.h"
#include "SortUtils.cuh"
#include "StreamingTopN.h"
#include "TopKSort.h"

#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/partition.h>
#include <thrust/sort.h>

#include <iostream>
#include <limits>
37 
// Thrust predicate over row indices of a row-wise buffer: returns the negation
// of is_empty_entry<K>(index, buff_ptr, key_stride), i.e. true for slots whose
// key (read as type K) is not the empty-key marker.
//
// K : key type passed to is_empty_entry (the caller picks int32_t / int64_t
//     from the group key width).
// I : type of the index values the predicate is applied to.
template <class K, class I = int32_t>
struct is_taken_entry {
  is_taken_entry(const int8_t* buff, const size_t stride)
      : buff_ptr(buff), key_stride(stride) {}

  // const: Thrust/CUB may invoke the predicate through a const copy or
  // reference; the predicate does not mutate any state.
  __host__ __device__ bool operator()(const I index) const {
    return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
  }

  const int8_t* buff_ptr;   // start of the row buffer
  const size_t key_stride;  // byte distance between consecutive rows
};
48 
// Thrust predicate over row indices: true when the order-entry value of row
// `index` (read as K from oe_base + index * oe_stride) has the same bit pattern
// as `null_val`. Only 4- and 8-byte K are compared; any other width yields false.
//
// K : storage type of the order-entry column (int32_t / int64_t at the call
//     sites in this file).
// I : type of the index values the predicate is applied to.
template <class K, class I = int32_t>
struct is_null_order_entry {
  // Exposed so the predicate can be wrapped with thrust::not1.
  typedef I argument_type;

  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
      : oe_base(base), oe_stride(stride), null_val(nul) {}

  // const: the predicate is stateless at call time, so it can be invoked
  // through const copies/references by Thrust and by thrust::not1 wrappers.
  __host__ __device__ bool operator()(const I index) const {
    const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
    switch (sizeof(K)) {
      case 4:
        // Compare raw bit patterns; the 4-byte null sentinel lives in the low
        // 32 bits of null_val.
        return *reinterpret_cast<const int32_t*>(&oe_val) ==
               static_cast<int32_t>(null_val);
      case 8:
        return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
      default:
        return false;
    }
  }

  const int8_t* oe_base;   // address of the order-entry column in row 0
  const size_t oe_stride;  // byte distance between consecutive rows
  const int64_t null_val;  // null sentinel bit pattern to match
};
70 
// Partitions [first, last) with `is_null` so that null entries come first when
// nulls_first is set, and non-null entries come first otherwise. Returns the
// iterator that starts the second group (thrust::partition semantics; relative
// order inside each group is not preserved).
template <typename ForwardIterator, typename NullPredicate>
ForwardIterator partition_indices_by_null_flag(ForwardIterator first,
                                               ForwardIterator last,
                                               const NullPredicate& is_null,
                                               const bool nulls_first) {
  if (!nulls_first) {
    return thrust::partition(first, last, thrust::not1(is_null));
  }
  return thrust::partition(first, last, is_null);
}

// Splits the row indices in [first, last) into null and non-null groups
// according to the order-entry column described by `layout`.
//
// null_val    : bit pattern identifying a null order-entry value.
// nulls_first : when true nulls are moved to the front, otherwise non-nulls are.
// rows_ptr    : base of the row buffer the indices refer to.
// Returns the boundary iterator between the two groups.
//
// A 4-byte column is compared as int32_t; every other width is compared as
// int64_t.
// NOTE(review): unlike the other Thrust calls in this file, no execution policy
// is passed here, so temporary storage is not routed through ThrustAllocator —
// confirm this is intended.
template <typename ForwardIterator>
ForwardIterator partition_by_null(ForwardIterator first,
                                  ForwardIterator last,
                                  const int64_t null_val,
                                  const bool nulls_first,
                                  const int8_t* rows_ptr,
                                  const GroupByBufferLayoutInfo& layout) {
  const int8_t* oe_column = rows_ptr + layout.col_off;
  if (layout.col_bytes != 4) {
    return partition_indices_by_null_flag(
        first,
        last,
        is_null_order_entry<int64_t>(oe_column, layout.row_bytes, null_val),
        nulls_first);
  }
  return partition_indices_by_null_flag(
      first,
      last,
      is_null_order_entry<int32_t>(oe_column, layout.row_bytes, null_val),
      nulls_first);
}
104 
// Functor for thrust::for_each over positions [0, count): gathers the
// order-entry value of row idx_base[index] into the dense array key_base[index].
//
// K : storage type of the order-entry column.
// I : type of the row indices held in idx_base.
template <class K, class I>
struct KeyFetcher {
  KeyFetcher(K* out_base,
             const int8_t* src_oe_base,
             const size_t stride,
             const I* indices)
      : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}

  // const: only writes through key_base (the pointer itself is unchanged), so
  // the functor can be invoked through const copies.
  __host__ __device__ void operator()(const I index) const {
    key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
  }

  K* key_base;             // dense output array, one key per position
  const int8_t* oe_base;   // address of the order-entry column in row 0
  const size_t oe_stride;  // byte distance between consecutive rows
  const I* idx_base;       // row index for each output position
};
121 
// Functor for thrust::for_each over row numbers: writes `empty_key` (as K) at
// the start of row `index`, where rows are `key_stride` bytes apart.
template <class K>
struct KeyReseter {
  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
      : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}

  __host__ __device__ void operator()(const size_t index) {
    int8_t* const row_start = rows_base + index * key_stride;
    *reinterpret_cast<K*>(row_start) = empty_key;
  }

  int8_t* rows_base;        // first byte of row 0
  const size_t key_stride;  // byte distance between consecutive rows
  const K empty_key;        // value written into each reset row's key slot
};
135 
// TODO(miyu) : switch to shared version in ResultSetSortImpl.cu.
//
// Gathers the order-entry column of the rows named by
// [d_idx_first, d_idx_first + idx_count) into the dense device buffer
// d_oe_col_buffer, so that slot i holds the key of row d_idx_first[i].
//
// oe_offset : byte offset of the order-entry column inside a row.
// oe_stride : byte distance between consecutive rows in d_src_buffer.
template <class K, class I>
void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
                                const int8_t* d_src_buffer,
                                const thrust::device_ptr<I>& d_idx_first,
                                const size_t idx_count,
                                const size_t oe_offset,
                                const size_t oe_stride,
                                ThrustAllocator& allocator) {
  K* const keys_out = thrust::raw_pointer_cast(d_oe_col_buffer);
  const I* const row_indices = thrust::raw_pointer_cast(d_idx_first);
  const int8_t* const oe_column = d_src_buffer + oe_offset;
  const auto positions_begin = thrust::make_counting_iterator(size_t(0));
  const auto positions_end = thrust::make_counting_iterator(idx_count);
  thrust::for_each(thrust::device(allocator),
                   positions_begin,
                   positions_end,
                   KeyFetcher<K, I>(keys_out, oe_column, oe_stride, row_indices));
}
153 
// Sorts the first idx_count keys of d_key_buffer on the device and applies the
// same permutation to the indices starting at d_idx_first. Ascending by
// default, descending (thrust::greater<K>) when `desc` is set.
template <class K, class I>
void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
                         const size_t idx_count,
                         const thrust::device_ptr<K>& d_key_buffer,
                         const bool desc,
                         ThrustAllocator& allocator) {
  const auto keys_end = d_key_buffer + idx_count;
  if (!desc) {
    thrust::sort_by_key(thrust::device(allocator), d_key_buffer, keys_end, d_idx_first);
    return;
  }
  thrust::sort_by_key(thrust::device(allocator),
                      d_key_buffer,
                      keys_end,
                      d_idx_first,
                      thrust::greater<K>());
}
171 
// Gathers the order-entry column (read as K) for the given indices into a
// temporary device buffer, then sorts the indices by those keys.
template <class K, class I>
void sort_indices_by_oe_column(thrust::device_ptr<I> d_idx_first,
                               const size_t idx_count,
                               const int8_t* d_src_buffer,
                               const bool desc,
                               const GroupByBufferLayoutInfo& layout,
                               ThrustAllocator& allocator) {
  auto d_keys = get_device_ptr<K>(idx_count, allocator);
  collect_order_entry_column(d_keys,
                             d_src_buffer,
                             d_idx_first,
                             idx_count,
                             layout.col_off,
                             layout.row_bytes,
                             allocator);
  sort_indices_by_key(d_idx_first, idx_count, d_keys, desc, allocator);
}

// Reorders the idx_count row indices starting at d_idx_first by the order-entry
// column described by `layout` (direction from oe.is_desc).
//
// Key type is picked from the SQL type and the column width:
//   floating point : 4 bytes -> float,   8 bytes -> double
//   number / time  : 4 bytes -> int32_t, 8 bytes -> int64_t
// Any other width fails a CHECK. Non-floating-point types must be numeric or
// time types (CHECKed).
template <class I = int32_t>
void do_radix_sort(thrust::device_ptr<I> d_idx_first,
                   const size_t idx_count,
                   const int8_t* d_src_buffer,
                   const PodOrderEntry& oe,
                   const GroupByBufferLayoutInfo& layout,
                   ThrustAllocator& allocator) {
  const auto& oe_type = layout.oe_target_info.sql_type;
  const bool floating = oe_type.is_fp();
  if (!floating) {
    CHECK(oe_type.is_number() || oe_type.is_time());
  }
  if (layout.col_bytes == 4) {
    if (floating) {
      sort_indices_by_oe_column<float>(
          d_idx_first, idx_count, d_src_buffer, oe.is_desc, layout, allocator);
    } else {
      sort_indices_by_oe_column<int32_t>(
          d_idx_first, idx_count, d_src_buffer, oe.is_desc, layout, allocator);
    }
  } else if (layout.col_bytes == 8) {
    if (floating) {
      sort_indices_by_oe_column<double>(
          d_idx_first, idx_count, d_src_buffer, oe.is_desc, layout, allocator);
    } else {
      sort_indices_by_oe_column<int64_t>(
          d_idx_first, idx_count, d_src_buffer, oe.is_desc, layout, allocator);
    }
  } else {
    CHECK(false);
  }
}
241 
// Functor for thrust::for_each over output positions: copies the whole row
// idx_base[index] of the source buffer into output row `index`. Both buffers
// use the same row size.
template <class I>
struct RowFetcher {
  RowFetcher(int8_t* out_base,
             const int8_t* in_base,
             const I* indices,
             const size_t row_sz)
      : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}

  __host__ __device__ void operator()(const I index) {
    const int8_t* const source_row = src_base + idx_base[index] * row_size;
    int8_t* const target_row = dst_base + index * row_size;
    memcpy(target_row, source_row, row_size);
  }

  int8_t* dst_base;        // first byte of the output buffer
  const int8_t* src_base;  // first byte of the source row buffer
  const I* idx_base;       // source row index for each output position
  const size_t row_size;   // bytes per row in both buffers
};
258 
// Writes the empty-key marker into the key slot of rows [first, last) of
// row_buffer, using the supplied Thrust execution policy (the callers in this
// file pass either thrust::host or thrust::device(allocator)).
//
// key_width : 4 -> EMPTY_KEY_32 as int32_t, 8 -> EMPTY_KEY_64 as int64_t;
//             any other width fails a CHECK.
// row_size  : byte distance between consecutive rows.
template <typename DerivedPolicy>
void reset_keys_in_row_buffer(
    const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
    int8_t* row_buffer,
    const size_t key_width,
    const size_t row_size,
    const size_t first,
    const size_t last) {
  const auto first_row = thrust::make_counting_iterator(first);
  const auto last_row = thrust::make_counting_iterator(last);
  if (key_width == 4) {
    thrust::for_each(
        exec,
        first_row,
        last_row,
        KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
  } else if (key_width == 8) {
    thrust::for_each(
        exec,
        first_row,
        last_row,
        KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
  } else {
    CHECK(false);
  }
}
286 
// Extracts the top `n` rows from the per-thread streaming top-n heaps stored in
// device memory and returns them as a host buffer of row_size * n bytes.
//
// Steps (all on the device via ThrustAllocator unless noted):
//   1. Enumerate every heap slot and move slots with a taken key to the front.
//   2. If the order-entry column is nullable, split null from non-null entries
//      (nulls first or last per oe.nulls_first) and sort only the non-null part;
//      otherwise sort all taken entries.
//   3. Copy the first min(n, taken) rows into a fresh buffer, reset the key of
//      any remaining rows to the empty-key marker, and copy the buffer to host.
//
// heaps_size must equal streaming_top_n::get_heap_size(row_size, n,
// thread_count) (CHECKed). A group_key_bytes of 4 reads keys as int32_t; any
// other width reads them as int64_t.
std::vector<int8_t> pop_n_rows_from_merged_heaps_gpu(
    Data_Namespace::DataMgr* data_mgr,
    const int64_t* dev_heaps,
    const size_t heaps_size,
    const size_t n,
    const PodOrderEntry& oe,
    const GroupByBufferLayoutInfo& layout,
    const size_t group_key_bytes,
    const size_t thread_count,
    const int device_id) {
  const auto row_size = layout.row_bytes;
  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
                           streaming_top_n::get_rows_offset_of_heaps(n, thread_count);
  const auto total_entry_count = n * thread_count;
  // Row indices are held as int32_t below (d_indices, RowFetcher<int32_t>), so
  // every heap slot must be addressable without overflowing int32_t.
  CHECK(total_entry_count <= static_cast<size_t>(std::numeric_limits<int32_t>::max()));
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
  thrust::sequence(
      thrust::device(thrust_allocator), d_indices, d_indices + total_entry_count);

  // Move indices of occupied slots to the front; taken_end marks the boundary.
  auto taken_end = (group_key_bytes == 4)
                       ? thrust::partition(thrust::device(thrust_allocator),
                                           d_indices,
                                           d_indices + total_entry_count,
                                           is_taken_entry<int32_t>(rows_ptr, row_size))
                       : thrust::partition(thrust::device(thrust_allocator),
                                           d_indices,
                                           d_indices + total_entry_count,
                                           is_taken_entry<int64_t>(rows_ptr, row_size));
  const size_t actual_entry_count = taken_end - d_indices;
  if (!actual_entry_count) {
    std::vector<int8_t> top_rows(row_size * n);
    // data() rather than &top_rows[0]: the vector is empty when n == 0.
    reset_keys_in_row_buffer(
        thrust::host, top_rows.data(), layout.col_bytes, row_size, 0, n);
    return top_rows;
  }

  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.get_notnull()) {
    do_radix_sort(d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator);
  } else {
    auto null_boundary = partition_by_null(d_indices,
                                           d_indices + actual_entry_count,
                                           null_val_bit_pattern(oe_type, false),
                                           oe.nulls_first,
                                           rows_ptr,
                                           layout);
    if (oe.nulls_first) {
      // Nulls occupy [d_indices, null_boundary); only the tail needs sorting.
      const size_t null_count = null_boundary - d_indices;
      if (null_count < actual_entry_count) {
        do_radix_sort(null_boundary,
                      actual_entry_count - null_count,
                      rows_ptr,
                      oe,
                      layout,
                      thrust_allocator);
      }
    } else {
      // Non-nulls occupy [d_indices, null_boundary); only the head needs sorting.
      const size_t nonnull_count = null_boundary - d_indices;
      if (nonnull_count > 0) {
        do_radix_sort(d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator);
      }
    }
  }

  const auto final_entry_count = std::min(n, actual_entry_count);
  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
  thrust::for_each(thrust::device(thrust_allocator),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(final_entry_count),
                   RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
                                       rows_ptr,
                                       thrust::raw_pointer_cast(d_indices),
                                       row_size));

  if (final_entry_count < n) {
    // Fewer taken entries than requested: mark the unused output rows empty.
    reset_keys_in_row_buffer(thrust::device(thrust_allocator),
                             thrust::raw_pointer_cast(d_top_rows),
                             layout.col_bytes,
                             row_size,
                             final_entry_count,
                             n);
  }

  std::vector<int8_t> top_rows(row_size * n);
  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
  return top_rows;
}
373 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void reset_keys_in_row_buffer(const thrust::detail::execution_policy_base< DerivedPolicy > &exec, int8_t *row_buffer, const size_t key_width, const size_t row_size, const size_t first, const size_t last)
Definition: TopKSort.cu:260
#define EMPTY_KEY_64
const I * idx_base
Definition: TopKSort.cu:119
__host__ __device__ void operator()(const size_t index)
Definition: TopKSort.cu:126
int8_t * dst_base
Definition: TopKSort.cu:253
const K empty_key
Definition: TopKSort.cu:133
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:112
is_null_order_entry(const int8_t *base, const size_t stride, const int64_t nul)
Definition: TopKSort.cu:52
const size_t row_size
Definition: TopKSort.cu:256
Utility functions for easy access to the result set buffers.
SQLTypeInfo sql_type
Definition: TargetInfo.h:42
Streaming Top N algorithm.
const int8_t * oe_base
Definition: TopKSort.cu:117
size_t get_rows_offset_of_heaps(const size_t n, const size_t thread_count)
Definition: TopKSort.cu:39
I argument_type
Definition: TopKSort.cu:51
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu(Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
Definition: TopKSort.cu:287
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
bool nulls_first
const int8_t * buff_ptr
Definition: TopKSort.cu:45
const int8_t * src_base
Definition: TopKSort.cu:254
KeyFetcher(K *out_base, const int8_t *src_oe_base, const size_t stride, const I *indices)
Definition: TopKSort.cu:107
const size_t oe_stride
Definition: TopKSort.cu:118
is_taken_entry(const int8_t *buff, const size_t stride)
Definition: TopKSort.cu:40
Definition: TopKSort.cu:50
__host__ __device__ void operator()(const I index)
Definition: TopKSort.cu:249
KeyReseter(int8_t *out_base, const size_t stride, const K emp_key)
Definition: TopKSort.cu:124
const int8_t * oe_base
Definition: TopKSort.cu:66
const size_t key_stride
Definition: TopKSort.cu:46
const I * idx_base
Definition: TopKSort.cu:255
int8_t * rows_base
Definition: TopKSort.cu:131
const int64_t null_val
Definition: TopKSort.cu:68
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:42
bool is_desc
K * key_base
Definition: TopKSort.cu:116
RowFetcher(int8_t *out_base, const int8_t *in_base, const I *indices, const size_t row_sz)
Definition: TopKSort.cu:244
size_t get_heap_size(const size_t row_size, const size_t n, const size_t thread_count)
const TargetInfo oe_target_info
#define CHECK(condition)
Definition: Logger.h:197
#define EMPTY_KEY_32
void collect_order_entry_column(thrust::device_ptr< K > &d_oe_col_buffer, const int8_t *d_src_buffer, const thrust::device_ptr< I > &d_idx_first, const size_t idx_count, const size_t oe_offset, const size_t oe_stride, ThrustAllocator &allocator)
Definition: TopKSort.cu:138
__host__ __device__ bool operator()(const I index)
Definition: TopKSort.cu:54
const size_t key_stride
Definition: TopKSort.cu:132
ForwardIterator partition_by_null(ForwardIterator first, ForwardIterator last, const int64_t null_val, const bool nulls_first, const int8_t *rows_ptr, const GroupByBufferLayoutInfo &layout)
Definition: TopKSort.cu:72
void do_radix_sort(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const int8_t *d_src_buffer, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, ThrustAllocator &allocator)
Definition: TopKSort.cu:173
void sort_indices_by_key(thrust::device_ptr< I > d_idx_first, const size_t idx_count, const thrust::device_ptr< K > &d_key_buffer, const bool desc, ThrustAllocator &allocator)
Definition: TopKSort.cu:155
const size_t oe_stride
Definition: TopKSort.cu:67