#include "SortUtils.cuh"

#include "BufferEntryUtils.h"          // utility functions for group by buffer entries
#include "ResultSetBufferAccessors.h"  // utility functions for easy access to the result set buffers
#include "StreamingTopN.h"             // streaming top n algorithm

#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/partition.h>
#include <thrust/sort.h>

#define checkCudaErrors(err) CHECK_EQ(err, CUDA_SUCCESS)
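// Predicate: true iff the heap entry at `index` holds a real row, i.e. its
// group key slot is not the empty-key sentinel.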
template <class K, class I = int32_t>
struct is_taken_entry {
  is_taken_entry(const int8_t* buff, const size_t stride)
      : buff_ptr(buff), key_stride(stride) {}
  __host__ __device__ bool operator()(const I index) {
    return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
  }
  const int8_t* buff_ptr;
  const size_t key_stride;
};
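// Predicate: true iff the order-by column of the entry at `index` matches the
// NULL sentinel; the comparison is done on the bit pattern so that float and
// double NULL sentinels are matched exactly.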
template <class K, class I = int32_t>
struct is_null_order_entry {
  typedef I argument_type;
  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
      : oe_base(base), oe_stride(stride), null_val(nul) {}
  __host__ __device__ bool operator()(const I index) {
    const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
    switch (sizeof(K)) {
      case 4:
        return *reinterpret_cast<const int32_t*>(&oe_val) ==
               static_cast<int32_t>(null_val);
      case 8:
        return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
      default:
        return false;
    }
  }
  const int8_t* oe_base;
  const size_t oe_stride;
  const int64_t null_val;
};
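// Moves entries whose order-by value is NULL to the front of [first, last)
// when nulls_first is set, otherwise to the back; returns the iterator at the
// null/non-null boundary.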
template <typename ForwardIterator>
ForwardIterator partition_by_null(ForwardIterator first,
                                  ForwardIterator last,
                                  const int64_t null_val,
                                  const bool nulls_first,
                                  const int8_t* rows_ptr,
                                  const GroupByBufferLayoutInfo& layout) {
  if (nulls_first) {
    return (layout.col_bytes == 4)
               ? thrust::partition(first,
                                   last,
                                   is_null_order_entry<float>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val))
               : thrust::partition(first,
                                   last,
                                   is_null_order_entry<double>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val));
  } else {
    return (layout.col_bytes == 4)
               ? thrust::partition(first,
                                   last,
                                   thrust::not1(is_null_order_entry<float>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val)))
               : thrust::partition(first,
                                   last,
                                   thrust::not1(is_null_order_entry<double>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val)));
  }
}
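// Gathers the order-by column value of each selected row into a contiguous,
// sort-friendly key buffer: key_base[i] = key of row idx_base[i].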
template <class K, class I>
struct KeyFetcher {
  KeyFetcher(K* out_base, const int8_t* src_oe_base, const size_t stride, const I* indices)
      : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}
  __host__ __device__ void operator()(const I index) {
    key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
  }
  K* key_base;
  const int8_t* oe_base;
  const size_t oe_stride;
  const I* idx_base;
};
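// Stamps the empty-key sentinel into the group key slot of each row, marking
// the row as vacant for downstream readers.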
template <class K>
struct KeyReseter {
  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
      : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}
  __host__ __device__ void operator()(const size_t index) {
    K* key_ptr = reinterpret_cast<K*>(rows_base + index * key_stride);
    *key_ptr = empty_key;
  }
  int8_t* rows_base;
  const size_t key_stride;
  const K empty_key;
};
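// Copies the order-by column of every row referenced by d_idx_first into
// d_oe_col_buffer on the device, so it can serve as the sort key below.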
template <class K, class I>
void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
                                const int8_t* d_src_buffer,
                                const thrust::device_ptr<I>& d_idx_first,
                                const size_t idx_count,
                                const size_t oe_offset,
                                const size_t oe_stride,
                                ThrustAllocator& allocator,
                                const int device_id) {
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::for_each(thrust::cuda::par(allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(idx_count),
                   KeyFetcher<K, I>(thrust::raw_pointer_cast(d_oe_col_buffer),
                                    d_src_buffer + oe_offset,
                                    oe_stride,
                                    thrust::raw_pointer_cast(d_idx_first)));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}
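// Sorts the row indices by the gathered keys; descending order is obtained by
// passing thrust::greater<K> as the comparator.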
template <class K, class I>
void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
                         const size_t idx_count,
                         const thrust::device_ptr<K>& d_key_buffer,
                         const bool desc,
                         ThrustAllocator& allocator,
                         const int device_id) {
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  if (desc) {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first,
                        thrust::greater<K>());
  } else {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first);
  }
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}
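// Dispatches to the typed gather + sort path matching the order-by column:
// float/double keys for floating-point columns, int32_t/int64_t keys for
// integral number and time columns.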
template <class I = int32_t>
void do_radix_sort(thrust::device_ptr<I> d_idx_first,
                   const size_t idx_count,
                   const int8_t* d_src_buffer,
                   const PodOrderEntry& oe,
                   const GroupByBufferLayoutInfo& layout,
                   ThrustAllocator& allocator,
                   const int device_id) {
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.is_fp()) {
    switch (layout.col_bytes) {
      case 4: {
        auto d_oe_buffer = get_device_ptr<float>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                   layout.col_off, layout.row_bytes, allocator, device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      case 8: {
        auto d_oe_buffer = get_device_ptr<double>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                   layout.col_off, layout.row_bytes, allocator, device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      default:
        CHECK(false);
    }
    return;
  }
  CHECK(oe_type.is_number() || oe_type.is_time());
  switch (layout.col_bytes) {
    case 4: {
      auto d_oe_buffer = get_device_ptr<int32_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                 layout.col_off, layout.row_bytes, allocator, device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    case 8: {
      auto d_oe_buffer = get_device_ptr<int64_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                 layout.col_off, layout.row_bytes, allocator, device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    default:
      CHECK(false);
  }
}
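// Copies whole rows out of the heap buffer in index order:
// destination row i <- source row idx_base[i].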
template <class I>
struct RowFetcher {
  RowFetcher(int8_t* out_base, const int8_t* in_base, const I* indices, const size_t row_sz)
      : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}
  __host__ __device__ void operator()(const I index) {
    memcpy(dst_base + index * row_size, src_base + idx_base[index] * row_size, row_size);
  }
  int8_t* dst_base;
  const int8_t* src_base;
  const I* idx_base;
  const size_t row_size;
};
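// Marks rows [first, last) of a row buffer as empty by resetting their group
// key slot, using the 32- or 64-bit sentinel that matches key_width.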
template <typename DerivedPolicy>
void reset_keys_in_row_buffer(
    const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
    int8_t* row_buffer,
    const size_t key_width,
    const size_t row_size,
    const size_t first,
    const size_t last) {
  switch (key_width) {
    case 4:
      thrust::for_each(exec,
                       thrust::make_counting_iterator(first),
                       thrust::make_counting_iterator(last),
                       KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
      break;
    case 8:
      thrust::for_each(exec,
                       thrust::make_counting_iterator(first),
                       thrust::make_counting_iterator(last),
                       KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
      break;
    default:
      CHECK(false);
  }
}
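// Entry point: merges the per-thread top-n heaps that live in device memory,
// sorts the surviving entries by the order-by column (honoring NULL ordering),
// and returns the top n rows as a host-side byte buffer.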
std::vector<int8_t> pop_n_rows_from_merged_heaps_gpu(
    Data_Namespace::DataMgr* data_mgr,
    const int64_t* dev_heaps,
    const size_t heaps_size,
    const size_t n,
    const PodOrderEntry& oe,
    const GroupByBufferLayoutInfo& layout,
    const size_t group_key_bytes,
    const size_t thread_count,
    const int device_id) {
  const auto row_size = layout.row_bytes;
  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
                           streaming_top_n::get_rows_offset_of_heaps(n, thread_count);
  const auto total_entry_count = n * thread_count;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::sequence(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   d_indices,
                   d_indices + total_entry_count);
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  // Compact the index range so that taken (non-empty) entries come first.
  auto separator =
      (group_key_bytes == 4)
          ? thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int32_t>(rows_ptr, row_size))
          : thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int64_t>(rows_ptr, row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  const size_t actual_entry_count = separator - d_indices;
  if (!actual_entry_count) {
    // No entry was taken: return a buffer of n empty rows.
    std::vector<int8_t> top_rows(row_size * n);
    reset_keys_in_row_buffer(
        thrust::host, &top_rows[0], layout.col_bytes, row_size, 0, n);
    return top_rows;
  }
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.get_notnull()) {
    do_radix_sort(
        d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator, device_id);
  } else {
    // Nullable order-by column: separate NULL entries first, then sort only
    // the non-null range.
    auto separator = partition_by_null(d_indices,
                                       d_indices + actual_entry_count,
                                       null_val_bit_pattern(oe_type, false),
                                       oe.nulls_first,
                                       rows_ptr,
                                       layout);
    if (oe.nulls_first) {
      const size_t null_count = separator - d_indices;
      if (null_count < actual_entry_count) {
        do_radix_sort(separator,
                      actual_entry_count - null_count,
                      rows_ptr, oe, layout, thrust_allocator, device_id);
      }
    } else {
      const size_t nonnull_count = separator - d_indices;
      if (nonnull_count > 0) {
        do_radix_sort(
            d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator, device_id);
      }
    }
  }
  const auto final_entry_count = std::min(n, actual_entry_count);
  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
  thrust::for_each(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(final_entry_count),
                   RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
                                       rows_ptr,
                                       thrust::raw_pointer_cast(d_indices),
                                       row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  if (final_entry_count < n) {
    // Fewer than n rows survived: mark the tail rows as empty.
    reset_keys_in_row_buffer(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                             thrust::raw_pointer_cast(d_top_rows),
                             layout.col_bytes,
                             row_size,
                             final_entry_count,
                             n);
    checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  }
  std::vector<int8_t> top_rows(row_size * n);
  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
  return top_rows;
}