OmniSciDB  72c90bc290
TopKSort.cu File Reference
#include "BufferEntryUtils.h"
#include "GpuMemUtils.h"
#include "ResultSetBufferAccessors.h"
#include "SortUtils.cuh"
#include "StreamingTopN.h"
#include "TopKSort.h"
#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/partition.h>
#include <thrust/sort.h>
#include <cuda.h>
#include <iostream>

Classes

struct  is_taken_entry< K, I >
 
struct  is_null_order_entry< K, I >
 
struct  KeyFetcher< K, I >
 
struct  KeyReseter< K >
 
struct  RowFetcher< I >
 

Macros

#define checkCudaErrors(err)   CHECK_EQ(err, CUDA_SUCCESS)
 

Functions

CUstream getQueryEngineCudaStreamForDevice (int device_num)
 
template<typename ForwardIterator >
ForwardIterator partition_by_null (ForwardIterator first, ForwardIterator last, const int64_t null_val, const bool nulls_first, const int8_t *rows_ptr, const GroupByBufferLayoutInfo &layout)
 
template<class K , class I >
void collect_order_entry_column (thrust::device_ptr< K > &d_oe_col_buffer, const int8_t *d_src_buffer, const thrust::device_ptr< I > &d_idx_first, const size_t idx_count, const size_t oe_offset, const size_t oe_stride, ThrustAllocator &allocator, const int device_id)
 
template<class K , class I >
void sort_indices_by_key (thrust::device_ptr< I > d_idx_first, const size_t idx_count, const thrust::device_ptr< K > &d_key_buffer, const bool desc, ThrustAllocator &allocator, const int device_id)
 
template<class I = int32_t>
void do_radix_sort (thrust::device_ptr< I > d_idx_first, const size_t idx_count, const int8_t *d_src_buffer, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, ThrustAllocator &allocator, const int device_id)
 
template<typename DerivedPolicy >
void reset_keys_in_row_buffer (const thrust::detail::execution_policy_base< DerivedPolicy > &exec, int8_t *row_buffer, const size_t key_width, const size_t row_size, const size_t first, const size_t last)
 
std::vector< int8_t > pop_n_rows_from_merged_heaps_gpu (Data_Namespace::DataMgr *data_mgr, const int64_t *dev_heaps, const size_t heaps_size, const size_t n, const PodOrderEntry &oe, const GroupByBufferLayoutInfo &layout, const size_t group_key_bytes, const size_t thread_count, const int device_id)
 

Macro Definition Documentation

#define checkCudaErrors(err)   CHECK_EQ(err, CUDA_SUCCESS)

Definition at line 33 of file TopKSort.cu.

Function Documentation

template<class K , class I >
void collect_order_entry_column ( thrust::device_ptr< K > &  d_oe_col_buffer,
const int8_t *  d_src_buffer,
const thrust::device_ptr< I > &  d_idx_first,
const size_t  idx_count,
const size_t  oe_offset,
const size_t  oe_stride,
ThrustAllocator &  allocator,
const int  device_id 
)

Definition at line 137 of file TopKSort.cu.

References checkCudaErrors, and getQueryEngineCudaStreamForDevice().

Referenced by do_radix_sort().

{
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::for_each(thrust::cuda::par(allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(idx_count),
                   KeyFetcher<K, I>(thrust::raw_pointer_cast(d_oe_col_buffer),
                                    d_src_buffer + oe_offset,
                                    oe_stride,
                                    thrust::raw_pointer_cast(d_idx_first)));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}

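The body above is a gather: for each selected row index, the order-by value is read from the strided row buffer at oe_offset and written into a dense key buffer. Below is a minimal standalone sketch of the same pattern using plain Thrust containers instead of ThrustAllocator/get_device_ptr; the KeyGather functor and gather_keys() helper are illustrative stand-ins for KeyFetcher and collect_order_entry_column, not the engine's API.

// Illustrative sketch only: gather strided order-by values into a dense key buffer.
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <cstddef>
#include <cstdint>
#include <cstring>

template <class K, class I>
struct KeyGather {
  K* out;             // dense key buffer being filled
  const int8_t* src;  // row buffer, already offset to the order-by column
  size_t stride;      // row size in bytes
  const I* idx;       // selected row indices
  __host__ __device__ void operator()(const size_t i) const {
    K v;
    memcpy(&v, src + static_cast<size_t>(idx[i]) * stride, sizeof(K));
    out[i] = v;
  }
};

// d_rows is a device pointer to the row buffer; col_off/row_bytes describe the layout.
void gather_keys(thrust::device_vector<int64_t>& keys,
                 const int8_t* d_rows,
                 const thrust::device_vector<int32_t>& d_idx,
                 size_t col_off,
                 size_t row_bytes) {
  thrust::for_each(thrust::device,
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(d_idx.size()),
                   KeyGather<int64_t, int32_t>{thrust::raw_pointer_cast(keys.data()),
                                               d_rows + col_off,
                                               row_bytes,
                                               thrust::raw_pointer_cast(d_idx.data())});
}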

template<class I = int32_t>
void do_radix_sort ( thrust::device_ptr< I >  d_idx_first,
const size_t  idx_count,
const int8_t *  d_src_buffer,
const PodOrderEntry &  oe,
const GroupByBufferLayoutInfo &  layout,
ThrustAllocator &  allocator,
const int  device_id 
)

Definition at line 180 of file TopKSort.cu.

References CHECK, GroupByBufferLayoutInfo::col_bytes, GroupByBufferLayoutInfo::col_off, collect_order_entry_column(), PodOrderEntry::is_desc, GroupByBufferLayoutInfo::oe_target_info, GroupByBufferLayoutInfo::row_bytes, sort_indices_by_key(), and TargetInfo::sql_type.

Referenced by pop_n_rows_from_merged_heaps_gpu().

{
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.is_fp()) {
    switch (layout.col_bytes) {
      case 4: {
        auto d_oe_buffer = get_device_ptr<float>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer,
                                   d_src_buffer,
                                   d_idx_first,
                                   idx_count,
                                   layout.col_off,
                                   layout.row_bytes,
                                   allocator,
                                   device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      case 8: {
        auto d_oe_buffer = get_device_ptr<double>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer,
                                   d_src_buffer,
                                   d_idx_first,
                                   idx_count,
                                   layout.col_off,
                                   layout.row_bytes,
                                   allocator,
                                   device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      default:
        CHECK(false);
    }
    return;
  }
  CHECK(oe_type.is_number() || oe_type.is_time());
  switch (layout.col_bytes) {
    case 4: {
      auto d_oe_buffer = get_device_ptr<int32_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer,
                                 d_src_buffer,
                                 d_idx_first,
                                 idx_count,
                                 layout.col_off,
                                 layout.row_bytes,
                                 allocator,
                                 device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    case 8: {
      auto d_oe_buffer = get_device_ptr<int64_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer,
                                 d_src_buffer,
                                 d_idx_first,
                                 idx_count,
                                 layout.col_off,
                                 layout.row_bytes,
                                 allocator,
                                 device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    default:
      CHECK(false);
  }
}

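do_radix_sort() dispatches on the order-by column's SQL type and physical width, materializes a key buffer of the matching C++ type, and then co-sorts the index vector with the keys. Below is a hedged sketch of that width dispatch on plain Thrust vectors; sort_indices_for() and sort_by_column_width() are hypothetical names, and the floating-point branches are omitted for brevity.

// Illustrative sketch: choose the key type from the column width, then co-sort indices.
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/sort.h>
#include <cstddef>
#include <cstdint>

template <class K>
void sort_indices_for(thrust::device_vector<int32_t>& idx,
                      thrust::device_vector<K>& keys,
                      bool desc) {
  if (desc) {
    thrust::sort_by_key(
        thrust::device, keys.begin(), keys.end(), idx.begin(), thrust::greater<K>());
  } else {
    thrust::sort_by_key(thrust::device, keys.begin(), keys.end(), idx.begin());
  }
}

// Runtime dispatch on the physical width of the order-by column, as in do_radix_sort():
// 4-byte vs. 8-byte integer keys (the fp branches are analogous).
void sort_by_column_width(thrust::device_vector<int32_t>& idx,
                          thrust::device_vector<int32_t>& keys32,
                          thrust::device_vector<int64_t>& keys64,
                          size_t col_bytes,
                          bool desc) {
  switch (col_bytes) {
    case 4:
      sort_indices_for(idx, keys32, desc);
      break;
    case 8:
      sort_indices_for(idx, keys64, desc);
      break;
    default:
      break;  // unsupported width; the real code CHECKs here
  }
}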

CUstream getQueryEngineCudaStreamForDevice ( int  device_num)

Definition at line 7 of file QueryEngine.cpp.

References QueryEngine::getInstance().

Referenced by RangeJoinHashTable::approximateTupleCount(), BoundingBoxIntersectJoinHashTable::approximateTupleCount(), BaselineJoinHashTable::approximateTupleCount(), collect_order_entry_column(), anonymous_namespace{BoundingBoxIntersectJoinHashTable.cpp}::compute_bucket_sizes(), copy_projection_buffer_from_gpu_columnar(), copy_to_nvidia_gpu(), BaselineJoinHashTable::copyCpuHashTableToGpu(), PerfectJoinHashTable::copyCpuHashTableToGpu(), QueryMemoryInitializer::copyFromTableFunctionGpuBuffers(), anonymous_namespace{ResultSetSortImpl.cu}::do_radix_sort(), TableFunctionExecutionContext::execute(), anonymous_namespace{ResultSetIteration.cpp}::fetch_data_from_gpu(), ResultSet::getVarlenOrderEntry(), BaselineJoinHashTable::initHashTableForDevice(), BaselineJoinHashTableBuilder::initHashTableOnGpu(), InValuesBitmap::InValuesBitmap(), TableFunctionExecutionContext::launchGpuCode(), ResultSet::makeVarlenTargetValue(), pop_n_rows_from_merged_heaps_gpu(), QueryExecutionContext::QueryExecutionContext(), ResultSet::radixSortOnGpu(), PerfectJoinHashTable::reify(), RangeJoinHashTable::reifyWithLayout(), BoundingBoxIntersectJoinHashTable::reifyWithLayout(), BaselineJoinHashTable::reifyWithLayout(), ExecutionKernel::runImpl(), sort_indices_by_key(), ResultSet::syncEstimatorBuffer(), PerfectJoinHashTable::toSet(), BaselineJoinHashTable::toSet(), BoundingBoxIntersectJoinHashTable::toSet(), PerfectJoinHashTable::toString(), BaselineJoinHashTable::toString(), and BoundingBoxIntersectJoinHashTable::toString().

{  // NOTE: CUstream is cudaStream_t
  return QueryEngine::getInstance()->getCudaStreamForDevice(device_num);
}


template<typename ForwardIterator >
ForwardIterator partition_by_null ( ForwardIterator  first,
ForwardIterator  last,
const int64_t  null_val,
const bool  nulls_first,
const int8_t *  rows_ptr,
const GroupByBufferLayoutInfo &  layout
)

Definition at line 71 of file TopKSort.cu.

References GroupByBufferLayoutInfo::col_bytes, GroupByBufferLayoutInfo::col_off, and GroupByBufferLayoutInfo::row_bytes.

Referenced by pop_n_rows_from_merged_heaps_gpu().

{
  if (nulls_first) {
    return (layout.col_bytes == 4)
               ? thrust::partition(
                     first,
                     last,
                     is_null_order_entry<int32_t>(
                         rows_ptr + layout.col_off, layout.row_bytes, null_val))
               : thrust::partition(
                     first,
                     last,
                     is_null_order_entry<int64_t>(
                         rows_ptr + layout.col_off, layout.row_bytes, null_val));
  } else {
    return (layout.col_bytes == 4)
               ? thrust::partition(
                     first,
                     last,
                     thrust::not1(is_null_order_entry<int32_t>(
                         rows_ptr + layout.col_off, layout.row_bytes, null_val)))
               : thrust::partition(
                     first,
                     last,
                     thrust::not1(is_null_order_entry<int64_t>(
                         rows_ptr + layout.col_off, layout.row_bytes, null_val)));
  }
}

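partition_by_null() uses thrust::partition with an is_null_order_entry predicate so that NULL keys end up contiguously at the front (nulls_first) or at the back; the non-NULL range can then be radix sorted on its own. Below is a standalone sketch of the idea on a plain key vector; is_null, is_not_null, and group_nulls() are illustrative, and the real code partitions an index vector over the strided row buffer rather than the keys themselves.

// Illustrative sketch: move NULL-sentinel keys to one end of the range.
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/partition.h>
#include <cstddef>
#include <cstdint>

struct is_null {
  int64_t null_val;
  __host__ __device__ bool operator()(const int64_t v) const { return v == null_val; }
};

struct is_not_null {
  int64_t null_val;
  __host__ __device__ bool operator()(const int64_t v) const { return v != null_val; }
};

// Returns the number of NULL keys after moving them to the requested end.
size_t group_nulls(thrust::device_vector<int64_t>& keys,
                   int64_t null_val,
                   bool nulls_first) {
  auto sep = nulls_first
                 ? thrust::partition(
                       thrust::device, keys.begin(), keys.end(), is_null{null_val})
                 : thrust::partition(
                       thrust::device, keys.begin(), keys.end(), is_not_null{null_val});
  return nulls_first ? static_cast<size_t>(sep - keys.begin())
                     : static_cast<size_t>(keys.end() - sep);
}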

std::vector<int8_t> pop_n_rows_from_merged_heaps_gpu ( Data_Namespace::DataMgr *  data_mgr,
const int64_t *  dev_heaps,
const size_t  heaps_size,
const size_t  n,
const PodOrderEntry &  oe,
const GroupByBufferLayoutInfo &  layout,
const size_t  group_key_bytes,
const size_t  thread_count,
const int  device_id 
)

Definition at line 303 of file TopKSort.cu.

References CHECK_EQ, checkCudaErrors, GroupByBufferLayoutInfo::col_bytes, gpu_enabled::copy(), do_radix_sort(), streaming_top_n::get_heap_size(), streaming_top_n::get_rows_offset_of_heaps(), getQueryEngineCudaStreamForDevice(), null_val_bit_pattern(), PodOrderEntry::nulls_first, GroupByBufferLayoutInfo::oe_target_info, partition_by_null(), reset_keys_in_row_buffer(), GroupByBufferLayoutInfo::row_bytes, and TargetInfo::sql_type.

{
  const auto row_size = layout.row_bytes;
  CHECK_EQ(heaps_size, streaming_top_n::get_heap_size(row_size, n, thread_count));
  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
                           streaming_top_n::get_rows_offset_of_heaps(n, thread_count);
  const auto total_entry_count = n * thread_count;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::sequence(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   d_indices,
                   d_indices + total_entry_count);
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  auto separator =
      (group_key_bytes == 4)
          ? thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int32_t>(rows_ptr, row_size))
          : thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int64_t>(rows_ptr, row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  const size_t actual_entry_count = separator - d_indices;
  if (!actual_entry_count) {
    std::vector<int8_t> top_rows(row_size * n);
    reset_keys_in_row_buffer(
        thrust::host, &top_rows[0], layout.col_bytes, row_size, 0, n);
    return top_rows;
  }

  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.get_notnull()) {
    do_radix_sort(
        d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator, device_id);
  } else {
    auto separator = partition_by_null(d_indices,
                                       d_indices + actual_entry_count,
                                       null_val_bit_pattern(oe_type, false),
                                       oe.nulls_first,
                                       rows_ptr,
                                       layout);
    if (oe.nulls_first) {
      const size_t null_count = separator - d_indices;
      if (null_count < actual_entry_count) {
        do_radix_sort(separator,
                      actual_entry_count - null_count,
                      rows_ptr,
                      oe,
                      layout,
                      thrust_allocator,
                      device_id);
      }
    } else {
      const size_t nonnull_count = separator - d_indices;
      if (nonnull_count > 0) {
        do_radix_sort(
            d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator, device_id);
      }
    }
  }

  const auto final_entry_count = std::min(n, actual_entry_count);
  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
  thrust::for_each(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(final_entry_count),
                   RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
                                       rows_ptr,
                                       thrust::raw_pointer_cast(d_indices),
                                       row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));

  if (final_entry_count < n) {
    reset_keys_in_row_buffer(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                             thrust::raw_pointer_cast(d_top_rows),
                             layout.col_bytes,
                             row_size,
                             final_entry_count,
                             n);
    checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  }

  std::vector<int8_t> top_rows(row_size * n);
  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
  return top_rows;
}

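The last stage of pop_n_rows_from_merged_heaps_gpu() compacts the winning rows: a RowFetcher functor copies each row named by the sorted index vector into a dense n-row device buffer, which is then copied back to the host. Below is a simplified sketch of that gather-and-copy step; RowCopy and fetch_top_rows() are illustrative stand-ins, and the sketch omits the key-reset step that the real function applies to any unused trailing slots.

// Illustrative sketch: compact the selected rows and copy them back to the host.
#include <thrust/copy.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

struct RowCopy {
  int8_t* dst;         // dense output buffer
  const int8_t* src;   // source row buffer (the merged heaps' row section)
  const int32_t* idx;  // sorted row indices, best rows first
  size_t row_size;     // bytes per row
  __host__ __device__ void operator()(const size_t i) const {
    memcpy(dst + i * row_size, src + static_cast<size_t>(idx[i]) * row_size, row_size);
  }
};

std::vector<int8_t> fetch_top_rows(const int8_t* d_rows,
                                   const thrust::device_vector<int32_t>& d_idx,
                                   size_t row_size,
                                   size_t n) {
  const size_t count = std::min(n, d_idx.size());
  thrust::device_vector<int8_t> d_out(row_size * n);
  thrust::for_each(thrust::device,
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(count),
                   RowCopy{thrust::raw_pointer_cast(d_out.data()),
                           d_rows,
                           thrust::raw_pointer_cast(d_idx.data()),
                           row_size});
  // Slots [count, n) would still need their keys reset to the empty sentinel.
  std::vector<int8_t> out(row_size * n);
  thrust::copy(d_out.begin(), d_out.end(), out.begin());
  return out;
}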

template<typename DerivedPolicy >
void reset_keys_in_row_buffer ( const thrust::detail::execution_policy_base< DerivedPolicy > &  exec,
int8_t *  row_buffer,
const size_t  key_width,
const size_t  row_size,
const size_t  first,
const size_t  last 
)

Definition at line 276 of file TopKSort.cu.

References CHECK, EMPTY_KEY_32, and EMPTY_KEY_64.

Referenced by pop_n_rows_from_merged_heaps_gpu().

{
  switch (key_width) {
    case 4:
      thrust::for_each(
          exec,
          thrust::make_counting_iterator(first),
          thrust::make_counting_iterator(last),
          KeyReseter<int32_t>(row_buffer, row_size, static_cast<int32_t>(EMPTY_KEY_32)));
      break;
    case 8:
      thrust::for_each(
          exec,
          thrust::make_counting_iterator(first),
          thrust::make_counting_iterator(last),
          KeyReseter<int64_t>(row_buffer, row_size, static_cast<int64_t>(EMPTY_KEY_64)));
      break;
    default:
      CHECK(false);
  }
}

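reset_keys_in_row_buffer() stamps an empty-key sentinel (EMPTY_KEY_32 or EMPTY_KEY_64, depending on the key width) over the group key of every unused row slot so that readers of the buffer skip those rows. Below is a minimal sketch of that stamping pattern; KeyStamp, stamp_empty_keys(), and the -1 placeholder sentinel are assumptions, not the engine's definitions.

// Illustrative sketch: write a sentinel key at the start of rows [first, last).
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <cstddef>
#include <cstdint>
#include <cstring>

template <class K>
struct KeyStamp {
  int8_t* rows;     // row buffer (host or device, depending on the execution policy)
  size_t row_size;  // bytes per row
  K empty_key;      // sentinel written over the group key
  __host__ __device__ void operator()(const size_t i) const {
    memcpy(rows + i * row_size, &empty_key, sizeof(K));
  }
};

// With thrust::host this can run on a plain heap buffer, mirroring the thrust::host
// call in pop_n_rows_from_merged_heaps_gpu(). -1 stands in for EMPTY_KEY_64 here.
inline void stamp_empty_keys(int8_t* rows, size_t row_size, size_t first, size_t last) {
  thrust::for_each(thrust::host,
                   thrust::make_counting_iterator(first),
                   thrust::make_counting_iterator(last),
                   KeyStamp<int64_t>{rows, row_size, static_cast<int64_t>(-1)});
}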

template<class K , class I >
void sort_indices_by_key ( thrust::device_ptr< I >  d_idx_first,
const size_t  idx_count,
const thrust::device_ptr< K > &  d_key_buffer,
const bool  desc,
ThrustAllocator &  allocator,
const int  device_id 
)

Definition at line 157 of file TopKSort.cu.

References checkCudaErrors, and getQueryEngineCudaStreamForDevice().

Referenced by do_radix_sort().

{
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  if (desc) {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first,
                        thrust::greater<K>());
  } else {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first);
  }
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}

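sort_indices_by_key() is a thin wrapper over thrust::sort_by_key: the keys drive the comparison and the index vector is permuted alongside them, with thrust::greater<K> selecting descending order. Below is a small self-contained usage example; the sample data and the sort_top_indices() wrapper are illustrative only.

// Illustrative sketch: co-sort an index vector with its key vector.
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>
#include <cstdint>
#include <vector>

void sort_top_indices(thrust::device_vector<int64_t>& keys,
                      thrust::device_vector<int32_t>& idx,
                      bool desc) {
  if (desc) {
    thrust::sort_by_key(thrust::device,
                        keys.begin(),
                        keys.end(),
                        idx.begin(),
                        thrust::greater<int64_t>());
  } else {
    thrust::sort_by_key(thrust::device, keys.begin(), keys.end(), idx.begin());
  }
}

int main() {
  thrust::device_vector<int64_t> keys(std::vector<int64_t>{30, 10, 20});
  thrust::device_vector<int32_t> idx(3);
  thrust::sequence(idx.begin(), idx.end());    // idx = {0, 1, 2}
  sort_top_indices(keys, idx, /*desc=*/true);  // keys -> {30, 20, 10}, idx -> {0, 2, 1}
  return 0;
}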