OmniSciDB  b24e664e58
QueryExecutionContext Class Reference

#include <QueryExecutionContext.h>


Public Member Functions

 QueryExecutionContext (const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool output_columnar, const bool sort_on_gpu, RenderInfo *)
 
ResultSetPtr getRowSet (const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
 
ResultSetPtr groupBufferToResults (const size_t i) const
 
std::vector< int64_t * > launchGpuCode (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< std::pair< void *, void * >> &cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
 
std::vector< int64_t * > launchCpuCode (const RelAlgExecutionUnit &ra_exe_unit, const std::vector< std::pair< void *, void * >> &fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables)
 
int64_t getAggInitValForIndex (const size_t index) const
 

Private Member Functions

ResultSetPtr groupBufferToDeinterleavedResults (const size_t i) const
 

Private Attributes

std::unique_ptr< CudaAllocator > gpu_allocator_
 
const QueryMemoryDescriptor query_mem_desc_
 
const Executor * executor_
 
const ExecutorDeviceType device_type_
 
const ExecutorDispatchMode dispatch_mode_
 
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
const bool output_columnar_
 
std::unique_ptr< QueryMemoryInitializer > query_buffers_
 
std::unique_ptr< ResultSet > estimator_result_set_
 

Friends

class Executor
 
template<typename META_CLASS_TYPE >
class AggregateReductionEgress
 

Detailed Description

Definition at line 35 of file QueryExecutionContext.h.

Constructor & Destructor Documentation

QueryExecutionContext::QueryExecutionContext ( const RelAlgExecutionUnit &  ra_exe_unit,
const QueryMemoryDescriptor &  query_mem_desc,
const Executor *  executor,
const ExecutorDeviceType  device_type,
const ExecutorDispatchMode  dispatch_mode,
const int  device_id,
const int64_t  num_rows,
const std::vector< std::vector< const int8_t * >> &  col_buffers,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner,
const bool  output_columnar,
const bool  sort_on_gpu,
RenderInfo *  render_info 
)

Definition at line 29 of file QueryExecutionContext.cpp.

References CHECK(), GPU, gpu_allocator_, RenderInfo::isPotentialInSituRender(), num_rows, query_buffers_, query_mem_desc, RenderInfo::render_allocator_map_ptr, and sort_on_gpu().

43  : query_mem_desc_(query_mem_desc)
44  , executor_(executor)
45  , device_type_(device_type)
46  , dispatch_mode_(dispatch_mode)
47  , row_set_mem_owner_(row_set_mem_owner)
48  , output_columnar_(output_columnar) {
49  CHECK(executor);
50  auto& data_mgr = executor->catalog_->getDataMgr();
51  if (device_type == ExecutorDeviceType::GPU) {
52  gpu_allocator_ = std::make_unique<CudaAllocator>(&data_mgr, device_id);
53  }
54 
55  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
56  ? render_info->render_allocator_map_ptr.get()
57  : nullptr;
58  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
59  query_mem_desc,
60  device_id,
61  device_type,
62  dispatch_mode,
63  output_columnar,
64  sort_on_gpu,
65  num_rows,
66  col_buffers,
67  frag_offsets,
68  render_allocator_map,
69  render_info,
70  row_set_mem_owner,
71  gpu_allocator_.get(),
72  executor);
73 }
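
The constructor wires up per-device resources in a fixed order: a CudaAllocator is created only when the device type is GPU, and the QueryMemoryInitializer is built afterwards so it can borrow that allocator. The following self-contained sketch uses stand-in types (not the real OmniSci classes) to illustrate that ownership pattern.

#include <iostream>
#include <memory>

enum class DeviceType { CPU, GPU };

struct Allocator {                 // stand-in for CudaAllocator
  explicit Allocator(int id) : device_id(id) {}
  int device_id;
};

struct Buffers {                   // stand-in for QueryMemoryInitializer
  explicit Buffers(Allocator* a) : alloc(a) {}
  Allocator* alloc;                // stays null for CPU execution
};

struct Context {                   // stand-in for QueryExecutionContext
  Context(DeviceType device_type, int device_id) {
    if (device_type == DeviceType::GPU) {
      gpu_allocator = std::make_unique<Allocator>(device_id);
    }
    // Buffers are created last so they can use the (possibly null) allocator.
    buffers = std::make_unique<Buffers>(gpu_allocator.get());
  }
  std::unique_ptr<Allocator> gpu_allocator;  // only set for GPU
  std::unique_ptr<Buffers> buffers;
};

int main() {
  Context cpu_ctx(DeviceType::CPU, 0);
  Context gpu_ctx(DeviceType::GPU, 1);
  std::cout << (cpu_ctx.buffers->alloc == nullptr) << " "
            << gpu_ctx.buffers->alloc->device_id << "\n";  // prints "1 1"
}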

Member Function Documentation

int64_t QueryExecutionContext::getAggInitValForIndex ( const size_t  index) const

Definition at line 126 of file QueryExecutionContext.cpp.

References CHECK(), and query_buffers_.

Referenced by AggregateReductionEgress< META_TYPE_CLASS >::operator()(), and AggregateReductionEgress< Experimental::MetaTypeClass< Experimental::Geometry > >::operator()().

126  {
127  CHECK(query_buffers_);
128  return query_buffers_->getAggInitValForIndex(index);
129 }

ResultSetPtr QueryExecutionContext::getRowSet ( const RelAlgExecutionUnit &  ra_exe_unit,
const QueryMemoryDescriptor &  query_mem_desc 
) const

Definition at line 131 of file QueryExecutionContext.cpp.

References CHECK(), CHECK_EQ, CPU, device_type_, executor_, GPU, groupBufferToResults(), query_buffers_, query_mem_desc_, row_set_mem_owner_, and QueryMemoryDescriptor::threadsShareMemory().

Referenced by Executor::executePlanWithGroupBy().

133  {
134  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
135  CHECK(query_buffers_);
136  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
137  if (device_type_ == ExecutorDeviceType::CPU) {
138  CHECK_EQ(size_t(1), group_by_buffers_size);
139  return groupBufferToResults(0);
140  }
141  size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
142  for (size_t i = 0; i < group_by_buffers_size; i += step) {
143  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
144  }
145  CHECK(device_type_ == ExecutorDeviceType::GPU);
146  return executor_->reduceMultiDeviceResults(
147  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
148 }
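
On the GPU path there is one output buffer per thread, but when the threads of a block share memory only every blockSize()-th buffer holds a distinct partial result, so the loop above walks the buffers with that stride before handing them to reduceMultiDeviceResults(). A minimal sketch of the stride logic, with plain integers standing in for the real per-buffer ResultSets:

#include <iostream>
#include <numeric>
#include <vector>

int main() {
  const size_t num_buffers = 8;       // e.g. blocks * threads on the device
  const size_t block_size = 4;        // threads per block
  const bool threads_share_memory = true;

  std::vector<long long> partials(num_buffers);
  std::iota(partials.begin(), partials.end(), 1);   // fake per-buffer partial sums

  const size_t step = threads_share_memory ? block_size : 1;
  long long reduced = 0;
  for (size_t i = 0; i < num_buffers; i += step) {  // mirrors the loop in getRowSet()
    reduced += partials[i];  // stands in for reduceMultiDeviceResults()
  }
  std::cout << reduced << "\n";  // 1 + 5 = 6 with the values above
}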

ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults ( const size_t  i) const
private

Definition at line 75 of file QueryExecutionContext.cpp.

References agg_col_count, CHECK(), CPU, executor_, ResultSet::fixupQueryMemoryDescriptor(), QueryMemoryDescriptor::getColOffInBytes(), QueryMemoryDescriptor::getColOffInBytesInNextBin(), QueryMemoryDescriptor::getSlotCount(), output_columnar_, query_buffers_, query_mem_desc_, ResultSetStorage::reduceSingleRow(), and row_set_mem_owner_.

Referenced by groupBufferToResults().

76  {
77  CHECK(!output_columnar_);
78  const auto& result_set = query_buffers_->getResultSet(i);
79  auto deinterleaved_query_mem_desc =
80  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
81  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
82  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
83 
84  auto deinterleaved_result_set =
85  std::make_shared<ResultSet>(result_set->getTargetInfos(),
86  std::vector<ColumnLazyFetchInfo>{},
87  std::vector<std::vector<const int8_t*>>{},
88  std::vector<std::vector<int64_t>>{},
89  std::vector<int64_t>{},
90  ExecutorDeviceType::CPU,
91  -1,
92  deinterleaved_query_mem_desc,
93  row_set_mem_owner_,
94  executor_);
95  auto deinterleaved_storage =
96  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
97  auto deinterleaved_buffer =
98  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
99  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
100  size_t deinterleaved_buffer_idx = 0;
101  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
102  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
103  bin_idx < result_set->entryCount();
104  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
105  std::vector<int64_t> agg_vals(agg_col_count, 0);
106  memcpy(&agg_vals[0],
107  &executor_->plan_state_->init_agg_vals_[0],
108  agg_col_count * sizeof(agg_vals[0]));
109  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
110  executor_->warpSize(),
111  false,
112  true,
113  agg_vals,
114  query_mem_desc_,
115  result_set->getTargetInfos(),
116  executor_->plan_state_->init_agg_vals_);
117  for (size_t agg_idx = 0; agg_idx < agg_col_count;
118  ++agg_idx, ++deinterleaved_buffer_idx) {
119  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
120  }
121  }
122  query_buffers_->resetResultSet(i);
123  return deinterleaved_result_set;
124 }
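
The deinterleaving step exists because, with interleaved bins on GPU, each output row's aggregate slots are replicated once per warp lane; producing an ordinary row-per-bin ResultSet means reducing those per-lane copies into a single value per slot, which is what the reduceSingleRow() call above does. A self-contained sketch of that layout and reduction, using plain arrays and SUM as a stand-in for the real per-target reduction logic:

#include <iostream>
#include <vector>

int main() {
  const size_t warp_size = 4;      // lanes whose partial aggregates must be merged
  const size_t agg_col_count = 2;  // aggregate slots per row
  const size_t num_bins = 2;

  // Interleaved layout: bin0 [lane0 slots][lane1 slots]... then bin1 [...]
  std::vector<long long> interleaved = {
      1, 10,  2, 20,  3, 30,  4, 40,   // bin 0, lanes 0..3
      5, 50,  6, 60,  7, 70,  8, 80};  // bin 1, lanes 0..3

  std::vector<long long> deinterleaved(num_bins * agg_col_count, 0);
  for (size_t bin = 0; bin < num_bins; ++bin) {
    for (size_t lane = 0; lane < warp_size; ++lane) {
      for (size_t slot = 0; slot < agg_col_count; ++slot) {
        // Reduce across lanes; SUM stands in for reduceSingleRow's per-target logic.
        deinterleaved[bin * agg_col_count + slot] +=
            interleaved[(bin * warp_size + lane) * agg_col_count + slot];
      }
    }
  }
  for (auto v : deinterleaved) {
    std::cout << v << " ";  // prints "10 100 26 260"
  }
  std::cout << "\n";
}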

ResultSetPtr QueryExecutionContext::groupBufferToResults ( const size_t  i) const

Definition at line 150 of file QueryExecutionContext.cpp.

References device_type_, groupBufferToDeinterleavedResults(), QueryMemoryDescriptor::interleavedBins(), query_buffers_, and query_mem_desc_.

Referenced by getRowSet().

150  {
151  if (query_mem_desc_.interleavedBins(device_type_)) {
152  return groupBufferToDeinterleavedResults(i);
153  }
154  return query_buffers_->getResultSetOwned(i);
155 }

std::vector< int64_t * > QueryExecutionContext::launchCpuCode ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< std::pair< void *, void * >> &  fn_ptrs,
const bool  hoist_literals,
const std::vector< int8_t > &  literal_buff,
std::vector< std::vector< const int8_t * >>  col_buffers,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_row_offsets,
const int32_t  scan_limit,
int32_t *  error_code,
const uint32_t  num_tables,
const std::vector< int64_t > &  join_hash_tables 
)

Definition at line 542 of file QueryExecutionContext.cpp.

References align_to_int64(), CHECK(), CHECK_EQ, compact_init_vals(), CPU, DEBUG_TIMER, QueryMemoryDescriptor::didOutputColumnar(), error_code, RelAlgExecutionUnit::estimator, estimator_result_set_, QueryMemoryDescriptor::getColsSize(), QueryMemoryDescriptor::getQueryDescriptionType(), INJECT_TIMER, QueryMemoryDescriptor::isGroupBy(), num_rows, Projection, query_buffers_, query_mem_desc_, and use_streaming_top_n().

Referenced by Executor::executePlanWithGroupBy(), and Executor::executePlanWithoutGroupBy().

553  {
554  auto timer = DEBUG_TIMER(__func__);
555  INJECT_TIMER(lauchCpuCode);
556 
557  CHECK(query_buffers_);
558  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
559 
560  std::vector<const int8_t**> multifrag_col_buffers;
561  for (auto& col_buffer : col_buffers) {
562  multifrag_col_buffers.push_back(&col_buffer[0]);
563  }
564  const int8_t*** multifrag_cols_ptr{
565  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
566  const uint64_t num_fragments =
567  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
568  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
569 
570  const bool is_group_by{query_mem_desc_.isGroupBy()};
571  std::vector<int64_t*> out_vec;
572  if (ra_exe_unit.estimator) {
573  estimator_result_set_.reset(
574  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
575  out_vec.push_back(
576  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
577  } else {
578  if (!is_group_by) {
579  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
580  auto buff = new int64_t[num_out_frags];
581  out_vec.push_back(static_cast<int64_t*>(buff));
582  }
583  }
584  }
585 
586  CHECK_EQ(num_rows.size(), col_buffers.size());
587  std::vector<int64_t> flatened_num_rows;
588  for (auto& nums : num_rows) {
589  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
590  }
591  std::vector<uint64_t> flatened_frag_offsets;
592  for (auto& offsets : frag_offsets) {
593  flatened_frag_offsets.insert(
594  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
595  }
596  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
597  auto num_rows_ptr =
598  rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
599  int32_t total_matched_init{0};
600 
601  std::vector<int64_t> cmpt_val_buff;
602  if (is_group_by) {
603  cmpt_val_buff =
604  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()),
605  init_agg_vals,
606  query_mem_desc_);
607  }
608 
609  const int64_t* join_hash_tables_ptr =
610  join_hash_tables.size() == 1
611  ? reinterpret_cast<int64_t*>(join_hash_tables[0])
612  : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
613  if (hoist_literals) {
614  using agg_query = void (*)(const int8_t***, // col_buffers
615  const uint64_t*, // num_fragments
616  const int8_t*, // literals
617  const int64_t*, // num_rows
618  const uint64_t*, // frag_row_offsets
619  const int32_t*, // max_matched
620  int32_t*, // total_matched
621  const int64_t*, // init_agg_value
622  int64_t**, // out
623  int32_t*, // error_code
624  const uint32_t*, // num_tables
625  const int64_t*); // join_hash_tables_ptr
626  if (is_group_by) {
627  reinterpret_cast<agg_query>(fn_ptrs[0].first)(
628  multifrag_cols_ptr,
629  &num_fragments,
630  &literal_buff[0],
631  num_rows_ptr,
632  &flatened_frag_offsets[0],
633  &scan_limit,
634  &total_matched_init,
635  &cmpt_val_buff[0],
636  query_buffers_->getGroupByBuffersPtr(),
637  error_code,
638  &num_tables,
639  join_hash_tables_ptr);
640  } else {
641  reinterpret_cast<agg_query>(fn_ptrs[0].first)(multifrag_cols_ptr,
642  &num_fragments,
643  &literal_buff[0],
644  num_rows_ptr,
645  &flatened_frag_offsets[0],
646  &scan_limit,
647  &total_matched_init,
648  &init_agg_vals[0],
649  &out_vec[0],
650  error_code,
651  &num_tables,
652  join_hash_tables_ptr);
653  }
654  } else {
655  using agg_query = void (*)(const int8_t***, // col_buffers
656  const uint64_t*, // num_fragments
657  const int64_t*, // num_rows
658  const uint64_t*, // frag_row_offsets
659  const int32_t*, // max_matched
660  int32_t*, // total_matched
661  const int64_t*, // init_agg_value
662  int64_t**, // out
663  int32_t*, // error_code
664  const uint32_t*, // num_tables
665  const int64_t*); // join_hash_tables_ptr
666  if (is_group_by) {
667  reinterpret_cast<agg_query>(fn_ptrs[0].first)(
668  multifrag_cols_ptr,
669  &num_fragments,
670  num_rows_ptr,
671  &flatened_frag_offsets[0],
672  &scan_limit,
673  &total_matched_init,
674  &cmpt_val_buff[0],
675  query_buffers_->getGroupByBuffersPtr(),
676  error_code,
677  &num_tables,
678  join_hash_tables_ptr);
679  } else {
680  reinterpret_cast<agg_query>(fn_ptrs[0].first)(multifrag_cols_ptr,
681  &num_fragments,
682  num_rows_ptr,
683  &flatened_frag_offsets[0],
684  &scan_limit,
685  &total_matched_init,
686  &init_agg_vals[0],
687  &out_vec[0],
688  error_code,
689  &num_tables,
690  join_hash_tables_ptr);
691  }
692  }
693 
694  if (ra_exe_unit.estimator) {
695  return {};
696  }
697 
698  if (rowid_lookup_num_rows && *error_code < 0) {
699  *error_code = 0;
700  }
701 
702  if (use_streaming_top_n(ra_exe_unit, query_mem_desc_.didOutputColumnar())) {
703  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
704  }
705 
706  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
707  query_mem_desc_.didOutputColumnar()) {
708  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
709  }
710 
711  return out_vec;
712 }
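
The generated CPU entry point reaches launchCpuCode() only as a void* from the code cache, so it is invoked through a reinterpret_cast to the agg_query function-pointer type whose parameter list is spelled out above. The sketch below mirrors that calling convention with a hand-written function standing in for the JIT-compiled one; note that casting between void* and a function pointer is only conditionally supported by the C++ standard, but it matches what the real code does on the supported platforms.

#include <cstdint>
#include <iostream>

// Simplified stand-in for a generated, non-grouped aggregate kernel: it sums num_rows.
void fake_agg_query(const int8_t*** /*col_buffers*/,
                    const uint64_t* num_fragments,
                    const int64_t* num_rows,
                    const uint64_t* /*frag_row_offsets*/,
                    const int32_t* /*max_matched*/,
                    int32_t* /*total_matched*/,
                    const int64_t* init_agg_value,
                    int64_t** out,
                    int32_t* error_code,
                    const uint32_t* /*num_tables*/,
                    const int64_t* /*join_hash_tables*/) {
  out[0][0] = *init_agg_value;
  for (uint64_t frag = 0; frag < *num_fragments; ++frag) {
    out[0][0] += num_rows[frag];
  }
  *error_code = 0;
}

int main() {
  using agg_query = void (*)(const int8_t***, const uint64_t*, const int64_t*,
                             const uint64_t*, const int32_t*, int32_t*,
                             const int64_t*, int64_t**, int32_t*, const uint32_t*,
                             const int64_t*);
  void* fn_ptr = reinterpret_cast<void*>(&fake_agg_query);  // what the code cache hands back

  const uint64_t num_fragments = 2;
  const int64_t num_rows[] = {100, 42};
  const uint64_t frag_offsets[] = {0, 100};
  const int32_t scan_limit = 0;
  int32_t total_matched = 0;
  const int64_t init_agg_val = 0;
  int64_t out_buf[1] = {0};
  int64_t* out_vec[] = {out_buf};
  int32_t error_code = 0;
  const uint32_t num_tables = 1;

  reinterpret_cast<agg_query>(fn_ptr)(nullptr, &num_fragments, num_rows, frag_offsets,
                                      &scan_limit, &total_matched, &init_agg_val,
                                      out_vec, &error_code, &num_tables, nullptr);
  std::cout << out_buf[0] << "\n";  // prints 142
}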

std::vector< int64_t * > QueryExecutionContext::launchGpuCode ( const RelAlgExecutionUnit &  ra_exe_unit,
const std::vector< std::pair< void *, void * >> &  cu_functions,
const bool  hoist_literals,
const std::vector< int8_t > &  literal_buff,
std::vector< std::vector< const int8_t * >>  col_buffers,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_row_offsets,
const int32_t  scan_limit,
Data_Namespace::DataMgr *  data_mgr,
const unsigned  block_size_x,
const unsigned  grid_size_x,
const int  device_id,
int32_t *  error_code,
const uint32_t  num_tables,
const std::vector< int64_t > &  join_hash_tables,
RenderAllocatorMap *  render_allocator_map 
)

Definition at line 178 of file QueryExecutionContext.cpp.

References agg_col_count, CHECK(), CHECK_EQ, checkCudaErrors(), copy_from_gpu(), copy_to_gpu(), DEBUG_TIMER, QueryMemoryDescriptor::didOutputColumnar(), dispatch_mode_, RelAlgExecutionUnit::estimator, estimator_result_set_, executor_, g_enable_dynamic_watchdog, get_num_allocated_rows_from_gpu(), QueryMemoryDescriptor::getEntryCount(), QueryMemoryDescriptor::getQueryDescriptionType(), RenderAllocatorMap::getRenderAllocator(), GPU, gpu_allocator_, QueryMemoryDescriptor::hasKeylessHash(), INJECT_TIMER, inplace_sort_gpu(), QueryMemoryDescriptor::isGroupBy(), max_matched, num_rows, SortInfo::order_entries, output_columnar_, Projection, query_buffers_, query_mem_desc_, QueryMemoryDescriptor::sharedMemBytes(), RelAlgExecutionUnit::sort_info, QueryMemoryDescriptor::sortOnGpu(), to_string(), RelAlgExecutionUnit::use_bump_allocator, use_speculative_top_n(), use_streaming_top_n(), and VLOG.

Referenced by Executor::executePlanWithGroupBy(), and Executor::executePlanWithoutGroupBy().

194  {
195  auto timer = DEBUG_TIMER(__func__);
196  INJECT_TIMER(lauchGpuCode);
197 #ifdef HAVE_CUDA
200  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
201 
202  bool is_group_by{query_mem_desc_.isGroupBy()};
203 
204  RenderAllocator* render_allocator = nullptr;
205  if (render_allocator_map) {
206  render_allocator = render_allocator_map->getRenderAllocator(device_id);
207  }
208 
209  auto cu_func = static_cast<CUfunction>(cu_functions[device_id].first);
210  std::vector<int64_t*> out_vec;
211  uint32_t num_fragments = col_buffers.size();
212  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
213 
214  CUevent start0, stop0; // preparation
215  cuEventCreate(&start0, 0);
216  cuEventCreate(&stop0, 0);
217  CUevent start1, stop1; // cuLaunchKernel
218  cuEventCreate(&start1, 0);
219  cuEventCreate(&stop1, 0);
220  CUevent start2, stop2; // finish
221  cuEventCreate(&start2, 0);
222  cuEventCreate(&stop2, 0);
223 
225  cuEventRecord(start0, 0);
226  }
227 
229  initializeDynamicWatchdog(cu_functions[device_id].second, device_id);
230  }
231 
232  auto kernel_params = prepareKernelParams(col_buffers,
233  literal_buff,
234  num_rows,
235  frag_offsets,
236  scan_limit,
237  init_agg_vals,
238  error_codes,
239  num_tables,
241  data_mgr,
242  device_id,
243  hoist_literals,
244  is_group_by);
245 
246  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
247  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);
248 
249  const unsigned block_size_y = 1;
250  const unsigned block_size_z = 1;
251  const unsigned grid_size_y = 1;
252  const unsigned grid_size_z = 1;
253  const auto total_thread_count = block_size_x * grid_size_x;
254  const auto err_desc = kernel_params[ERROR_CODE];
255 
256  if (is_group_by) {
257  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
258  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
259  auto gpu_group_by_buffers =
260  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
262  kernel_params[INIT_AGG_VALS],
263  device_id,
265  block_size_x,
266  grid_size_x,
267  executor_->warpSize(),
268  can_sort_on_gpu,
270  render_allocator);
271  if (ra_exe_unit.use_bump_allocator) {
272  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
273  copy_to_gpu(data_mgr,
274  kernel_params[MAX_MATCHED],
275  &max_matched,
276  sizeof(max_matched),
277  device_id);
278  }
279 
280  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.first;
281  std::vector<void*> param_ptrs;
282  for (auto& param : kernel_params) {
283  param_ptrs.push_back(&param);
284  }
285 
287  cuEventRecord(stop0, 0);
288  cuEventSynchronize(stop0);
289  float milliseconds0 = 0;
290  cuEventElapsedTime(&milliseconds0, start0, stop0);
291  VLOG(1) << "Device " << std::to_string(device_id)
292  << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
293  << " ms";
294  cuEventRecord(start1, 0);
295  }
296 
297  if (hoist_literals) {
299  cuLaunchKernel(cu_func,
300  grid_size_x,
301  grid_size_y,
302  grid_size_z,
303  block_size_x,
304  block_size_y,
305  block_size_z,
307  nullptr,
308  &param_ptrs[0],
309  nullptr));
310  } else {
311  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
313  cuLaunchKernel(cu_func,
314  grid_size_x,
315  grid_size_y,
316  grid_size_z,
317  block_size_x,
318  block_size_y,
319  block_size_z,
321  nullptr,
322  &param_ptrs[0],
323  nullptr));
324  }
326  executor_->registerActiveModule(cu_functions[device_id].second, device_id);
327  cuEventRecord(stop1, 0);
328  cuEventSynchronize(stop1);
329  executor_->unregisterActiveModule(cu_functions[device_id].second, device_id);
330  float milliseconds1 = 0;
331  cuEventElapsedTime(&milliseconds1, start1, stop1);
332  VLOG(1) << "Device " << std::to_string(device_id)
333  << ": launchGpuCode: group-by cuLaunchKernel: "
334  << std::to_string(milliseconds1) << " ms";
335  cuEventRecord(start2, 0);
336  }
337 
338  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
339  reinterpret_cast<int8_t*>(err_desc),
340  error_codes.size() * sizeof(error_codes[0]));
341  *error_code = aggregate_error_codes(error_codes);
342  if (*error_code > 0) {
343  return {};
344  }
345 
346  if (!render_allocator) {
348  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
350  gpu_group_by_buffers,
351  ra_exe_unit,
352  total_thread_count,
353  device_id);
354  } else {
355  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
358  gpu_group_by_buffers,
359  data_mgr,
360  device_id);
361  }
365  query_buffers_->compactProjectionBuffersGpu(
367  data_mgr,
368  gpu_group_by_buffers,
370  data_mgr, kernel_params[TOTAL_MATCHED], device_id),
371  device_id);
372  } else {
373  size_t num_allocated_rows{0};
374  if (ra_exe_unit.use_bump_allocator) {
375  num_allocated_rows = get_num_allocated_rows_from_gpu(
376  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
377  // First, check the error code. If we ran out of slots, don't copy data back
378  // into the ResultSet or update ResultSet entry count
379  if (*error_code < 0) {
380  return {};
381  }
382  }
383  query_buffers_->copyGroupByBuffersFromGpu(
384  data_mgr,
386  ra_exe_unit.use_bump_allocator ? num_allocated_rows
388  gpu_group_by_buffers,
389  &ra_exe_unit,
390  block_size_x,
391  grid_size_x,
392  device_id,
393  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
394  if (num_allocated_rows) {
395  CHECK(ra_exe_unit.use_bump_allocator);
396  CHECK(!query_buffers_->result_sets_.empty());
397  query_buffers_->result_sets_.front()->updateStorageEntryCount(
398  num_allocated_rows);
399  }
400  }
401  } else {
402  query_buffers_->copyGroupByBuffersFromGpu(
403  data_mgr,
406  gpu_group_by_buffers,
407  &ra_exe_unit,
408  block_size_x,
409  grid_size_x,
410  device_id,
411  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
412  }
413  }
414  }
415  } else {
416  std::vector<CUdeviceptr> out_vec_dev_buffers;
417  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
418  if (ra_exe_unit.estimator) {
419  estimator_result_set_.reset(new ResultSet(
420  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
421  out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
422  estimator_result_set_->getDeviceEstimatorBuffer()));
423  } else {
424  for (size_t i = 0; i < agg_col_count; ++i) {
425  CUdeviceptr out_vec_dev_buffer =
426  num_fragments
427  ? reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(
428  block_size_x * grid_size_x * sizeof(int64_t) * num_fragments))
429  : 0;
430  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
431  }
432  }
433  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
434  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
435  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
436  agg_col_count * sizeof(CUdeviceptr));
437  kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
438  std::vector<void*> param_ptrs;
439  for (auto& param : kernel_params) {
440  param_ptrs.push_back(&param);
441  }
442 
444  cuEventRecord(stop0, 0);
445  cuEventSynchronize(stop0);
446  float milliseconds0 = 0;
447  cuEventElapsedTime(&milliseconds0, start0, stop0);
448  VLOG(1) << "Device " << std::to_string(device_id)
449  << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
450  cuEventRecord(start1, 0);
451  }
452 
453  if (hoist_literals) {
454  checkCudaErrors(cuLaunchKernel(cu_func,
455  grid_size_x,
456  grid_size_y,
457  grid_size_z,
458  block_size_x,
459  block_size_y,
460  block_size_z,
461  0,
462  nullptr,
463  &param_ptrs[0],
464  nullptr));
465  } else {
466  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
467  checkCudaErrors(cuLaunchKernel(cu_func,
468  grid_size_x,
469  grid_size_y,
470  grid_size_z,
471  block_size_x,
472  block_size_y,
473  block_size_z,
474  0,
475  nullptr,
476  &param_ptrs[0],
477  nullptr));
478  }
479 
481  executor_->registerActiveModule(cu_functions[device_id].second, device_id);
482  cuEventRecord(stop1, 0);
483  cuEventSynchronize(stop1);
484  executor_->unregisterActiveModule(cu_functions[device_id].second, device_id);
485  float milliseconds1 = 0;
486  cuEventElapsedTime(&milliseconds1, start1, stop1);
487  VLOG(1) << "Device " << std::to_string(device_id)
488  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
489  << " ms";
490  cuEventRecord(start2, 0);
491  }
492 
493  copy_from_gpu(data_mgr,
494  &error_codes[0],
495  err_desc,
496  error_codes.size() * sizeof(error_codes[0]),
497  device_id);
498  *error_code = aggregate_error_codes(error_codes);
499  if (*error_code > 0) {
500  return {};
501  }
502  if (ra_exe_unit.estimator) {
504  estimator_result_set_->syncEstimatorBuffer();
505  return {};
506  }
507  for (size_t i = 0; i < agg_col_count; ++i) {
508  int64_t* host_out_vec =
509  new int64_t[block_size_x * grid_size_x * sizeof(int64_t) * num_fragments];
510  copy_from_gpu(data_mgr,
511  host_out_vec,
512  out_vec_dev_buffers[i],
513  block_size_x * grid_size_x * sizeof(int64_t) * num_fragments,
514  device_id);
515  out_vec.push_back(host_out_vec);
516  }
517  }
518  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
519  if (count_distinct_bitmap_mem) {
520  copy_from_gpu(data_mgr,
521  query_buffers_->getCountDistinctHostPtr(),
522  count_distinct_bitmap_mem,
523  query_buffers_->getCountDistinctBitmapBytes(),
524  device_id);
525  }
526 
528  cuEventRecord(stop2, 0);
529  cuEventSynchronize(stop2);
530  float milliseconds2 = 0;
531  cuEventElapsedTime(&milliseconds2, start2, stop2);
532  VLOG(1) << "Device " << std::to_string(device_id)
533  << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
534  }
535 
536  return out_vec;
537 #else
538  return {};
539 #endif
540 }
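
cuLaunchKernel() takes its kernel arguments as an array of void*, each element pointing at the storage of the corresponding argument; launchGpuCode() therefore first fills kernel_params with the argument values (device pointers and scalars) and then builds param_ptrs by taking the address of every element. A CUDA-free sketch of that packing pattern, with a plain function standing in for the kernel launch:

#include <iostream>
#include <vector>

using FakeDeviceptr = unsigned long long;  // stands in for CUdeviceptr

void fake_launch(void** args) {
  // A real kernel receives these by value; here we just read them back through the pointers.
  auto groupby_buf = *static_cast<FakeDeviceptr*>(args[0]);
  auto error_code_buf = *static_cast<FakeDeviceptr*>(args[1]);
  std::cout << groupby_buf << " " << error_code_buf << "\n";  // prints "4096 8192"
}

int main() {
  std::vector<FakeDeviceptr> kernel_params = {0x1000, 0x2000};  // fake device addresses

  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {  // same pattern as in launchGpuCode()
    param_ptrs.push_back(&param);
  }
  fake_launch(param_ptrs.data());      // cuLaunchKernel(..., &param_ptrs[0], nullptr) in the real code
}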

Friends And Related Function Documentation

template<typename META_CLASS_TYPE >
friend class AggregateReductionEgress
friend

Definition at line 144 of file QueryExecutionContext.h.

friend class Executor
friend

Definition at line 140 of file QueryExecutionContext.h.

Member Data Documentation

const ExecutorDeviceType QueryExecutionContext::device_type_
private

Definition at line 133 of file QueryExecutionContext.h.

Referenced by getRowSet(), and groupBufferToResults().

const ExecutorDispatchMode QueryExecutionContext::dispatch_mode_
private

Definition at line 134 of file QueryExecutionContext.h.

Referenced by launchGpuCode().

std::unique_ptr<ResultSet> QueryExecutionContext::estimator_result_set_
mutable private
const Executor* QueryExecutionContext::executor_
private
std::unique_ptr<CudaAllocator> QueryExecutionContext::gpu_allocator_
private

Definition at line 128 of file QueryExecutionContext.h.

Referenced by launchGpuCode(), and QueryExecutionContext().

const bool QueryExecutionContext::output_columnar_
private

Definition at line 136 of file QueryExecutionContext.h.

Referenced by groupBufferToDeinterleavedResults(), and launchGpuCode().

std::shared_ptr<RowSetMemoryOwner> QueryExecutionContext::row_set_mem_owner_
private

Definition at line 135 of file QueryExecutionContext.h.

Referenced by getRowSet(), and groupBufferToDeinterleavedResults().


The documentation for this class was generated from the following files: