OmniSciDB  eb3a3d0a03
QueryExecutionContext Class Reference

#include <QueryExecutionContext.h>


Public Member Functions

 QueryExecutionContext (const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool output_columnar, const bool sort_on_gpu, const size_t thread_idx, RenderInfo *)
 
ResultSetPtr getRowSet (const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
 
ResultSetPtr groupBufferToResults (const size_t i) const
 
std::vector< int64_t * > launchGpuCode (const RelAlgExecutionUnit &ra_exe_unit, const GpuCompilationContext *cu_functions, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int64_t > &join_hash_tables, RenderAllocatorMap *render_allocator_map)
 
std::vector< int64_t * > launchCpuCode (const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t num_tables, const std::vector< int64_t > &join_hash_tables, const int64_t num_rows_to_process=-1)
 
int64_t getAggInitValForIndex (const size_t index) const
 

Private Member Functions

ResultSetPtr groupBufferToDeinterleavedResults (const size_t i) const
 

Private Attributes

std::unique_ptr< CudaAllocator > gpu_allocator_
 
QueryMemoryDescriptor query_mem_desc_
 
const Executor * executor_
 
const ExecutorDeviceType device_type_
 
const ExecutorDispatchMode dispatch_mode_
 
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
const bool output_columnar_
 
std::unique_ptr< QueryMemoryInitializer > query_buffers_
 
std::unique_ptr< ResultSet > estimator_result_set_
 

Friends

class Executor
 

Detailed Description

Definition at line 38 of file QueryExecutionContext.h.

Constructor & Destructor Documentation

QueryExecutionContext::QueryExecutionContext ( const RelAlgExecutionUnit &  ra_exe_unit,
const QueryMemoryDescriptor &  query_mem_desc,
const Executor *  executor,
const ExecutorDeviceType  device_type,
const ExecutorDispatchMode  dispatch_mode,
const int  device_id,
const int64_t  num_rows,
const std::vector< std::vector< const int8_t * >> &  col_buffers,
const std::vector< std::vector< uint64_t >> &  frag_offsets,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner,
const bool  output_columnar,
const bool  sort_on_gpu,
const size_t  thread_idx,
RenderInfo *  render_info 
)

Definition at line 30 of file QueryExecutionContext.cpp.

References CHECK, GPU, gpu_allocator_, RenderInfo::isPotentialInSituRender(), query_buffers_, query_mem_desc, RenderInfo::render_allocator_map_ptr, and sort_on_gpu().

45  : query_mem_desc_(query_mem_desc)
46  , executor_(executor)
47  , device_type_(device_type)
48  , dispatch_mode_(dispatch_mode)
49  , row_set_mem_owner_(row_set_mem_owner)
50  , output_columnar_(output_columnar) {
51  CHECK(executor);
52  auto data_mgr = executor->getDataMgr();
53  if (device_type == ExecutorDeviceType::GPU) {
54  gpu_allocator_ = std::make_unique<CudaAllocator>(data_mgr, device_id);
55  }
56 
57  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
58  ? render_info->render_allocator_map_ptr.get()
59  : nullptr;
60  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
61  query_mem_desc,
62  device_id,
63  device_type,
64  dispatch_mode,
65  output_columnar,
66  sort_on_gpu,
67  num_rows,
68  col_buffers,
69  frag_offsets,
70  render_allocator_map,
71  render_info,
72  row_set_mem_owner,
73  gpu_allocator_.get(),
74  thread_idx,
75  executor);
76 }
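
The constructor is normally invoked from the Executor while dispatching a kernel; the fragment below is only a schematic of that call, assuming ra_exe_unit, query_mem_desc, executor, num_rows, col_buffers, frag_offsets and row_set_mem_owner already exist in the surrounding pipeline (the CPU device and KernelPerFragment dispatch mode are arbitrary choices for illustration):

// Illustrative only: every input is assumed to be produced earlier in the
// query pipeline; none of them are constructed here.
auto query_exe_context =
    std::make_unique<QueryExecutionContext>(ra_exe_unit,
                                            query_mem_desc,
                                            executor,
                                            ExecutorDeviceType::CPU,
                                            ExecutorDispatchMode::KernelPerFragment,
                                            /*device_id=*/0,
                                            num_rows,
                                            col_buffers,
                                            frag_offsets,
                                            row_set_mem_owner,
                                            /*output_columnar=*/false,
                                            /*sort_on_gpu=*/false,
                                            /*thread_idx=*/0,
                                            /*render_info=*/nullptr);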


Member Function Documentation

int64_t QueryExecutionContext::getAggInitValForIndex ( const size_t  index) const

Definition at line 147 of file QueryExecutionContext.cpp.

References CHECK, and query_buffers_.

Referenced by Executor::executePlanWithoutGroupBy().

147  {
148  CHECK(query_buffers_);
149  return query_buffers_->getAggInitValForIndex(index);
150 }
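
A minimal usage sketch, assuming a query_exe_context built as in the constructor example above; the slot index 0 is hypothetical:

// Read back the initial value the generated code used for aggregate slot 0,
// e.g. to substitute it when a device produced no row for that slot.
const int64_t init_val = query_exe_context->getAggInitValForIndex(0);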


ResultSetPtr QueryExecutionContext::getRowSet ( const RelAlgExecutionUnit &  ra_exe_unit,
const QueryMemoryDescriptor &  query_mem_desc 
) const

Definition at line 152 of file QueryExecutionContext.cpp.

References CHECK, CHECK_EQ, CPU, DEBUG_TIMER, device_type_, executor_, GPU, groupBufferToResults(), QueryMemoryDescriptor::hasVarlenOutput(), i, query_buffers_, query_mem_desc_, row_set_mem_owner_, and QueryMemoryDescriptor::threadsShareMemory().

Referenced by Executor::executePlanWithGroupBy().

154  {
155  auto timer = DEBUG_TIMER(__func__);
156  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
157  CHECK(query_buffers_);
158  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
159  if (device_type_ == ExecutorDeviceType::CPU) {
160  const size_t expected_num_buffers = query_mem_desc.hasVarlenOutput() ? 2 : 1;
161  CHECK_EQ(expected_num_buffers, group_by_buffers_size);
162  return groupBufferToResults(0);
163  }
164  const size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
165  const size_t group_by_output_buffers_size =
166  group_by_buffers_size - (query_mem_desc.hasVarlenOutput() ? 1 : 0);
167  for (size_t i = 0; i < group_by_output_buffers_size; i += step) {
168  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
169  }
170  CHECK(device_type_ == ExecutorDeviceType::GPU);
171  return executor_->reduceMultiDeviceResults(
172  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
173 }
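
Once launchCpuCode()/launchGpuCode() has populated the group-by buffers, the caller converts them into a single result. A minimal sketch, assuming query_exe_context, ra_exe_unit and query_mem_desc are in scope:

// Reduce the per-SM group-by buffers (GPU) or the single buffer (CPU)
// into one ResultSet owned by the row set memory owner.
ResultSetPtr rows = query_exe_context->getRowSet(ra_exe_unit, query_mem_desc);
const size_t result_entries = rows->entryCount();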


ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults ( const size_t  i) const
private

Definition at line 78 of file QueryExecutionContext.cpp.

References CHECK, CPU, executor_, ResultSet::fixupQueryMemoryDescriptor(), g_enable_non_kernel_time_query_interrupt, QueryMemoryDescriptor::getColOffInBytes(), QueryMemoryDescriptor::getColOffInBytesInNextBin(), QueryMemoryDescriptor::getSlotCount(), output_columnar_, query_buffers_, query_mem_desc_, ResultSetStorage::reduceSingleRow(), row_set_mem_owner_, and UNLIKELY.

Referenced by groupBufferToResults().

79  {
80  CHECK(!output_columnar_);
81  const auto& result_set = query_buffers_->getResultSet(i);
82  auto deinterleaved_query_mem_desc =
83  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
84  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
85  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
86 
87  auto deinterleaved_result_set =
88  std::make_shared<ResultSet>(result_set->getTargetInfos(),
89  std::vector<ColumnLazyFetchInfo>{},
90  std::vector<std::vector<const int8_t*>>{},
91  std::vector<std::vector<int64_t>>{},
92  std::vector<int64_t>{},
93  ExecutorDeviceType::CPU,
94  -1,
95  deinterleaved_query_mem_desc,
96  row_set_mem_owner_,
97  executor_->getCatalog(),
98  executor_->blockSize(),
99  executor_->gridSize());
100  auto deinterleaved_storage =
101  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
102  auto deinterleaved_buffer =
103  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
104  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
105  size_t deinterleaved_buffer_idx = 0;
106  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
107  auto do_work = [&](const size_t bin_base_off) {
108  std::vector<int64_t> agg_vals(agg_col_count, 0);
109  memcpy(&agg_vals[0],
110  &executor_->plan_state_->init_agg_vals_[0],
111  agg_col_count * sizeof(agg_vals[0]));
112  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
113  executor_->warpSize(),
114  false,
115  true,
116  agg_vals,
117  query_mem_desc_,
118  result_set->getTargetInfos(),
119  executor_->plan_state_->init_agg_vals_);
120  for (size_t agg_idx = 0; agg_idx < agg_col_count;
121  ++agg_idx, ++deinterleaved_buffer_idx) {
122  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
123  }
124  };
125  if (g_enable_non_kernel_time_query_interrupt) {
126  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
127  bin_idx < result_set->entryCount();
128  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
129  if (UNLIKELY((bin_idx & 0xFFFF) == 0 &&
130  executor_->checkNonKernelTimeInterrupted())) {
131  throw std::runtime_error(
132  "Query execution has interrupted during result set reduction");
133  }
134  do_work(bin_base_off);
135  }
136  } else {
137  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
138  bin_idx < result_set->entryCount();
139  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
140  do_work(bin_base_off);
141  }
142  }
143  query_buffers_->resetResultSet(i);
144  return deinterleaved_result_set;
145 }
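
The loop above walks one output bin at a time and collapses the warp-interleaved copies of each aggregate into a single value via ResultSetStorage::reduceSingleRow(). The self-contained sketch below shows the same deinterleaving idea with plain C++ and a hypothetical layout (one 64-bit SUM slot per warp per bin) instead of the OmniSciDB types:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Hypothetical interleaved layout: for each output bin, warp_count
  // consecutive partial values of a single SUM slot. Deinterleaving reduces
  // them to one value per bin, mirroring what reduceSingleRow() does per
  // aggregate column above.
  constexpr size_t warp_count = 4;
  constexpr size_t bin_count = 3;
  const std::vector<int64_t> interleaved = {1, 2, 3, 4,      // bin 0 partials
                                            10, 10, 10, 10,  // bin 1 partials
                                            5, 0, 0, 2};     // bin 2 partials
  std::vector<int64_t> deinterleaved(bin_count, 0);
  for (size_t bin = 0; bin < bin_count; ++bin) {
    for (size_t warp = 0; warp < warp_count; ++warp) {
      deinterleaved[bin] += interleaved[bin * warp_count + warp];
    }
  }
  for (const auto v : deinterleaved) {
    std::cout << v << '\n';  // prints 10, 40, 7
  }
  return 0;
}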


ResultSetPtr QueryExecutionContext::groupBufferToResults ( const size_t  i) const

Definition at line 175 of file QueryExecutionContext.cpp.

References device_type_, groupBufferToDeinterleavedResults(), QueryMemoryDescriptor::interleavedBins(), query_buffers_, and query_mem_desc_.

Referenced by getRowSet().

175  {
176  if (query_mem_desc_.interleavedBins(device_type_)) {
177  return groupBufferToDeinterleavedResults(i);
178  }
179  return query_buffers_->getResultSetOwned(i);
180 }


std::vector< int64_t * > QueryExecutionContext::launchCpuCode ( const RelAlgExecutionUnit &  ra_exe_unit,
const CpuCompilationContext *  fn_ptrs,
const bool  hoist_literals,
const std::vector< int8_t > &  literal_buff,
std::vector< std::vector< const int8_t * >>  col_buffers,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_row_offsets,
const int32_t  scan_limit,
int32_t *  error_code,
const uint32_t  num_tables,
const std::vector< int64_t > &  join_hash_tables,
const int64_t  num_rows_to_process = -1 
)

Definition at line 600 of file QueryExecutionContext.cpp.

References align_to_int64(), CHECK, CHECK_EQ, compact_init_vals(), CPU, DEBUG_TIMER, QueryMemoryDescriptor::didOutputColumnar(), RelAlgExecutionUnit::estimator, estimator_result_set_, CpuCompilationContext::func(), QueryMemoryDescriptor::getColsSize(), QueryMemoryDescriptor::getQueryDescriptionType(), i, INJECT_TIMER, QueryMemoryDescriptor::isGroupBy(), Projection, query_buffers_, query_mem_desc_, and QueryMemoryDescriptor::useStreamingTopN().

Referenced by Executor::executePlanWithGroupBy(), and Executor::executePlanWithoutGroupBy().

612  {
613  auto timer = DEBUG_TIMER(__func__);
614  INJECT_TIMER(lauchCpuCode);
615 
616  CHECK(query_buffers_);
617  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
618 
619  std::vector<const int8_t**> multifrag_col_buffers;
620  for (auto& col_buffer : col_buffers) {
621  multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
622  }
623  const int8_t*** multifrag_cols_ptr{
624  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
625  const uint64_t num_fragments =
626  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
627  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
628 
629  const bool is_group_by{query_mem_desc_.isGroupBy()};
630  std::vector<int64_t*> out_vec;
631  if (ra_exe_unit.estimator) {
632  // Subfragments collect the result from multiple runs in a single
633  // result set.
634  if (!estimator_result_set_) {
635  estimator_result_set_.reset(
636  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
637  }
638  out_vec.push_back(
639  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
640  } else {
641  if (!is_group_by) {
642  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
643  auto buff = new int64_t[num_out_frags];
644  out_vec.push_back(static_cast<int64_t*>(buff));
645  }
646  }
647  }
648 
649  CHECK_EQ(num_rows.size(), col_buffers.size());
650  std::vector<int64_t> flatened_num_rows;
651  for (auto& nums : num_rows) {
652  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
653  }
654  std::vector<uint64_t> flatened_frag_offsets;
655  for (auto& offsets : frag_offsets) {
656  flatened_frag_offsets.insert(
657  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
658  }
659  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
660  int64_t* num_rows_ptr;
661  if (num_rows_to_process > 0) {
662  flatened_num_rows[0] = num_rows_to_process;
663  num_rows_ptr = flatened_num_rows.data();
664  } else {
665  num_rows_ptr =
666  rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
667  }
668  int32_t total_matched_init{0};
669 
670  std::vector<int64_t> cmpt_val_buff;
671  if (is_group_by) {
672  cmpt_val_buff =
673  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
674  init_agg_vals,
675  query_mem_desc_);
676  }
677 
678  CHECK(native_code);
679  const int64_t* join_hash_tables_ptr =
680  join_hash_tables.size() == 1
681  ? reinterpret_cast<int64_t*>(join_hash_tables[0])
682  : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
683  if (hoist_literals) {
684  using agg_query = void (*)(const int8_t***, // col_buffers
685  const uint64_t*, // num_fragments
686  const int8_t*, // literals
687  const int64_t*, // num_rows
688  const uint64_t*, // frag_row_offsets
689  const int32_t*, // max_matched
690  int32_t*, // total_matched
691  const int64_t*, // init_agg_value
692  int64_t**, // out
693  int32_t*, // error_code
694  const uint32_t*, // num_tables
695  const int64_t*); // join_hash_tables_ptr
696  if (is_group_by) {
697  reinterpret_cast<agg_query>(native_code->func())(
698  multifrag_cols_ptr,
699  &num_fragments,
700  literal_buff.data(),
701  num_rows_ptr,
702  flatened_frag_offsets.data(),
703  &scan_limit,
704  &total_matched_init,
705  cmpt_val_buff.data(),
706  query_buffers_->getGroupByBuffersPtr(),
707  error_code,
708  &num_tables,
709  join_hash_tables_ptr);
710  } else {
711  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
712  &num_fragments,
713  literal_buff.data(),
714  num_rows_ptr,
715  flatened_frag_offsets.data(),
716  &scan_limit,
717  &total_matched_init,
718  init_agg_vals.data(),
719  out_vec.data(),
720  error_code,
721  &num_tables,
722  join_hash_tables_ptr);
723  }
724  } else {
725  using agg_query = void (*)(const int8_t***, // col_buffers
726  const uint64_t*, // num_fragments
727  const int64_t*, // num_rows
728  const uint64_t*, // frag_row_offsets
729  const int32_t*, // max_matched
730  int32_t*, // total_matched
731  const int64_t*, // init_agg_value
732  int64_t**, // out
733  int32_t*, // error_code
734  const uint32_t*, // num_tables
735  const int64_t*); // join_hash_tables_ptr
736  if (is_group_by) {
737  reinterpret_cast<agg_query>(native_code->func())(
738  multifrag_cols_ptr,
739  &num_fragments,
740  num_rows_ptr,
741  flatened_frag_offsets.data(),
742  &scan_limit,
743  &total_matched_init,
744  cmpt_val_buff.data(),
745  query_buffers_->getGroupByBuffersPtr(),
746  error_code,
747  &num_tables,
748  join_hash_tables_ptr);
749  } else {
750  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
751  &num_fragments,
752  num_rows_ptr,
753  flatened_frag_offsets.data(),
754  &scan_limit,
755  &total_matched_init,
756  init_agg_vals.data(),
757  out_vec.data(),
758  error_code,
759  &num_tables,
760  join_hash_tables_ptr);
761  }
762  }
763 
764  if (ra_exe_unit.estimator) {
765  return {};
766  }
767 
768  if (rowid_lookup_num_rows && *error_code < 0) {
769  *error_code = 0;
770  }
771 
772  if (query_mem_desc_.useStreamingTopN()) {
773  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
774  }
775 
776  if (query_mem_desc_.didOutputColumnar() &&
777  query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
778  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
779  }
780  return out_vec;
781 }
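
launchCpuCode() invokes the JIT-compiled kernel by casting its entry point to the agg_query signature shown above and passing every argument by pointer. The self-contained sketch below mimics that calling convention with an ordinary C++ function standing in for the generated code; fake_kernel and the simplified three-argument signature are illustrative, not OmniSciDB API:

#include <cstdint>
#include <iostream>

// Stand-in for the JIT-generated kernel: everything is passed by pointer,
// mirroring (in simplified form) the agg_query convention used above.
void fake_kernel(const int64_t* num_rows, int64_t** out, int32_t* error_code) {
  out[0][0] = *num_rows * 2;  // pretend aggregate
  *error_code = 0;
}

int main() {
  using agg_query = void (*)(const int64_t*, int64_t**, int32_t*);

  // The code generator hands back an opaque entry point; launchCpuCode
  // reinterpret_casts it to the typed signature before calling it. Here the
  // opaque pointer is simulated with a generic function pointer.
  void (*opaque_entry)() = reinterpret_cast<void (*)()>(&fake_kernel);
  const auto entry = reinterpret_cast<agg_query>(opaque_entry);

  int64_t num_rows = 21;
  int64_t slot = 0;
  int64_t* out_bufs[] = {&slot};
  int32_t error_code = -1;
  entry(&num_rows, out_bufs, &error_code);
  std::cout << "result=" << slot << " error=" << error_code << '\n';  // result=42 error=0
  return 0;
}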


std::vector< int64_t * > QueryExecutionContext::launchGpuCode ( const RelAlgExecutionUnit &  ra_exe_unit,
const GpuCompilationContext *  cu_functions,
const bool  hoist_literals,
const std::vector< int8_t > &  literal_buff,
std::vector< std::vector< const int8_t * >>  col_buffers,
const std::vector< std::vector< int64_t >> &  num_rows,
const std::vector< std::vector< uint64_t >> &  frag_row_offsets,
const int32_t  scan_limit,
Data_Namespace::DataMgr *  data_mgr,
const unsigned  block_size_x,
const unsigned  grid_size_x,
const int  device_id,
const size_t  shared_memory_size,
int32_t *  error_code,
const uint32_t  num_tables,
const bool  allow_runtime_interrupt,
const std::vector< int64_t > &  join_hash_tables,
RenderAllocatorMap *  render_allocator_map 
)

Definition at line 203 of file QueryExecutionContext.cpp.

References CHECK, CHECK_EQ, checkCudaErrors(), copy_from_gpu(), copy_to_gpu(), DEBUG_TIMER, QueryMemoryDescriptor::didOutputColumnar(), dispatch_mode_, RelAlgExecutionUnit::estimator, estimator_result_set_, executor_, g_enable_dynamic_watchdog, get_num_allocated_rows_from_gpu(), QueryMemoryDescriptor::getEntryCount(), GpuCompilationContext::getNativeCode(), QueryMemoryDescriptor::getQueryDescriptionType(), RenderAllocatorMap::getRenderAllocator(), GPU, gpu_allocator_, QueryMemoryDescriptor::hasKeylessHash(), i, INJECT_TIMER, inplace_sort_gpu(), QueryMemoryDescriptor::isGroupBy(), SortInfo::order_entries, output_columnar_, Projection, query_buffers_, query_mem_desc_, RelAlgExecutionUnit::sort_info, QueryMemoryDescriptor::sortOnGpu(), to_string(), RelAlgExecutionUnit::use_bump_allocator, use_speculative_top_n(), QueryMemoryDescriptor::useStreamingTopN(), QueryMemoryDescriptor::varlenOutputBufferElemSize(), and VLOG.

Referenced by Executor::executePlanWithGroupBy(), and Executor::executePlanWithoutGroupBy().

221  {
222  auto timer = DEBUG_TIMER(__func__);
223  INJECT_TIMER(lauchGpuCode);
224 #ifdef HAVE_CUDA
227  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
228 
229  bool is_group_by{query_mem_desc_.isGroupBy()};
230 
231  RenderAllocator* render_allocator = nullptr;
232  if (render_allocator_map) {
233  render_allocator = render_allocator_map->getRenderAllocator(device_id);
234  }
235 
236  CHECK(cu_functions);
237  const auto native_code = cu_functions->getNativeCode(device_id);
238  auto cu_func = static_cast<CUfunction>(native_code.first);
239  std::vector<int64_t*> out_vec;
240  uint32_t num_fragments = col_buffers.size();
241  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
242 
243  CUevent start0, stop0; // preparation
244  cuEventCreate(&start0, 0);
245  cuEventCreate(&stop0, 0);
246  CUevent start1, stop1; // cuLaunchKernel
247  cuEventCreate(&start1, 0);
248  cuEventCreate(&stop1, 0);
249  CUevent start2, stop2; // finish
250  cuEventCreate(&start2, 0);
251  cuEventCreate(&stop2, 0);
252 
253  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
254  cuEventRecord(start0, 0);
255  }
256 
257  if (g_enable_dynamic_watchdog) {
258  initializeDynamicWatchdog(native_code.second, device_id);
259  }
260 
261  if (allow_runtime_interrupt && !render_allocator) {
262  initializeRuntimeInterrupter(native_code.second, device_id);
263  }
264 
265  auto kernel_params = prepareKernelParams(col_buffers,
266  literal_buff,
267  num_rows,
268  frag_offsets,
269  scan_limit,
270  init_agg_vals,
271  error_codes,
272  num_tables,
273  join_hash_tables,
274  data_mgr,
275  device_id,
276  hoist_literals,
277  is_group_by);
278 
279  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
280  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);
281 
282  const unsigned block_size_y = 1;
283  const unsigned block_size_z = 1;
284  const unsigned grid_size_y = 1;
285  const unsigned grid_size_z = 1;
286  const auto total_thread_count = block_size_x * grid_size_x;
287  const auto err_desc = kernel_params[ERROR_CODE];
288 
289  if (is_group_by) {
290  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
291  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
292  auto gpu_group_by_buffers =
293  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
294  query_mem_desc_,
295  kernel_params[INIT_AGG_VALS],
296  device_id,
297  dispatch_mode_,
298  block_size_x,
299  grid_size_x,
300  executor_->warpSize(),
301  can_sort_on_gpu,
302  output_columnar_,
303  render_allocator);
304  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
305  copy_to_gpu(data_mgr,
306  kernel_params[MAX_MATCHED],
307  &max_matched,
308  sizeof(max_matched),
309  device_id);
310 
311  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.ptrs;
312  std::vector<void*> param_ptrs;
313  for (auto& param : kernel_params) {
314  param_ptrs.push_back(&param);
315  }
316 
317  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
318  cuEventRecord(stop0, 0);
319  cuEventSynchronize(stop0);
320  float milliseconds0 = 0;
321  cuEventElapsedTime(&milliseconds0, start0, stop0);
322  VLOG(1) << "Device " << std::to_string(device_id)
323  << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
324  << " ms";
325  cuEventRecord(start1, 0);
326  }
327 
328  if (hoist_literals) {
329  checkCudaErrors(cuLaunchKernel(cu_func,
330  grid_size_x,
331  grid_size_y,
332  grid_size_z,
333  block_size_x,
334  block_size_y,
335  block_size_z,
336  shared_memory_size,
337  nullptr,
338  &param_ptrs[0],
339  nullptr));
340  } else {
341  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
342  checkCudaErrors(cuLaunchKernel(cu_func,
343  grid_size_x,
344  grid_size_y,
345  grid_size_z,
346  block_size_x,
347  block_size_y,
348  block_size_z,
349  shared_memory_size,
350  nullptr,
351  &param_ptrs[0],
352  nullptr));
353  }
354  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
355  executor_->registerActiveModule(native_code.second, device_id);
356  cuEventRecord(stop1, 0);
357  cuEventSynchronize(stop1);
358  executor_->unregisterActiveModule(native_code.second, device_id);
359  float milliseconds1 = 0;
360  cuEventElapsedTime(&milliseconds1, start1, stop1);
361  VLOG(1) << "Device " << std::to_string(device_id)
362  << ": launchGpuCode: group-by cuLaunchKernel: "
363  << std::to_string(milliseconds1) << " ms";
364  cuEventRecord(start2, 0);
365  }
366 
367  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
368  reinterpret_cast<int8_t*>(err_desc),
369  error_codes.size() * sizeof(error_codes[0]));
370  *error_code = aggregate_error_codes(error_codes);
371  if (*error_code > 0) {
372  return {};
373  }
374 
375  if (!render_allocator) {
376  if (query_mem_desc_.useStreamingTopN()) {
377  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
378  query_mem_desc_,
379  gpu_group_by_buffers,
380  ra_exe_unit,
381  total_thread_count,
382  device_id);
383  } else {
384  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
385  try {
386  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
387  query_mem_desc_,
388  gpu_group_by_buffers,
389  data_mgr,
390  device_id);
391  } catch (const std::bad_alloc&) {
392  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
393  }
394  }
395  if (query_mem_desc_.getQueryDescriptionType() ==
396  QueryDescriptionType::Projection &&
397  query_mem_desc_.didOutputColumnar()) {
398  query_buffers_->compactProjectionBuffersGpu(
399  query_mem_desc_,
400  data_mgr,
401  gpu_group_by_buffers,
402  get_num_allocated_rows_from_gpu(
403  data_mgr, kernel_params[TOTAL_MATCHED], device_id),
404  device_id);
405  } else {
406  size_t num_allocated_rows{0};
407  if (ra_exe_unit.use_bump_allocator) {
408  num_allocated_rows = get_num_allocated_rows_from_gpu(
409  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
410  // First, check the error code. If we ran out of slots, don't copy data back
411  // into the ResultSet or update ResultSet entry count
412  if (*error_code < 0) {
413  return {};
414  }
415  }
416  query_buffers_->copyGroupByBuffersFromGpu(
417  data_mgr,
418  query_mem_desc_,
419  ra_exe_unit.use_bump_allocator ? num_allocated_rows
420  : query_mem_desc_.getEntryCount(),
421  gpu_group_by_buffers,
422  &ra_exe_unit,
423  block_size_x,
424  grid_size_x,
425  device_id,
426  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
427  if (num_allocated_rows) {
428  CHECK(ra_exe_unit.use_bump_allocator);
429  CHECK(!query_buffers_->result_sets_.empty());
430  query_buffers_->result_sets_.front()->updateStorageEntryCount(
431  num_allocated_rows);
432  }
433  }
434  } else {
435  query_buffers_->copyGroupByBuffersFromGpu(
436  data_mgr,
437  query_mem_desc_,
438  query_mem_desc_.getEntryCount(),
439  gpu_group_by_buffers,
440  &ra_exe_unit,
441  block_size_x,
442  grid_size_x,
443  device_id,
444  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
445  }
446  }
447  }
448  } else {
449  std::vector<CUdeviceptr> out_vec_dev_buffers;
450  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
451  // by default, non-grouped aggregate queries generate one result per available thread
452  // in the lifetime of (potentially multi-fragment) kernel execution.
453  // We can reduce these intermediate results internally in the device and hence have
454  // only one result per device, if GPU shared memory optimizations are enabled.
455  const auto num_results_per_agg_col =
456  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
457  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
458  if (ra_exe_unit.estimator) {
459  estimator_result_set_.reset(new ResultSet(
460  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
461  out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
462  estimator_result_set_->getDeviceEstimatorBuffer()));
463  } else {
464  for (size_t i = 0; i < agg_col_count; ++i) {
465  CUdeviceptr out_vec_dev_buffer =
466  num_fragments ? reinterpret_cast<CUdeviceptr>(
467  gpu_allocator_->alloc(output_buffer_size_per_agg))
468  : 0;
469  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
470  if (shared_memory_size) {
471  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
472  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
473  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
474  output_buffer_size_per_agg);
475  }
476  }
477  }
478  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
479  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
480  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
481  agg_col_count * sizeof(CUdeviceptr));
482  kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
483  std::vector<void*> param_ptrs;
484  for (auto& param : kernel_params) {
485  param_ptrs.push_back(&param);
486  }
487 
488  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
489  cuEventRecord(stop0, 0);
490  cuEventSynchronize(stop0);
491  float milliseconds0 = 0;
492  cuEventElapsedTime(&milliseconds0, start0, stop0);
493  VLOG(1) << "Device " << std::to_string(device_id)
494  << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
495  cuEventRecord(start1, 0);
496  }
497 
498  if (hoist_literals) {
499  checkCudaErrors(cuLaunchKernel(cu_func,
500  grid_size_x,
501  grid_size_y,
502  grid_size_z,
503  block_size_x,
504  block_size_y,
505  block_size_z,
506  shared_memory_size,
507  nullptr,
508  &param_ptrs[0],
509  nullptr));
510  } else {
511  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
512  checkCudaErrors(cuLaunchKernel(cu_func,
513  grid_size_x,
514  grid_size_y,
515  grid_size_z,
516  block_size_x,
517  block_size_y,
518  block_size_z,
519  shared_memory_size,
520  nullptr,
521  &param_ptrs[0],
522  nullptr));
523  }
524 
525  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
526  executor_->registerActiveModule(native_code.second, device_id);
527  cuEventRecord(stop1, 0);
528  cuEventSynchronize(stop1);
529  executor_->unregisterActiveModule(native_code.second, device_id);
530  float milliseconds1 = 0;
531  cuEventElapsedTime(&milliseconds1, start1, stop1);
532  VLOG(1) << "Device " << std::to_string(device_id)
533  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
534  << " ms";
535  cuEventRecord(start2, 0);
536  }
537 
538  copy_from_gpu(data_mgr,
539  &error_codes[0],
540  err_desc,
541  error_codes.size() * sizeof(error_codes[0]),
542  device_id);
543  *error_code = aggregate_error_codes(error_codes);
544  if (*error_code > 0) {
545  return {};
546  }
547  if (ra_exe_unit.estimator) {
548  CHECK(estimator_result_set_);
549  estimator_result_set_->syncEstimatorBuffer();
550  return {};
551  }
552  for (size_t i = 0; i < agg_col_count; ++i) {
553  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
554  copy_from_gpu(data_mgr,
555  host_out_vec,
556  out_vec_dev_buffers[i],
557  output_buffer_size_per_agg,
558  device_id);
559  out_vec.push_back(host_out_vec);
560  }
561  }
562  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
563  if (count_distinct_bitmap_mem) {
564  copy_from_gpu(data_mgr,
565  query_buffers_->getCountDistinctHostPtr(),
566  count_distinct_bitmap_mem,
567  query_buffers_->getCountDistinctBitmapBytes(),
568  device_id);
569  }
570 
571  const auto varlen_output_gpu_buf = query_buffers_->getVarlenOutputPtr();
572  if (varlen_output_gpu_buf) {
573  CHECK(query_mem_desc_.varlenOutputBufferElemSize());
574  const size_t varlen_output_buf_bytes =
575  query_mem_desc_.getEntryCount() *
576  query_mem_desc_.varlenOutputBufferElemSize().value();
577  CHECK(query_buffers_->getVarlenOutputHostPtr());
578  copy_from_gpu(data_mgr,
579  query_buffers_->getVarlenOutputHostPtr(),
580  varlen_output_gpu_buf,
581  varlen_output_buf_bytes,
582  device_id);
583  }
584 
585  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
586  cuEventRecord(stop2, 0);
587  cuEventSynchronize(stop2);
588  float milliseconds2 = 0;
589  cuEventElapsedTime(&milliseconds2, start2, stop2);
590  VLOG(1) << "Device " << std::to_string(device_id)
591  << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
592  }
593 
594  return out_vec;
595 #else
596  return {};
597 #endif
598 }
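
Both branches above build the kernel arguments the same way: every argument lives in kernel_params, and param_ptrs collects the address of each element, because the CUDA driver's cuLaunchKernel expects a void** array of pointers to the arguments rather than the argument values themselves. A CUDA-free sketch of that packing, with made-up parameter values:

#include <iostream>
#include <vector>

int main() {
  // Stand-ins for CUdeviceptr-sized kernel arguments (device pointers and
  // scalars); the values are hypothetical.
  std::vector<unsigned long long> kernel_params = {0x1000, 0x2000, 42};

  // cuLaunchKernel takes a void** whose entries point at each argument's
  // storage, so the launch code collects addresses rather than values.
  std::vector<void*> param_ptrs;
  param_ptrs.reserve(kernel_params.size());
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }

  // A real launch would then be:
  //   cuLaunchKernel(cu_func, grid_x, grid_y, grid_z, block_x, block_y, block_z,
  //                  shared_memory_size, nullptr, param_ptrs.data(), nullptr);
  for (void* p : param_ptrs) {
    std::cout << *static_cast<unsigned long long*>(p) << '\n';
  }
  return 0;
}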


Friends And Related Function Documentation

friend class Executor
friend

Definition at line 149 of file QueryExecutionContext.h.

Member Data Documentation

const ExecutorDeviceType QueryExecutionContext::device_type_
private

Definition at line 142 of file QueryExecutionContext.h.

Referenced by getRowSet(), and groupBufferToResults().

const ExecutorDispatchMode QueryExecutionContext::dispatch_mode_
private

Definition at line 143 of file QueryExecutionContext.h.

Referenced by launchGpuCode().

std::unique_ptr<ResultSet> QueryExecutionContext::estimator_result_set_
mutable private
const Executor* QueryExecutionContext::executor_
private
std::unique_ptr<CudaAllocator> QueryExecutionContext::gpu_allocator_
private

Definition at line 137 of file QueryExecutionContext.h.

Referenced by launchGpuCode(), and QueryExecutionContext().

const bool QueryExecutionContext::output_columnar_
private

Definition at line 145 of file QueryExecutionContext.h.

Referenced by groupBufferToDeinterleavedResults(), and launchGpuCode().

std::shared_ptr<RowSetMemoryOwner> QueryExecutionContext::row_set_mem_owner_
private

Definition at line 144 of file QueryExecutionContext.h.

Referenced by getRowSet(), and groupBufferToDeinterleavedResults().


The documentation for this class was generated from the following files:

QueryExecutionContext.h
QueryExecutionContext.cpp