OmniSciDB 72180abbfe
QueryExecutionContext.cpp
1 /*
2  * Copyright 2018 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
20 #include "Execute.h"
21 #include "GpuInitGroups.h"
22 #include "InPlaceSort.h"
23 #include "QueryMemoryInitializer.h"
24 #include "RelAlgExecutionUnit.h"
25 #include "ResultSet.h"
26 #include "SpeculativeTopN.h"
27 #include "StreamingTopN.h"
28 
29 QueryExecutionContext::QueryExecutionContext(
30  const RelAlgExecutionUnit& ra_exe_unit,
31  const QueryMemoryDescriptor& query_mem_desc,
32  const Executor* executor,
33  const ExecutorDeviceType device_type,
34  const ExecutorDispatchMode dispatch_mode,
35  const int device_id,
36  const int64_t num_rows,
37  const std::vector<std::vector<const int8_t*>>& col_buffers,
38  const std::vector<std::vector<uint64_t>>& frag_offsets,
39  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
40  const bool output_columnar,
41  const bool sort_on_gpu,
42  RenderInfo* render_info)
43  : query_mem_desc_(query_mem_desc)
44  , executor_(executor)
45  , device_type_(device_type)
46  , dispatch_mode_(dispatch_mode)
47  , row_set_mem_owner_(row_set_mem_owner)
48  , output_columnar_(output_columnar) {
49  CHECK(executor);
50  auto& data_mgr = executor->catalog_->getDataMgr();
51  if (device_type == ExecutorDeviceType::GPU) {
52  gpu_allocator_ = std::make_unique<CudaAllocator>(&data_mgr, device_id);
53  }
54 
55  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
56  ? render_info->render_allocator_map_ptr.get()
57  : nullptr;
58  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
59  query_mem_desc,
60  device_id,
61  device_type,
62  dispatch_mode,
63  output_columnar,
64  sort_on_gpu,
65  num_rows,
66  col_buffers,
67  frag_offsets,
68  render_allocator_map,
69  render_info,
70  row_set_mem_owner,
71  gpu_allocator_.get(),
72  executor);
73 }
74 
75 ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
76  const size_t i) const {
77  CHECK(!output_columnar_);
78  const auto& result_set = query_buffers_->getResultSet(i);
79  auto deinterleaved_query_mem_desc =
80  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
81  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
82  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
83 
84  auto deinterleaved_result_set =
85  std::make_shared<ResultSet>(result_set->getTargetInfos(),
86  std::vector<ColumnLazyFetchInfo>{},
87  std::vector<std::vector<const int8_t*>>{},
88  std::vector<std::vector<int64_t>>{},
89  std::vector<int64_t>{},
90  ExecutorDeviceType::CPU,
91  -1,
92  deinterleaved_query_mem_desc,
93  row_set_mem_owner_,
94  executor_);
95  auto deinterleaved_storage =
96  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
97  auto deinterleaved_buffer =
98  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
99  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
100  size_t deinterleaved_buffer_idx = 0;
101  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
102  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
103  bin_idx < result_set->entryCount();
104  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
105  std::vector<int64_t> agg_vals(agg_col_count, 0);
106  memcpy(&agg_vals[0],
107  &executor_->plan_state_->init_agg_vals_[0],
108  agg_col_count * sizeof(agg_vals[0]));
109  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
110  executor_->warpSize(),
111  false,
112  true,
113  agg_vals,
114  query_mem_desc_,
115  result_set->getTargetInfos(),
116  executor_->plan_state_->init_agg_vals_);
117  for (size_t agg_idx = 0; agg_idx < agg_col_count;
118  ++agg_idx, ++deinterleaved_buffer_idx) {
119  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
120  }
121  }
122  query_buffers_->resetResultSet(i);
123  return deinterleaved_result_set;
124 }
125 
126 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
127  CHECK(query_buffers_);
128  return query_buffers_->getAggInitValForIndex(index);
129 }
130 
131 ResultSetPtr QueryExecutionContext::getRowSet(
132  const RelAlgExecutionUnit& ra_exe_unit,
133  const QueryMemoryDescriptor& query_mem_desc) const {
134  auto timer = DEBUG_TIMER(__func__);
135  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
136  CHECK(query_buffers_);
137  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
138  if (device_type_ == ExecutorDeviceType::CPU) {
139  CHECK_EQ(size_t(1), group_by_buffers_size);
140  return groupBufferToResults(0);
141  }
142  size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
143  for (size_t i = 0; i < group_by_buffers_size; i += step) {
144  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
145  }
146  CHECK(device_type_ == ExecutorDeviceType::GPU);
147  return executor_->reduceMultiDeviceResults(
148  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
149 }
150 
151 ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
152  if (query_mem_desc_.interleavedBins(device_type_)) {
153  return groupBufferToDeinterleavedResults(i);
154  }
155  return query_buffers_->getResultSetOwned(i);
156 }
157 
158 #ifdef HAVE_CUDA
159 namespace {
160 
161 int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
162  // Check overflow / division by zero / interrupt first
163  for (const auto err : error_codes) {
164  if (err > 0) {
165  return err;
166  }
167  }
168  for (const auto err : error_codes) {
169  if (err) {
170  return err;
171  }
172  }
173  return 0;
174 }
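// Worked example of the convention above: for per-thread codes {0, -3, 2, 0},
// where a negative value such as -3 would indicate a retryable condition and a
// positive value such as 2 an overflow / division-by-zero / interrupt error,
// the function returns 2. A positive code always wins over negative ones, and
// 0 is returned only when every thread reported success.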
175 
176 } // namespace
177 #endif
178 
179 std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
180  const RelAlgExecutionUnit& ra_exe_unit,
181  const GpuCompilationContext* cu_functions,
182  const bool hoist_literals,
183  const std::vector<int8_t>& literal_buff,
184  std::vector<std::vector<const int8_t*>> col_buffers,
185  const std::vector<std::vector<int64_t>>& num_rows,
186  const std::vector<std::vector<uint64_t>>& frag_offsets,
187  const int32_t scan_limit,
188  Data_Namespace::DataMgr* data_mgr,
189  const unsigned block_size_x,
190  const unsigned grid_size_x,
191  const int device_id,
192  const size_t shared_memory_size,
193  int32_t* error_code,
194  const uint32_t num_tables,
195  const std::vector<int64_t>& join_hash_tables,
196  RenderAllocatorMap* render_allocator_map) {
197  auto timer = DEBUG_TIMER(__func__);
198  INJECT_TIMER(launchGpuCode);
199 #ifdef HAVE_CUDA
200  CHECK(gpu_allocator_);
201  CHECK(query_buffers_);
202  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
203 
204  bool is_group_by{query_mem_desc_.isGroupBy()};
205 
206  RenderAllocator* render_allocator = nullptr;
207  if (render_allocator_map) {
208  render_allocator = render_allocator_map->getRenderAllocator(device_id);
209  }
210 
211  CHECK(cu_functions);
212  const auto native_code = cu_functions->getNativeCode(device_id);
213  auto cu_func = static_cast<CUfunction>(native_code.first);
214  std::vector<int64_t*> out_vec;
215  uint32_t num_fragments = col_buffers.size();
216  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
217 
218  CUevent start0, stop0; // preparation
219  cuEventCreate(&start0, 0);
220  cuEventCreate(&stop0, 0);
221  CUevent start1, stop1; // cuLaunchKernel
222  cuEventCreate(&start1, 0);
223  cuEventCreate(&stop1, 0);
224  CUevent start2, stop2; // finish
225  cuEventCreate(&start2, 0);
226  cuEventCreate(&stop2, 0);
227 
228  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
229  cuEventRecord(start0, 0);
230  }
231 
232  if (g_enable_dynamic_watchdog) {
233  initializeDynamicWatchdog(native_code.second, device_id);
234  }
235 
236  if (g_enable_runtime_query_interrupt) {
237  initializeRuntimeInterrupter(native_code.second, device_id);
238  }
239 
240  auto kernel_params = prepareKernelParams(col_buffers,
241  literal_buff,
242  num_rows,
243  frag_offsets,
244  scan_limit,
245  init_agg_vals,
246  error_codes,
247  num_tables,
248  join_hash_tables,
249  data_mgr,
250  device_id,
251  hoist_literals,
252  is_group_by);
253 
254  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
255  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);
256 
257  const unsigned block_size_y = 1;
258  const unsigned block_size_z = 1;
259  const unsigned grid_size_y = 1;
260  const unsigned grid_size_z = 1;
261  const auto total_thread_count = block_size_x * grid_size_x;
262  const auto err_desc = kernel_params[ERROR_CODE];
263 
264  if (is_group_by) {
265  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
266  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
267  auto gpu_group_by_buffers =
268  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
269  query_mem_desc_,
270  kernel_params[INIT_AGG_VALS],
271  device_id,
272  dispatch_mode_,
273  block_size_x,
274  grid_size_x,
275  executor_->warpSize(),
276  can_sort_on_gpu,
277  output_columnar_,
278  render_allocator);
279  if (ra_exe_unit.use_bump_allocator) {
280  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
281  copy_to_gpu(data_mgr,
282  kernel_params[MAX_MATCHED],
283  &max_matched,
284  sizeof(max_matched),
285  device_id);
286  }
287 
288  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.first;
289  std::vector<void*> param_ptrs;
290  for (auto& param : kernel_params) {
291  param_ptrs.push_back(&param);
292  }
293 
294  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
295  cuEventRecord(stop0, 0);
296  cuEventSynchronize(stop0);
297  float milliseconds0 = 0;
298  cuEventElapsedTime(&milliseconds0, start0, stop0);
299  VLOG(1) << "Device " << std::to_string(device_id)
300  << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
301  << " ms";
302  cuEventRecord(start1, 0);
303  }
304 
305  if (hoist_literals) {
306  checkCudaErrors(cuLaunchKernel(cu_func,
307  grid_size_x,
308  grid_size_y,
309  grid_size_z,
310  block_size_x,
311  block_size_y,
312  block_size_z,
313  shared_memory_size,
314  nullptr,
315  &param_ptrs[0],
316  nullptr));
317  } else {
318  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
319  checkCudaErrors(cuLaunchKernel(cu_func,
320  grid_size_x,
321  grid_size_y,
322  grid_size_z,
323  block_size_x,
324  block_size_y,
325  block_size_z,
326  shared_memory_size,
327  nullptr,
328  &param_ptrs[0],
329  nullptr));
330  }
331  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
332  executor_->registerActiveModule(native_code.second, device_id);
333  cuEventRecord(stop1, 0);
334  cuEventSynchronize(stop1);
335  executor_->unregisterActiveModule(native_code.second, device_id);
336  float milliseconds1 = 0;
337  cuEventElapsedTime(&milliseconds1, start1, stop1);
338  VLOG(1) << "Device " << std::to_string(device_id)
339  << ": launchGpuCode: group-by cuLaunchKernel: "
340  << std::to_string(milliseconds1) << " ms";
341  cuEventRecord(start2, 0);
342  }
343 
344  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
345  reinterpret_cast<int8_t*>(err_desc),
346  error_codes.size() * sizeof(error_codes[0]));
347  *error_code = aggregate_error_codes(error_codes);
348  if (*error_code > 0) {
349  return {};
350  }
351 
352  if (!render_allocator) {
353  if (use_streaming_top_n(ra_exe_unit, query_mem_desc_.didOutputColumnar())) {
354  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
355  query_mem_desc_,
356  gpu_group_by_buffers,
357  ra_exe_unit,
358  total_thread_count,
359  device_id);
360  } else {
361  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
362  try {
363  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
364  query_mem_desc_,
365  gpu_group_by_buffers,
366  data_mgr,
367  device_id);
368  } catch (const std::bad_alloc&) {
369  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
370  }
371  }
372  if (query_mem_desc_.getQueryDescriptionType() ==
373  QueryDescriptionType::Projection) {
374  if (query_mem_desc_.didOutputColumnar()) {
375  query_buffers_->compactProjectionBuffersGpu(
376  query_mem_desc_,
377  data_mgr,
378  gpu_group_by_buffers,
379  get_num_allocated_rows_from_gpu(
380  data_mgr, kernel_params[TOTAL_MATCHED], device_id),
381  device_id);
382  } else {
383  size_t num_allocated_rows{0};
384  if (ra_exe_unit.use_bump_allocator) {
385  num_allocated_rows = get_num_allocated_rows_from_gpu(
386  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
387  // First, check the error code. If we ran out of slots, don't copy data back
388  // into the ResultSet or update ResultSet entry count
389  if (*error_code < 0) {
390  return {};
391  }
392  }
393  query_buffers_->copyGroupByBuffersFromGpu(
394  data_mgr,
395  query_mem_desc_,
396  ra_exe_unit.use_bump_allocator ? num_allocated_rows
397  : query_mem_desc_.getEntryCount(),
398  gpu_group_by_buffers,
399  &ra_exe_unit,
400  block_size_x,
401  grid_size_x,
402  device_id,
403  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
404  if (num_allocated_rows) {
405  CHECK(ra_exe_unit.use_bump_allocator);
406  CHECK(!query_buffers_->result_sets_.empty());
407  query_buffers_->result_sets_.front()->updateStorageEntryCount(
408  num_allocated_rows);
409  }
410  }
411  } else {
412  query_buffers_->copyGroupByBuffersFromGpu(
413  data_mgr,
414  query_mem_desc_,
415  query_mem_desc_.getEntryCount(),
416  gpu_group_by_buffers,
417  &ra_exe_unit,
418  block_size_x,
419  grid_size_x,
420  device_id,
421  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
422  }
423  }
424  }
425  } else {
426  std::vector<CUdeviceptr> out_vec_dev_buffers;
427  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
428  // by default, non-grouped aggregate queries generate one result per available thread
429  // in the lifetime of (potentially multi-fragment) kernel execution.
430  // We can reduce these intermediate results internally in the device and hence have
431  // only one result per device, if GPU shared memory optimizations are enabled.
432  const auto num_results_per_agg_col =
433  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
434  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
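// For example (illustrative numbers only): with block_size_x = 1024,
// grid_size_x = 16 and two fragments, each aggregate column gets
// 1024 * 16 * 2 = 32768 int64_t slots (256 KiB) when shared_memory_size is
// zero, versus a single 8-byte slot per column on the shared-memory
// reduction path.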
435  if (ra_exe_unit.estimator) {
436  estimator_result_set_.reset(new ResultSet(
437  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
438  out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
439  estimator_result_set_->getDeviceEstimatorBuffer()));
440  } else {
441  for (size_t i = 0; i < agg_col_count; ++i) {
442  CUdeviceptr out_vec_dev_buffer =
443  num_fragments ? reinterpret_cast<CUdeviceptr>(
444  gpu_allocator_->alloc(output_buffer_size_per_agg))
445  : 0;
446  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
447  if (shared_memory_size) {
448  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
449  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
450  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
451  output_buffer_size_per_agg);
452  }
453  }
454  }
455  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
456  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
457  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
458  agg_col_count * sizeof(CUdeviceptr));
459  kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
460  std::vector<void*> param_ptrs;
461  for (auto& param : kernel_params) {
462  param_ptrs.push_back(&param);
463  }
464 
465  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
466  cuEventRecord(stop0, 0);
467  cuEventSynchronize(stop0);
468  float milliseconds0 = 0;
469  cuEventElapsedTime(&milliseconds0, start0, stop0);
470  VLOG(1) << "Device " << std::to_string(device_id)
471  << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
472  cuEventRecord(start1, 0);
473  }
474 
475  if (hoist_literals) {
476  checkCudaErrors(cuLaunchKernel(cu_func,
477  grid_size_x,
478  grid_size_y,
479  grid_size_z,
480  block_size_x,
481  block_size_y,
482  block_size_z,
483  shared_memory_size,
484  nullptr,
485  &param_ptrs[0],
486  nullptr));
487  } else {
488  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
489  checkCudaErrors(cuLaunchKernel(cu_func,
490  grid_size_x,
491  grid_size_y,
492  grid_size_z,
493  block_size_x,
494  block_size_y,
495  block_size_z,
496  shared_memory_size,
497  nullptr,
498  &param_ptrs[0],
499  nullptr));
500  }
501 
502  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
503  executor_->registerActiveModule(native_code.second, device_id);
504  cuEventRecord(stop1, 0);
505  cuEventSynchronize(stop1);
506  executor_->unregisterActiveModule(native_code.second, device_id);
507  float milliseconds1 = 0;
508  cuEventElapsedTime(&milliseconds1, start1, stop1);
509  VLOG(1) << "Device " << std::to_string(device_id)
510  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
511  << " ms";
512  cuEventRecord(start2, 0);
513  }
514 
515  copy_from_gpu(data_mgr,
516  &error_codes[0],
517  err_desc,
518  error_codes.size() * sizeof(error_codes[0]),
519  device_id);
520  *error_code = aggregate_error_codes(error_codes);
521  if (*error_code > 0) {
522  return {};
523  }
524  if (ra_exe_unit.estimator) {
525  CHECK(estimator_result_set_);
526  estimator_result_set_->syncEstimatorBuffer();
527  return {};
528  }
529  for (size_t i = 0; i < agg_col_count; ++i) {
530  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
531  copy_from_gpu(data_mgr,
532  host_out_vec,
533  out_vec_dev_buffers[i],
534  output_buffer_size_per_agg,
535  device_id);
536  out_vec.push_back(host_out_vec);
537  }
538  }
539  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
540  if (count_distinct_bitmap_mem) {
541  copy_from_gpu(data_mgr,
542  query_buffers_->getCountDistinctHostPtr(),
543  count_distinct_bitmap_mem,
544  query_buffers_->getCountDistinctBitmapBytes(),
545  device_id);
546  }
547 
548  if (g_enable_dynamic_watchdog || g_enable_runtime_query_interrupt) {
549  cuEventRecord(stop2, 0);
550  cuEventSynchronize(stop2);
551  float milliseconds2 = 0;
552  cuEventElapsedTime(&milliseconds2, start2, stop2);
553  VLOG(1) << "Device " << std::to_string(device_id)
554  << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
555  }
556 
557  return out_vec;
558 #else
559  return {};
560 #endif
561 }
562 
563 std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
564  const RelAlgExecutionUnit& ra_exe_unit,
565  const CpuCompilationContext* native_code,
566  const bool hoist_literals,
567  const std::vector<int8_t>& literal_buff,
568  std::vector<std::vector<const int8_t*>> col_buffers,
569  const std::vector<std::vector<int64_t>>& num_rows,
570  const std::vector<std::vector<uint64_t>>& frag_offsets,
571  const int32_t scan_limit,
572  int32_t* error_code,
573  const uint32_t num_tables,
574  const std::vector<int64_t>& join_hash_tables) {
575  auto timer = DEBUG_TIMER(__func__);
576  INJECT_TIMER(launchCpuCode);
577 
578  CHECK(query_buffers_);
579  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
580 
581  std::vector<const int8_t**> multifrag_col_buffers;
582  for (auto& col_buffer : col_buffers) {
583  multifrag_col_buffers.push_back(&col_buffer[0]);
584  }
585  const int8_t*** multifrag_cols_ptr{
586  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
587  const uint64_t num_fragments =
588  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
589  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
590 
591  const bool is_group_by{query_mem_desc_.isGroupBy()};
592  std::vector<int64_t*> out_vec;
593  if (ra_exe_unit.estimator) {
594  estimator_result_set_.reset(
595  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
596  out_vec.push_back(
597  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
598  } else {
599  if (!is_group_by) {
600  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
601  auto buff = new int64_t[num_out_frags];
602  out_vec.push_back(static_cast<int64_t*>(buff));
603  }
604  }
605  }
606 
607  CHECK_EQ(num_rows.size(), col_buffers.size());
608  std::vector<int64_t> flatened_num_rows;
609  for (auto& nums : num_rows) {
610  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
611  }
612  std::vector<uint64_t> flatened_frag_offsets;
613  for (auto& offsets : frag_offsets) {
614  flatened_frag_offsets.insert(
615  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
616  }
617  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
618  auto num_rows_ptr =
619  rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
620  int32_t total_matched_init{0};
621 
622  std::vector<int64_t> cmpt_val_buff;
623  if (is_group_by) {
624  cmpt_val_buff =
625  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
626  init_agg_vals,
627  query_mem_desc_);
628  }
629 
630  CHECK(native_code);
631  const int64_t* join_hash_tables_ptr =
632  join_hash_tables.size() == 1
633  ? reinterpret_cast<int64_t*>(join_hash_tables[0])
634  : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
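// Encoding of the hash table argument: a single table is passed by its address
// directly, several tables as a pointer to the array of their addresses, and
// no tables as nullptr.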
635  if (hoist_literals) {
636  using agg_query = void (*)(const int8_t***, // col_buffers
637  const uint64_t*, // num_fragments
638  const int8_t*, // literals
639  const int64_t*, // num_rows
640  const uint64_t*, // frag_row_offsets
641  const int32_t*, // max_matched
642  int32_t*, // total_matched
643  const int64_t*, // init_agg_value
644  int64_t**, // out
645  int32_t*, // error_code
646  const uint32_t*, // num_tables
647  const int64_t*); // join_hash_tables_ptr
648  if (is_group_by) {
649  reinterpret_cast<agg_query>(native_code->func())(
650  multifrag_cols_ptr,
651  &num_fragments,
652  &literal_buff[0],
653  num_rows_ptr,
654  &flatened_frag_offsets[0],
655  &scan_limit,
656  &total_matched_init,
657  &cmpt_val_buff[0],
658  query_buffers_->getGroupByBuffersPtr(),
659  error_code,
660  &num_tables,
661  join_hash_tables_ptr);
662  } else {
663  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
664  &num_fragments,
665  &literal_buff[0],
666  num_rows_ptr,
667  &flatened_frag_offsets[0],
668  &scan_limit,
669  &total_matched_init,
670  &init_agg_vals[0],
671  &out_vec[0],
672  error_code,
673  &num_tables,
674  join_hash_tables_ptr);
675  }
676  } else {
677  using agg_query = void (*)(const int8_t***, // col_buffers
678  const uint64_t*, // num_fragments
679  const int64_t*, // num_rows
680  const uint64_t*, // frag_row_offsets
681  const int32_t*, // max_matched
682  int32_t*, // total_matched
683  const int64_t*, // init_agg_value
684  int64_t**, // out
685  int32_t*, // error_code
686  const uint32_t*, // num_tables
687  const int64_t*); // join_hash_tables_ptr
688  if (is_group_by) {
689  reinterpret_cast<agg_query>(native_code->func())(
690  multifrag_cols_ptr,
691  &num_fragments,
692  num_rows_ptr,
693  &flatened_frag_offsets[0],
694  &scan_limit,
695  &total_matched_init,
696  &cmpt_val_buff[0],
697  query_buffers_->getGroupByBuffersPtr(),
698  error_code,
699  &num_tables,
700  join_hash_tables_ptr);
701  } else {
702  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
703  &num_fragments,
704  num_rows_ptr,
705  &flatened_frag_offsets[0],
706  &scan_limit,
707  &total_matched_init,
708  &init_agg_vals[0],
709  &out_vec[0],
710  error_code,
711  &num_tables,
712  join_hash_tables_ptr);
713  }
714  }
715 
716  if (ra_exe_unit.estimator) {
717  return {};
718  }
719 
720  if (rowid_lookup_num_rows && *error_code < 0) {
721  *error_code = 0;
722  }
723 
724  if (use_streaming_top_n(ra_exe_unit, query_mem_desc_.didOutputColumnar())) {
725  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
726  }
727 
728  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
729  query_mem_desc_.didOutputColumnar()) {
730  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
731  }
732 
733  return out_vec;
734 }
735 
736 #ifdef HAVE_CUDA
737 void QueryExecutionContext::initializeDynamicWatchdog(void* native_module,
738  const int device_id) const {
739  auto cu_module = static_cast<CUmodule>(native_module);
740  CHECK(cu_module);
741  CUevent start, stop;
742  cuEventCreate(&start, 0);
743  cuEventCreate(&stop, 0);
744  cuEventRecord(start, 0);
745 
746  CUdeviceptr dw_cycle_budget;
747  size_t dw_cycle_budget_size;
748  // Translate milliseconds to device cycles
749  uint64_t cycle_budget = executor_->deviceCycles(g_dynamic_watchdog_time_limit);
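// Rough illustration, assuming deviceCycles() scales the millisecond limit by
// the device clock rate: a 10000 ms limit on a ~1.5 GHz GPU corresponds to a
// budget on the order of 1.5e10 cycles.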
750  if (device_id == 0) {
751  LOG(INFO) << "Dynamic Watchdog budget: GPU: "
752  << std::to_string(g_dynamic_watchdog_time_limit) << " ms, "
753  << std::to_string(cycle_budget) << " cycles";
754  }
755  checkCudaErrors(cuModuleGetGlobal(
756  &dw_cycle_budget, &dw_cycle_budget_size, cu_module, "dw_cycle_budget"));
757  CHECK_EQ(dw_cycle_budget_size, sizeof(uint64_t));
758  checkCudaErrors(cuMemcpyHtoD(
759  dw_cycle_budget, reinterpret_cast<void*>(&cycle_budget), sizeof(uint64_t)));
760 
761  CUdeviceptr dw_sm_cycle_start;
762  size_t dw_sm_cycle_start_size;
763  checkCudaErrors(cuModuleGetGlobal(
764  &dw_sm_cycle_start, &dw_sm_cycle_start_size, cu_module, "dw_sm_cycle_start"));
765  CHECK_EQ(dw_sm_cycle_start_size, 128 * sizeof(uint64_t));
766  checkCudaErrors(cuMemsetD32(dw_sm_cycle_start, 0, 128 * 2));
767 
768  if (!executor_->interrupted_.load()) {
769  // Executor is not marked as interrupted, make sure dynamic watchdog doesn't block
770  // execution
771  CUdeviceptr dw_abort;
772  size_t dw_abort_size;
773  checkCudaErrors(cuModuleGetGlobal(&dw_abort, &dw_abort_size, cu_module, "dw_abort"));
774  CHECK_EQ(dw_abort_size, sizeof(uint32_t));
775  checkCudaErrors(cuMemsetD32(dw_abort, 0, 1));
776  }
777 
778  cuEventRecord(stop, 0);
779  cuEventSynchronize(stop);
780  float milliseconds = 0;
781  cuEventElapsedTime(&milliseconds, start, stop);
782  VLOG(1) << "Device " << std::to_string(device_id)
783  << ": launchGpuCode: dynamic watchdog init: " << std::to_string(milliseconds)
784  << " ms\n";
785 }
786 
787 void QueryExecutionContext::initializeRuntimeInterrupter(void* native_module,
788  const int device_id) const {
789  if (!executor_->interrupted_.load()) {
790  // Executor is not marked as interrupted, make sure interrupt flag doesn't block
791  // execution
792  auto cu_module = static_cast<CUmodule>(native_module);
793  CHECK(cu_module);
794  CUevent start, stop;
795  cuEventCreate(&start, 0);
796  cuEventCreate(&stop, 0);
797  cuEventRecord(start, 0);
798 
799  CUdeviceptr runtime_interrupt_flag;
800  size_t runtime_interrupt_flag_size;
801  checkCudaErrors(cuModuleGetGlobal(&runtime_interrupt_flag,
802  &runtime_interrupt_flag_size,
803  cu_module,
804  "runtime_interrupt_flag"));
805  CHECK_EQ(runtime_interrupt_flag_size, sizeof(uint32_t));
806  checkCudaErrors(cuMemsetD32(runtime_interrupt_flag, 0, 1));
807 
808  cuEventRecord(stop, 0);
809  cuEventSynchronize(stop);
810  float milliseconds = 0;
811  cuEventElapsedTime(&milliseconds, start, stop);
812  VLOG(1) << "Device " << std::to_string(device_id)
813  << ": launchGpuCode: runtime query interrupter init: "
814  << std::to_string(milliseconds) << " ms\n";
815  }
816 }
817 
818 std::vector<CUdeviceptr> QueryExecutionContext::prepareKernelParams(
819  const std::vector<std::vector<const int8_t*>>& col_buffers,
820  const std::vector<int8_t>& literal_buff,
821  const std::vector<std::vector<int64_t>>& num_rows,
822  const std::vector<std::vector<uint64_t>>& frag_offsets,
823  const int32_t scan_limit,
824  const std::vector<int64_t>& init_agg_vals,
825  const std::vector<int32_t>& error_codes,
826  const uint32_t num_tables,
827  const std::vector<int64_t>& join_hash_tables,
828  Data_Namespace::DataMgr* data_mgr,
829  const int device_id,
830  const bool hoist_literals,
831  const bool is_group_by) const {
832  CHECK(gpu_allocator_);
833  std::vector<CUdeviceptr> params(KERN_PARAM_COUNT, 0);
834  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
835  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
836  if (col_count) {
837  std::vector<CUdeviceptr> multifrag_col_dev_buffers;
838  for (auto frag_col_buffers : col_buffers) {
839  std::vector<CUdeviceptr> col_dev_buffers;
840  for (auto col_buffer : frag_col_buffers) {
841  col_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(col_buffer));
842  }
843  auto col_buffers_dev_ptr = reinterpret_cast<CUdeviceptr>(
844  gpu_allocator_->alloc(col_count * sizeof(CUdeviceptr)));
845  copy_to_gpu(data_mgr,
846  col_buffers_dev_ptr,
847  &col_dev_buffers[0],
848  col_count * sizeof(CUdeviceptr),
849  device_id);
850  multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
851  }
852  params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(
853  gpu_allocator_->alloc(num_fragments * sizeof(CUdeviceptr)));
854  copy_to_gpu(data_mgr,
855  params[COL_BUFFERS],
856  &multifrag_col_dev_buffers[0],
857  num_fragments * sizeof(CUdeviceptr),
858  device_id);
859  }
860  params[NUM_FRAGMENTS] =
861  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint64_t)));
862  copy_to_gpu(
863  data_mgr, params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t), device_id);
864  CUdeviceptr literals_and_addr_mapping = reinterpret_cast<CUdeviceptr>(
865  gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t)));
866  CHECK_EQ(CUdeviceptr{0}, literals_and_addr_mapping % 8);
867  std::vector<int64_t> additional_literal_bytes;
868  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
869  if (count_distinct_bitmap_mem) {
870  // Store host and device addresses
871  const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
872  CHECK(count_distinct_bitmap_host_mem);
873  additional_literal_bytes.push_back(
874  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
875  additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
876  copy_to_gpu(data_mgr,
877  literals_and_addr_mapping,
878  &additional_literal_bytes[0],
879  additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]),
880  device_id);
881  }
882  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
883  sizeof(additional_literal_bytes[0]);
884  if (!literal_buff.empty()) {
885  CHECK(hoist_literals);
886  copy_to_gpu(
887  data_mgr, params[LITERALS], &literal_buff[0], literal_buff.size(), device_id);
888  }
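// Net effect of the layout above: params[LITERALS] points at the hoisted
// literal buffer itself, while the count-distinct bitmap host/device addresses
// (when present) sit immediately before it, presumably so generated code can
// reach them at fixed negative offsets from the literals pointer.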
889  CHECK_EQ(num_rows.size(), col_buffers.size());
890  std::vector<int64_t> flatened_num_rows;
891  for (auto& nums : num_rows) {
892  CHECK_EQ(nums.size(), num_tables);
893  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
894  }
895  params[NUM_ROWS] = reinterpret_cast<CUdeviceptr>(
896  gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size()));
897  copy_to_gpu(data_mgr,
898  params[NUM_ROWS],
899  &flatened_num_rows[0],
900  sizeof(int64_t) * flatened_num_rows.size(),
901  device_id);
902 
903  CHECK_EQ(frag_offsets.size(), col_buffers.size());
904  std::vector<int64_t> flatened_frag_offsets;
905  for (auto& offsets : frag_offsets) {
906  CHECK_EQ(offsets.size(), num_tables);
907  flatened_frag_offsets.insert(
908  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
909  }
910  params[FRAG_ROW_OFFSETS] = reinterpret_cast<CUdeviceptr>(
911  gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size()));
912  copy_to_gpu(data_mgr,
913  params[FRAG_ROW_OFFSETS],
914  &flatened_frag_offsets[0],
915  sizeof(int64_t) * flatened_frag_offsets.size(),
916  device_id);
917 
918  // Note that this will be overwritten if we are setting the entry count during group by
919  // buffer allocation and initialization
920  const int32_t max_matched{scan_limit};
921  params[MAX_MATCHED] =
922  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(max_matched)));
923  copy_to_gpu(
924  data_mgr, params[MAX_MATCHED], &max_matched, sizeof(max_matched), device_id);
925 
926  int32_t total_matched{0};
927  params[TOTAL_MATCHED] =
928  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(total_matched)));
929  copy_to_gpu(
930  data_mgr, params[TOTAL_MATCHED], &total_matched, sizeof(total_matched), device_id);
931 
932  if (is_group_by && !output_columnar_) {
933  auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
934  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
935  params[INIT_AGG_VALS] =
936  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t)));
937  copy_to_gpu(data_mgr,
938  params[INIT_AGG_VALS],
939  &cmpt_val_buff[0],
940  cmpt_sz * sizeof(int64_t),
941  device_id);
942  } else {
943  params[INIT_AGG_VALS] = reinterpret_cast<CUdeviceptr>(
944  gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t)));
945  copy_to_gpu(data_mgr,
946  params[INIT_AGG_VALS],
947  &init_agg_vals[0],
948  init_agg_vals.size() * sizeof(int64_t),
949  device_id);
950  }
951 
952  params[ERROR_CODE] = reinterpret_cast<CUdeviceptr>(
953  gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0])));
954  copy_to_gpu(data_mgr,
955  params[ERROR_CODE],
956  &error_codes[0],
957  error_codes.size() * sizeof(error_codes[0]),
958  device_id);
959 
960  params[NUM_TABLES] =
961  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint32_t)));
962  copy_to_gpu(data_mgr, params[NUM_TABLES], &num_tables, sizeof(uint32_t), device_id);
963 
964  const auto hash_table_count = join_hash_tables.size();
965  switch (hash_table_count) {
966  case 0: {
967  params[JOIN_HASH_TABLES] = CUdeviceptr(0);
968  break;
969  }
970  case 1:
971  params[JOIN_HASH_TABLES] = static_cast<CUdeviceptr>(join_hash_tables[0]);
972  break;
973  default: {
974  params[JOIN_HASH_TABLES] = reinterpret_cast<CUdeviceptr>(
975  gpu_allocator_->alloc(hash_table_count * sizeof(int64_t)));
976  copy_to_gpu(data_mgr,
977  params[JOIN_HASH_TABLES],
978  &join_hash_tables[0],
979  hash_table_count * sizeof(int64_t),
980  device_id);
981  break;
982  }
983  }
984 
985  return params;
986 }
987 #endif