QueryExecutionContext.cpp
/*
 * Copyright 2018 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryExecutionContext.h"
#include "AggregateUtils.h"
#include "Execute.h"
#include "GpuInitGroups.h"
#include "InPlaceSort.h"
#include "QueryMemoryInitializer.h"
#include "RelAlgExecutionUnit.h"
#include "ResultSet.h"
#include "SpeculativeTopN.h"
#include "StreamingTopN.h"

QueryExecutionContext::QueryExecutionContext(
    const RelAlgExecutionUnit& ra_exe_unit,
    const QueryMemoryDescriptor& query_mem_desc,
    const Executor* executor,
    const ExecutorDeviceType device_type,
    const ExecutorDispatchMode dispatch_mode,
    const int device_id,
    const int64_t num_rows,
    const std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool output_columnar,
    const bool sort_on_gpu,
    RenderInfo* render_info)
    : query_mem_desc_(query_mem_desc)
    , executor_(executor)
    , device_type_(device_type)
    , dispatch_mode_(dispatch_mode)
    , row_set_mem_owner_(row_set_mem_owner)
    , output_columnar_(output_columnar) {
  CHECK(executor);
  auto& data_mgr = executor->catalog_->getDataMgr();
  if (device_type == ExecutorDeviceType::GPU) {
    gpu_allocator_ = std::make_unique<CudaAllocator>(&data_mgr, device_id);
  }

  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
                                  ? render_info->render_allocator_map_ptr.get()
                                  : nullptr;
  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
                                                            query_mem_desc,
                                                            device_id,
                                                            device_type,
                                                            dispatch_mode,
                                                            output_columnar,
                                                            sort_on_gpu,
                                                            num_rows,
                                                            col_buffers,
                                                            frag_offsets,
                                                            render_allocator_map,
                                                            render_info,
                                                            row_set_mem_owner,
                                                            gpu_allocator_.get(),
                                                            executor);
}

ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
    const size_t i) const {
  CHECK(!output_columnar_);
  const auto& result_set = query_buffers_->getResultSet(i);
  auto deinterleaved_query_mem_desc =
      ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);

  auto deinterleaved_result_set =
      std::make_shared<ResultSet>(result_set->getTargetInfos(),
                                  std::vector<ColumnLazyFetchInfo>{},
                                  std::vector<std::vector<const int8_t*>>{},
                                  std::vector<std::vector<int64_t>>{},
                                  std::vector<int64_t>{},
                                  ExecutorDeviceType::CPU,
                                  -1,
                                  deinterleaved_query_mem_desc,
                                  row_set_mem_owner_,
                                  executor_);
  auto deinterleaved_storage =
      deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
  auto deinterleaved_buffer =
      reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
  size_t deinterleaved_buffer_idx = 0;
  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
       bin_idx < result_set->entryCount();
       ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
    std::vector<int64_t> agg_vals(agg_col_count, 0);
    memcpy(&agg_vals[0],
           &executor_->plan_state_->init_agg_vals_[0],
           agg_col_count * sizeof(agg_vals[0]));
    ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
                                      executor_->warpSize(),
                                      false,
                                      true,
                                      agg_vals,
                                      query_mem_desc_,
                                      result_set->getTargetInfos(),
                                      executor_->plan_state_->init_agg_vals_);
    for (size_t agg_idx = 0; agg_idx < agg_col_count;
         ++agg_idx, ++deinterleaved_buffer_idx) {
      deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
    }
  }
  query_buffers_->resetResultSet(i);
  return deinterleaved_result_set;
}

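/*
  Illustrative sketch (not part of the original source): with interleaved bins on
  the GPU, each logical output row is materialized once per warp lane, so a bin
  occupies warpSize() consecutive copies of the row layout.  The loop above walks
  bins via getColOffInBytes(0) / getColOffInBytesInNextBin(0) and uses
  reduceSingleRow() to collapse the warp-wide copies into a single row, e.g. for a
  hypothetical warp of 4 lanes and one COUNT target:

    per-lane partial counts {2, 0, 1, 3}  --reduceSingleRow-->  one bin value 6
*/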
int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
  CHECK(query_buffers_);
  return query_buffers_->getAggInitValForIndex(index);
}

ResultSetPtr QueryExecutionContext::getRowSet(
    const RelAlgExecutionUnit& ra_exe_unit,
    const QueryMemoryDescriptor& query_mem_desc) const {
  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
  CHECK(query_buffers_);
  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
  if (device_type_ == ExecutorDeviceType::CPU) {
    CHECK_EQ(size_t(1), group_by_buffers_size);
    return groupBufferToResults(0);
  }
  size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
  for (size_t i = 0; i < group_by_buffers_size; i += step) {
    results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
  }
  CHECK(device_type_ == ExecutorDeviceType::GPU);
  return executor_->reduceMultiDeviceResults(
      ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
}

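/*
  Illustrative sketch (not part of the original source): when threads share a
  group-by buffer, only one buffer per block is a distinct reduction input, so
  getRowSet() strides through the buffers by the block size.  Assuming a
  hypothetical launch of 4 blocks of 256 threads each:

    const size_t step = 256;                      // executor_->blockSize()
    for (size_t i = 0; i < 4 * 256; i += step) {  // visits buffers 0, 256, 512, 768
      // groupBufferToResults(i) feeds reduceMultiDeviceResults()
    }
*/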
ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
  if (query_mem_desc_.interleavedBins(device_type_)) {
    return groupBufferToDeinterleavedResults(i);
  }
  return query_buffers_->getResultSetOwned(i);
}

#ifdef HAVE_CUDA
namespace {

int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
  // Check overflow / division by zero / interrupt first
  for (const auto err : error_codes) {
    if (err > 0) {
      return err;
    }
  }
  for (const auto err : error_codes) {
    if (err) {
      return err;
    }
  }
  return 0;
}

}  // namespace
#endif

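/*
  Illustrative sketch (not part of the original source): every GPU thread owns a
  slot in the error_codes vector, and aggregate_error_codes() gives positive codes
  (overflow, division by zero, interrupt) precedence over negative ones:

    std::vector<int32_t> codes{0, -3, 0, 7, 0};
    aggregate_error_codes(codes);  // returns 7, not -3
*/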
std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<std::pair<void*, void*>>& cu_functions,
    const bool hoist_literals,
    const std::vector<int8_t>& literal_buff,
    std::vector<std::vector<const int8_t*>> col_buffers,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    Data_Namespace::DataMgr* data_mgr,
    const unsigned block_size_x,
    const unsigned grid_size_x,
    const int device_id,
    int32_t* error_code,
    const uint32_t num_tables,
    const std::vector<int64_t>& join_hash_tables,
    RenderAllocatorMap* render_allocator_map) {
  INJECT_TIMER(launchGpuCode);
#ifdef HAVE_CUDA
  CHECK(gpu_allocator_);
  CHECK(query_buffers_);
  const auto& init_agg_vals = query_buffers_->init_agg_vals_;

  bool is_group_by{query_mem_desc_.isGroupBy()};

  RenderAllocator* render_allocator = nullptr;
  if (render_allocator_map) {
    render_allocator = render_allocator_map->getRenderAllocator(device_id);
  }

  auto cu_func = static_cast<CUfunction>(cu_functions[device_id].first);
  std::vector<int64_t*> out_vec;
  uint32_t num_fragments = col_buffers.size();
  std::vector<int32_t> error_codes(grid_size_x * block_size_x);

  CUevent start0, stop0;  // preparation
  cuEventCreate(&start0, 0);
  cuEventCreate(&stop0, 0);
  CUevent start1, stop1;  // cuLaunchKernel
  cuEventCreate(&start1, 0);
  cuEventCreate(&stop1, 0);
  CUevent start2, stop2;  // finish
  cuEventCreate(&start2, 0);
  cuEventCreate(&stop2, 0);

  if (g_enable_dynamic_watchdog) {
    cuEventRecord(start0, 0);
  }

  if (g_enable_dynamic_watchdog) {
    initializeDynamicWatchdog(cu_functions[device_id].second, device_id);
  }

  auto kernel_params = prepareKernelParams(col_buffers,
                                           literal_buff,
                                           num_rows,
                                           frag_offsets,
                                           scan_limit,
                                           init_agg_vals,
                                           error_codes,
                                           num_tables,
                                           join_hash_tables,
                                           data_mgr,
                                           device_id,
                                           hoist_literals,
                                           is_group_by);

  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);

  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;
  const auto total_thread_count = block_size_x * grid_size_x;
  const auto err_desc = kernel_params[ERROR_CODE];

  if (is_group_by) {
    CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
    bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
    auto gpu_group_by_buffers =
        query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
                                                            query_mem_desc_,
                                                            kernel_params[INIT_AGG_VALS],
                                                            device_id,
                                                            dispatch_mode_,
                                                            block_size_x,
                                                            grid_size_x,
                                                            executor_->warpSize(),
                                                            can_sort_on_gpu,
                                                            output_columnar_,
                                                            render_allocator);
    if (ra_exe_unit.use_bump_allocator) {
      const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
      copy_to_gpu(data_mgr,
                  kernel_params[MAX_MATCHED],
                  &max_matched,
                  sizeof(max_matched),
                  device_id);
    }

    kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.first;
    std::vector<void*> param_ptrs;
    for (auto& param : kernel_params) {
      param_ptrs.push_back(&param);
    }

    if (g_enable_dynamic_watchdog) {
      cuEventRecord(stop0, 0);
      cuEventSynchronize(stop0);
      float milliseconds0 = 0;
      cuEventElapsedTime(&milliseconds0, start0, stop0);
      VLOG(1) << "Device " << std::to_string(device_id)
              << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
              << " ms";
      cuEventRecord(start1, 0);
    }

    if (hoist_literals) {
      checkCudaErrors(
          cuLaunchKernel(cu_func,
                         grid_size_x,
                         grid_size_y,
                         grid_size_z,
                         block_size_x,
                         block_size_y,
                         block_size_z,
                         query_mem_desc_.sharedMemBytes(ExecutorDeviceType::GPU),
                         nullptr,
                         &param_ptrs[0],
                         nullptr));
    } else {
      param_ptrs.erase(param_ptrs.begin() + LITERALS);  // TODO(alex): remove
      checkCudaErrors(
          cuLaunchKernel(cu_func,
                         grid_size_x,
                         grid_size_y,
                         grid_size_z,
                         block_size_x,
                         block_size_y,
                         block_size_z,
                         query_mem_desc_.sharedMemBytes(ExecutorDeviceType::GPU),
                         nullptr,
                         &param_ptrs[0],
                         nullptr));
    }
    if (g_enable_dynamic_watchdog) {
      executor_->registerActiveModule(cu_functions[device_id].second, device_id);
      cuEventRecord(stop1, 0);
      cuEventSynchronize(stop1);
      executor_->unregisterActiveModule(cu_functions[device_id].second, device_id);
      float milliseconds1 = 0;
      cuEventElapsedTime(&milliseconds1, start1, stop1);
      VLOG(1) << "Device " << std::to_string(device_id)
              << ": launchGpuCode: group-by cuLaunchKernel: "
              << std::to_string(milliseconds1) << " ms";
      cuEventRecord(start2, 0);
    }

    gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
                                   reinterpret_cast<int8_t*>(err_desc),
                                   error_codes.size() * sizeof(error_codes[0]));
    *error_code = aggregate_error_codes(error_codes);
    if (*error_code > 0) {
      return {};
    }

    if (!render_allocator) {
      if (use_streaming_top_n(ra_exe_unit, output_columnar_)) {
        query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
                                                    query_mem_desc_,
                                                    gpu_group_by_buffers,
                                                    ra_exe_unit,
                                                    total_thread_count,
                                                    device_id);
      } else {
        if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
          inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
                           query_mem_desc_,
                           gpu_group_by_buffers,
                           data_mgr,
                           device_id);
        }
        if (query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection) {
          if (query_mem_desc_.didOutputColumnar()) {
            query_buffers_->compactProjectionBuffersGpu(
                query_mem_desc_,
                data_mgr,
                gpu_group_by_buffers,
                get_num_allocated_rows_from_gpu(
                    data_mgr, kernel_params[TOTAL_MATCHED], device_id),
                device_id);
          } else {
            size_t num_allocated_rows{0};
            if (ra_exe_unit.use_bump_allocator) {
              num_allocated_rows = get_num_allocated_rows_from_gpu(
                  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
              // First, check the error code. If we ran out of slots, don't copy data
              // back into the ResultSet or update ResultSet entry count
              if (*error_code < 0) {
                return {};
              }
            }
            query_buffers_->copyGroupByBuffersFromGpu(
                data_mgr,
                query_mem_desc_,
                ra_exe_unit.use_bump_allocator ? num_allocated_rows
                                               : query_mem_desc_.getEntryCount(),
                gpu_group_by_buffers,
                ra_exe_unit,
                block_size_x,
                grid_size_x,
                device_id,
                can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
            if (num_allocated_rows) {
              CHECK(ra_exe_unit.use_bump_allocator);
              CHECK(!query_buffers_->result_sets_.empty());
              query_buffers_->result_sets_.front()->updateStorageEntryCount(
                  num_allocated_rows);
            }
          }
        } else {
          query_buffers_->copyGroupByBuffersFromGpu(
              data_mgr,
              query_mem_desc_,
              query_mem_desc_.getEntryCount(),
              gpu_group_by_buffers,
              ra_exe_unit,
              block_size_x,
              grid_size_x,
              device_id,
              can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
        }
      }
    }
  } else {
    std::vector<CUdeviceptr> out_vec_dev_buffers;
    const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
    if (ra_exe_unit.estimator) {
      estimator_result_set_.reset(new ResultSet(
          ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
      out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
          estimator_result_set_->getDeviceEstimatorBuffer()));
    } else {
      for (size_t i = 0; i < agg_col_count; ++i) {
        CUdeviceptr out_vec_dev_buffer =
            num_fragments
                ? reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(
                      block_size_x * grid_size_x * sizeof(int64_t) * num_fragments))
                : 0;
        out_vec_dev_buffers.push_back(out_vec_dev_buffer);
      }
    }
    auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
    gpu_allocator_->copyToDevice(out_vec_dev_ptr,
                                 reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
                                 agg_col_count * sizeof(CUdeviceptr));
    kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
    std::vector<void*> param_ptrs;
    for (auto& param : kernel_params) {
      param_ptrs.push_back(&param);
    }

    if (g_enable_dynamic_watchdog) {
      cuEventRecord(stop0, 0);
      cuEventSynchronize(stop0);
      float milliseconds0 = 0;
      cuEventElapsedTime(&milliseconds0, start0, stop0);
      VLOG(1) << "Device " << std::to_string(device_id)
              << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
      cuEventRecord(start1, 0);
    }

    if (hoist_literals) {
      checkCudaErrors(cuLaunchKernel(cu_func,
                                     grid_size_x,
                                     grid_size_y,
                                     grid_size_z,
                                     block_size_x,
                                     block_size_y,
                                     block_size_z,
                                     0,
                                     nullptr,
                                     &param_ptrs[0],
                                     nullptr));
    } else {
      param_ptrs.erase(param_ptrs.begin() + LITERALS);  // TODO(alex): remove
      checkCudaErrors(cuLaunchKernel(cu_func,
                                     grid_size_x,
                                     grid_size_y,
                                     grid_size_z,
                                     block_size_x,
                                     block_size_y,
                                     block_size_z,
                                     0,
                                     nullptr,
                                     &param_ptrs[0],
                                     nullptr));
    }

    if (g_enable_dynamic_watchdog) {
      executor_->registerActiveModule(cu_functions[device_id].second, device_id);
      cuEventRecord(stop1, 0);
      cuEventSynchronize(stop1);
      executor_->unregisterActiveModule(cu_functions[device_id].second, device_id);
      float milliseconds1 = 0;
      cuEventElapsedTime(&milliseconds1, start1, stop1);
      VLOG(1) << "Device " << std::to_string(device_id)
              << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
              << " ms";
      cuEventRecord(start2, 0);
    }

    copy_from_gpu(data_mgr,
                  &error_codes[0],
                  err_desc,
                  error_codes.size() * sizeof(error_codes[0]),
                  device_id);
    *error_code = aggregate_error_codes(error_codes);
    if (*error_code > 0) {
      return {};
    }
    if (ra_exe_unit.estimator) {
      CHECK(estimator_result_set_);
      estimator_result_set_->syncEstimatorBuffer();
      return {};
    }
    for (size_t i = 0; i < agg_col_count; ++i) {
      int64_t* host_out_vec =
          new int64_t[block_size_x * grid_size_x * sizeof(int64_t) * num_fragments];
      copy_from_gpu(data_mgr,
                    host_out_vec,
                    out_vec_dev_buffers[i],
                    block_size_x * grid_size_x * sizeof(int64_t) * num_fragments,
                    device_id);
      out_vec.push_back(host_out_vec);
    }
  }
  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
  if (count_distinct_bitmap_mem) {
    copy_from_gpu(data_mgr,
                  query_buffers_->getCountDistinctHostPtr(),
                  count_distinct_bitmap_mem,
                  query_buffers_->getCountDistinctBitmapBytes(),
                  device_id);
  }

  if (g_enable_dynamic_watchdog) {
    cuEventRecord(stop2, 0);
    cuEventSynchronize(stop2);
    float milliseconds2 = 0;
    cuEventElapsedTime(&milliseconds2, start2, stop2);
    VLOG(1) << "Device " << std::to_string(device_id)
            << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
  }

  return out_vec;
#else
  return {};
#endif
}

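/*
  Illustrative sketch (not part of the original source): cuLaunchKernel receives
  the kernel arguments as an array of host pointers, one per parameter, so the
  code above keeps the CUdeviceptr values alive in kernel_params and passes the
  address of each element:

    std::vector<CUdeviceptr> kernel_params = ...;  // one device pointer per slot
    std::vector<void*> param_ptrs;
    for (auto& param : kernel_params) {
      param_ptrs.push_back(&param);                // host address of the device pointer
    }
    // cuLaunchKernel(cu_func, ..., param_ptrs.data(), nullptr);
*/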
std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<std::pair<void*, void*>>& fn_ptrs,
    const bool hoist_literals,
    const std::vector<int8_t>& literal_buff,
    std::vector<std::vector<const int8_t*>> col_buffers,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    int32_t* error_code,
    const uint32_t num_tables,
    const std::vector<int64_t>& join_hash_tables) {
  INJECT_TIMER(launchCpuCode);

  CHECK(query_buffers_);
  const auto& init_agg_vals = query_buffers_->init_agg_vals_;

  std::vector<const int8_t**> multifrag_col_buffers;
  for (auto& col_buffer : col_buffers) {
    multifrag_col_buffers.push_back(&col_buffer[0]);
  }
  const int8_t*** multifrag_cols_ptr{
      multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
  const uint64_t num_fragments =
      multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);

  const bool is_group_by{query_mem_desc_.isGroupBy()};
  std::vector<int64_t*> out_vec;
  if (ra_exe_unit.estimator) {
    estimator_result_set_.reset(
        new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
    out_vec.push_back(
        reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
  } else {
    if (!is_group_by) {
      for (size_t i = 0; i < init_agg_vals.size(); ++i) {
        auto buff = new int64_t[num_out_frags];
        out_vec.push_back(static_cast<int64_t*>(buff));
      }
    }
  }

  CHECK_EQ(num_rows.size(), col_buffers.size());
  std::vector<int64_t> flatened_num_rows;
  for (auto& nums : num_rows) {
    flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
  }
  std::vector<uint64_t> flatened_frag_offsets;
  for (auto& offsets : frag_offsets) {
    flatened_frag_offsets.insert(
        flatened_frag_offsets.end(), offsets.begin(), offsets.end());
  }
  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
  auto num_rows_ptr =
      rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
  int32_t total_matched_init{0};

  std::vector<int64_t> cmpt_val_buff;
  if (is_group_by) {
    cmpt_val_buff =
        compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
                          init_agg_vals,
                          query_mem_desc_);
  }

  const int64_t* join_hash_tables_ptr =
      join_hash_tables.size() == 1
          ? reinterpret_cast<int64_t*>(join_hash_tables[0])
          : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
  if (hoist_literals) {
    using agg_query = void (*)(const int8_t***,  // col_buffers
                               const uint64_t*,  // num_fragments
                               const int8_t*,    // literals
                               const int64_t*,   // num_rows
                               const uint64_t*,  // frag_row_offsets
                               const int32_t*,   // max_matched
                               int32_t*,         // total_matched
                               const int64_t*,   // init_agg_value
                               int64_t**,        // out
                               int32_t*,         // error_code
                               const uint32_t*,  // num_tables
                               const int64_t*);  // join_hash_tables_ptr
    if (is_group_by) {
      reinterpret_cast<agg_query>(fn_ptrs[0].first)(
          multifrag_cols_ptr,
          &num_fragments,
          &literal_buff[0],
          num_rows_ptr,
          &flatened_frag_offsets[0],
          &scan_limit,
          &total_matched_init,
          &cmpt_val_buff[0],
          query_buffers_->getGroupByBuffersPtr(),
          error_code,
          &num_tables,
          join_hash_tables_ptr);
    } else {
      reinterpret_cast<agg_query>(fn_ptrs[0].first)(multifrag_cols_ptr,
                                                    &num_fragments,
                                                    &literal_buff[0],
                                                    num_rows_ptr,
                                                    &flatened_frag_offsets[0],
                                                    &scan_limit,
                                                    &total_matched_init,
                                                    &init_agg_vals[0],
                                                    &out_vec[0],
                                                    error_code,
                                                    &num_tables,
                                                    join_hash_tables_ptr);
    }
  } else {
    using agg_query = void (*)(const int8_t***,  // col_buffers
                               const uint64_t*,  // num_fragments
                               const int64_t*,   // num_rows
                               const uint64_t*,  // frag_row_offsets
                               const int32_t*,   // max_matched
                               int32_t*,         // total_matched
                               const int64_t*,   // init_agg_value
                               int64_t**,        // out
                               int32_t*,         // error_code
                               const uint32_t*,  // num_tables
                               const int64_t*);  // join_hash_tables_ptr
    if (is_group_by) {
      reinterpret_cast<agg_query>(fn_ptrs[0].first)(
          multifrag_cols_ptr,
          &num_fragments,
          num_rows_ptr,
          &flatened_frag_offsets[0],
          &scan_limit,
          &total_matched_init,
          &cmpt_val_buff[0],
          query_buffers_->getGroupByBuffersPtr(),
          error_code,
          &num_tables,
          join_hash_tables_ptr);
    } else {
      reinterpret_cast<agg_query>(fn_ptrs[0].first)(multifrag_cols_ptr,
                                                    &num_fragments,
                                                    num_rows_ptr,
                                                    &flatened_frag_offsets[0],
                                                    &scan_limit,
                                                    &total_matched_init,
                                                    &init_agg_vals[0],
                                                    &out_vec[0],
                                                    error_code,
                                                    &num_tables,
                                                    join_hash_tables_ptr);
    }
  }

  if (ra_exe_unit.estimator) {
    return {};
  }

  if (rowid_lookup_num_rows && *error_code < 0) {
    *error_code = 0;
  }

  if (use_streaming_top_n(ra_exe_unit, output_columnar_)) {
    query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
  }

  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
      query_mem_desc_.didOutputColumnar()) {
    query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
  }

  return out_vec;
}

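/*
  Illustrative sketch (not part of the original source): the generated CPU code is
  invoked through a plain function pointer whose signature mirrors the agg_query
  typedefs above; the argument names below (cols, rows, offsets, ...) are
  placeholders, not identifiers from this file:

    using agg_query = void (*)(const int8_t***, const uint64_t*, const int64_t*,
                               const uint64_t*, const int32_t*, int32_t*,
                               const int64_t*, int64_t**, int32_t*,
                               const uint32_t*, const int64_t*);
    auto kernel = reinterpret_cast<agg_query>(fn_ptrs[0].first);  // no hoisted literals
    kernel(cols, &num_frags, rows, offsets, &max_matched, &total_matched,
           init_vals, out, &err, &num_tables, join_tables);
*/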
#ifdef HAVE_CUDA
void QueryExecutionContext::initializeDynamicWatchdog(void* native_module,
                                                      const int device_id) const {
  auto cu_module = static_cast<CUmodule>(native_module);
  CHECK(cu_module);
  CUevent start, stop;
  cuEventCreate(&start, 0);
  cuEventCreate(&stop, 0);
  cuEventRecord(start, 0);

  CUdeviceptr dw_cycle_budget;
  size_t dw_cycle_budget_size;
  // Translate milliseconds to device cycles
  uint64_t cycle_budget = executor_->deviceCycles(g_dynamic_watchdog_time_limit);
  if (device_id == 0) {
    LOG(INFO) << "Dynamic Watchdog budget: GPU: "
              << std::to_string(g_dynamic_watchdog_time_limit) << "ms, "
              << std::to_string(cycle_budget) << " cycles";
  }
  checkCudaErrors(cuModuleGetGlobal(
      &dw_cycle_budget, &dw_cycle_budget_size, cu_module, "dw_cycle_budget"));
  CHECK_EQ(dw_cycle_budget_size, sizeof(uint64_t));
  checkCudaErrors(cuMemcpyHtoD(
      dw_cycle_budget, reinterpret_cast<void*>(&cycle_budget), sizeof(uint64_t)));

  CUdeviceptr dw_sm_cycle_start;
  size_t dw_sm_cycle_start_size;
  checkCudaErrors(cuModuleGetGlobal(
      &dw_sm_cycle_start, &dw_sm_cycle_start_size, cu_module, "dw_sm_cycle_start"));
  CHECK_EQ(dw_sm_cycle_start_size, 128 * sizeof(uint64_t));
  checkCudaErrors(cuMemsetD32(dw_sm_cycle_start, 0, 128 * 2));

  if (!executor_->interrupted_) {
    // Executor is not marked as interrupted, make sure dynamic watchdog doesn't block
    // execution
    CUdeviceptr dw_abort;
    size_t dw_abort_size;
    checkCudaErrors(cuModuleGetGlobal(&dw_abort, &dw_abort_size, cu_module, "dw_abort"));
    CHECK_EQ(dw_abort_size, sizeof(uint32_t));
    checkCudaErrors(cuMemsetD32(dw_abort, 0, 1));
  }

  cuEventRecord(stop, 0);
  cuEventSynchronize(stop);
  float milliseconds = 0;
  cuEventElapsedTime(&milliseconds, start, stop);
  VLOG(1) << "Device " << std::to_string(device_id)
          << ": launchGpuCode: dynamic watchdog init: " << std::to_string(milliseconds)
          << " ms\n";
}

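/*
  Illustrative sketch (not part of the original source): the watchdog state lives
  in module-scope device globals, so writing any such global follows the
  cuModuleGetGlobal + cuMemcpyHtoD pattern used above.  "my_flag" below is a
  hypothetical symbol name, not one defined by this module:

    CUdeviceptr flag_ptr;
    size_t flag_size;
    checkCudaErrors(cuModuleGetGlobal(&flag_ptr, &flag_size, cu_module, "my_flag"));
    uint32_t zero = 0;
    checkCudaErrors(cuMemcpyHtoD(flag_ptr, &zero, sizeof(zero)));
*/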
std::vector<CUdeviceptr> QueryExecutionContext::prepareKernelParams(
    const std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<int8_t>& literal_buff,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    const std::vector<int64_t>& init_agg_vals,
    const std::vector<int32_t>& error_codes,
    const uint32_t num_tables,
    const std::vector<int64_t>& join_hash_tables,
    Data_Namespace::DataMgr* data_mgr,
    const int device_id,
    const bool hoist_literals,
    const bool is_group_by) const {
  CHECK(gpu_allocator_);
  std::vector<CUdeviceptr> params(KERN_PARAM_COUNT, 0);
  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
  if (col_count) {
    std::vector<CUdeviceptr> multifrag_col_dev_buffers;
    for (auto frag_col_buffers : col_buffers) {
      std::vector<CUdeviceptr> col_dev_buffers;
      for (auto col_buffer : frag_col_buffers) {
        col_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(col_buffer));
      }
      auto col_buffers_dev_ptr = reinterpret_cast<CUdeviceptr>(
          gpu_allocator_->alloc(col_count * sizeof(CUdeviceptr)));
      copy_to_gpu(data_mgr,
                  col_buffers_dev_ptr,
                  &col_dev_buffers[0],
                  col_count * sizeof(CUdeviceptr),
                  device_id);
      multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
    }
    params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(
        gpu_allocator_->alloc(num_fragments * sizeof(CUdeviceptr)));
    copy_to_gpu(data_mgr,
                params[COL_BUFFERS],
                &multifrag_col_dev_buffers[0],
                num_fragments * sizeof(CUdeviceptr),
                device_id);
  }
  params[NUM_FRAGMENTS] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint64_t)));
  copy_to_gpu(
      data_mgr, params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t), device_id);
  CUdeviceptr literals_and_addr_mapping = reinterpret_cast<CUdeviceptr>(
      gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t)));
  CHECK_EQ(CUdeviceptr{0}, literals_and_addr_mapping % 8);
  std::vector<int64_t> additional_literal_bytes;
  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
  if (count_distinct_bitmap_mem) {
    // Store host and device addresses
    const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
    CHECK(count_distinct_bitmap_host_mem);
    additional_literal_bytes.push_back(
        reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
    additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
    copy_to_gpu(data_mgr,
                literals_and_addr_mapping,
                &additional_literal_bytes[0],
                additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]),
                device_id);
  }
  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
                                                     sizeof(additional_literal_bytes[0]);
  if (!literal_buff.empty()) {
    CHECK(hoist_literals);
    copy_to_gpu(
        data_mgr, params[LITERALS], &literal_buff[0], literal_buff.size(), device_id);
  }
  CHECK_EQ(num_rows.size(), col_buffers.size());
  std::vector<int64_t> flatened_num_rows;
  for (auto& nums : num_rows) {
    CHECK_EQ(nums.size(), num_tables);
    flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
  }
  params[NUM_ROWS] = reinterpret_cast<CUdeviceptr>(
      gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size()));
  copy_to_gpu(data_mgr,
              params[NUM_ROWS],
              &flatened_num_rows[0],
              sizeof(int64_t) * flatened_num_rows.size(),
              device_id);

  CHECK_EQ(frag_offsets.size(), col_buffers.size());
  std::vector<int64_t> flatened_frag_offsets;
  for (auto& offsets : frag_offsets) {
    CHECK_EQ(offsets.size(), num_tables);
    flatened_frag_offsets.insert(
        flatened_frag_offsets.end(), offsets.begin(), offsets.end());
  }
  params[FRAG_ROW_OFFSETS] = reinterpret_cast<CUdeviceptr>(
      gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size()));
  copy_to_gpu(data_mgr,
              params[FRAG_ROW_OFFSETS],
              &flatened_frag_offsets[0],
              sizeof(int64_t) * flatened_frag_offsets.size(),
              device_id);

  // Note that this will be overwritten if we are setting the entry count during group by
  // buffer allocation and initialization
  const int32_t max_matched{scan_limit};
  params[MAX_MATCHED] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(max_matched)));
  copy_to_gpu(
      data_mgr, params[MAX_MATCHED], &max_matched, sizeof(max_matched), device_id);

  int32_t total_matched{0};
  params[TOTAL_MATCHED] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(total_matched)));
  copy_to_gpu(
      data_mgr, params[TOTAL_MATCHED], &total_matched, sizeof(total_matched), device_id);

  if (is_group_by && !output_columnar_) {
    auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
    auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
    params[INIT_AGG_VALS] =
        reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t)));
    copy_to_gpu(data_mgr,
                params[INIT_AGG_VALS],
                &cmpt_val_buff[0],
                cmpt_sz * sizeof(int64_t),
                device_id);
  } else {
    params[INIT_AGG_VALS] = reinterpret_cast<CUdeviceptr>(
        gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t)));
    copy_to_gpu(data_mgr,
                params[INIT_AGG_VALS],
                &init_agg_vals[0],
                init_agg_vals.size() * sizeof(int64_t),
                device_id);
  }

  params[ERROR_CODE] = reinterpret_cast<CUdeviceptr>(
      gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0])));
  copy_to_gpu(data_mgr,
              params[ERROR_CODE],
              &error_codes[0],
              error_codes.size() * sizeof(error_codes[0]),
              device_id);

  params[NUM_TABLES] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint32_t)));
  copy_to_gpu(data_mgr, params[NUM_TABLES], &num_tables, sizeof(uint32_t), device_id);

  const auto hash_table_count = join_hash_tables.size();
  switch (hash_table_count) {
    case 0: {
      params[JOIN_HASH_TABLES] = CUdeviceptr(0);
      break;
    }
    case 1:
      params[JOIN_HASH_TABLES] = static_cast<CUdeviceptr>(join_hash_tables[0]);
      break;
    default: {
      params[JOIN_HASH_TABLES] = reinterpret_cast<CUdeviceptr>(
          gpu_allocator_->alloc(hash_table_count * sizeof(int64_t)));
      copy_to_gpu(data_mgr,
                  params[JOIN_HASH_TABLES],
                  &join_hash_tables[0],
                  hash_table_count * sizeof(int64_t),
                  device_id);
      break;
    }
  }

  return params;
}
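/*
  Illustrative sketch (not part of the original source): prepareKernelParams()
  fills one CUdeviceptr per kernel-parameter slot (KERN_PARAM_COUNT of them);
  GROUPBY_BUF is deliberately left as 0 here and patched in launchGpuCode() once
  the group-by buffers (or the plain output vectors) have been allocated:

    auto params = prepareKernelParams(...);         // params.size() == KERN_PARAM_COUNT
    CHECK_EQ(CUdeviceptr(0), params[GROUPBY_BUF]);  // filled in later
    params[GROUPBY_BUF] = gpu_group_by_buffers.first;
*/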
#endif