OmniSciDB  94e8789169
QueryExecutionContext.cpp
1 /*
2  * Copyright 2018 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
20 #include "Execute.h"
21 #include "GpuInitGroups.h"
22 #include "InPlaceSort.h"
23 #include "QueryMemoryInitializer.h"
24 #include "RelAlgExecutionUnit.h"
25 #include "ResultSet.h"
26 #include "SpeculativeTopN.h"
27 #include "StreamingTopN.h"
28 
29 QueryExecutionContext::QueryExecutionContext(
30  const RelAlgExecutionUnit& ra_exe_unit,
31  const QueryMemoryDescriptor& query_mem_desc,
32  const Executor* executor,
33  const ExecutorDeviceType device_type,
34  const ExecutorDispatchMode dispatch_mode,
35  const int device_id,
36  const int64_t num_rows,
37  const std::vector<std::vector<const int8_t*>>& col_buffers,
38  const std::vector<std::vector<uint64_t>>& frag_offsets,
39  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
40  const bool output_columnar,
41  const bool sort_on_gpu,
42  RenderInfo* render_info)
43  : query_mem_desc_(query_mem_desc)
44  , executor_(executor)
45  , device_type_(device_type)
46  , dispatch_mode_(dispatch_mode)
47  , row_set_mem_owner_(row_set_mem_owner)
48  , output_columnar_(output_columnar) {
49  CHECK(executor);
50  auto& data_mgr = executor->catalog_->getDataMgr();
51  if (device_type == ExecutorDeviceType::GPU) {
52  gpu_allocator_ = std::make_unique<CudaAllocator>(&data_mgr, device_id);
53  }
54 
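 // If this query can be rendered in situ, output is written into buffers owned by the
 // render allocator map rather than into regular query memory; the map is forwarded to
 // QueryMemoryInitializer below.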
55  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
56  ? render_info->render_allocator_map_ptr.get()
57  : nullptr;
58  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
59  query_mem_desc,
60  device_id,
61  device_type,
62  dispatch_mode,
63  output_columnar,
64  sort_on_gpu,
65  num_rows,
66  col_buffers,
67  frag_offsets,
68  render_allocator_map,
69  render_info,
70  row_set_mem_owner,
71  gpu_allocator_.get(),
72  executor);
73 }
74 
75 ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
76  const size_t i) const {
78  const auto& result_set = query_buffers_->getResultSet(i);
79  auto deinterleaved_query_mem_desc =
80  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
81  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
82  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
83 
84  auto deinterleaved_result_set =
85  std::make_shared<ResultSet>(result_set->getTargetInfos(),
86  std::vector<ColumnLazyFetchInfo>{},
87  std::vector<std::vector<const int8_t*>>{},
88  std::vector<std::vector<int64_t>>{},
89  std::vector<int64_t>{},
90  ExecutorDeviceType::CPU,
91  -1,
92  deinterleaved_query_mem_desc,
93  row_set_mem_owner_,
94  executor_->getCatalog(),
95  executor_->blockSize(),
96  executor_->gridSize());
97  auto deinterleaved_storage =
98  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
99  auto deinterleaved_buffer =
100  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
101  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
102  size_t deinterleaved_buffer_idx = 0;
103  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
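 // Walk the interleaved (per-warp) bins: each output bin starts from the initial
 // aggregate values, reduceSingleRow() folds the warp copies for that bin into
 // agg_vals, and the result is appended to the flat deinterleaved buffer.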
104  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
105  bin_idx < result_set->entryCount();
106  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
107  std::vector<int64_t> agg_vals(agg_col_count, 0);
108  memcpy(&agg_vals[0],
109  &executor_->plan_state_->init_agg_vals_[0],
110  agg_col_count * sizeof(agg_vals[0]));
111  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
112  executor_->warpSize(),
113  false,
114  true,
115  agg_vals,
116  query_mem_desc_,
117  result_set->getTargetInfos(),
118  executor_->plan_state_->init_agg_vals_);
119  for (size_t agg_idx = 0; agg_idx < agg_col_count;
120  ++agg_idx, ++deinterleaved_buffer_idx) {
121  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
122  }
123  }
124  query_buffers_->resetResultSet(i);
125  return deinterleaved_result_set;
126 }
127 
128 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
130  return query_buffers_->getAggInitValForIndex(index);
131 }
132 
133 ResultSetPtr QueryExecutionContext::getRowSet(
134  const RelAlgExecutionUnit& ra_exe_unit,
135  const QueryMemoryDescriptor& query_mem_desc) const {
136  auto timer = DEBUG_TIMER(__func__);
137  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
139  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
140  if (device_type_ == ExecutorDeviceType::CPU) {
141  CHECK_EQ(size_t(1), group_by_buffers_size);
142  return groupBufferToResults(0);
143  }
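 // On GPU, threads of a block share an output buffer when threadsShareMemory() is
 // true, so stepping by the block size visits one buffer per block before the
 // cross-device reduction below.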
144  size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
145  for (size_t i = 0; i < group_by_buffers_size; i += step) {
146  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
147  }
148  CHECK(device_type_ == ExecutorDeviceType::GPU);
149  return executor_->reduceMultiDeviceResults(
150  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
151 }
152 
153 ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
154  if (query_mem_desc_.interleavedBins(device_type_)) {
155  return groupBufferToDeinterleavedResults(i);
156  }
157  return query_buffers_->getResultSetOwned(i);
158 }
159 
160 #ifdef HAVE_CUDA
161 namespace {
162 
163 int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
164  // Check overflow / division by zero / interrupt first
165  for (const auto err : error_codes) {
166  if (err > 0) {
167  return err;
168  }
169  }
170  for (const auto err : error_codes) {
171  if (err) {
172  return err;
173  }
174  }
175  return 0;
176 }
177 
178 } // namespace
179 #endif
180 
181 std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
182  const RelAlgExecutionUnit& ra_exe_unit,
183  const GpuCompilationContext* cu_functions,
184  const bool hoist_literals,
185  const std::vector<int8_t>& literal_buff,
186  std::vector<std::vector<const int8_t*>> col_buffers,
187  const std::vector<std::vector<int64_t>>& num_rows,
188  const std::vector<std::vector<uint64_t>>& frag_offsets,
189  const int32_t scan_limit,
190  Data_Namespace::DataMgr* data_mgr,
191  const unsigned block_size_x,
192  const unsigned grid_size_x,
193  const int device_id,
194  const size_t shared_memory_size,
195  int32_t* error_code,
196  const uint32_t num_tables,
197  const std::vector<int64_t>& join_hash_tables,
198  RenderAllocatorMap* render_allocator_map) {
199  auto timer = DEBUG_TIMER(__func__);
200  INJECT_TIMER(launchGpuCode);
201 #ifdef HAVE_CUDA
204  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
205 
206  bool is_group_by{query_mem_desc_.isGroupBy()};
207 
208  RenderAllocator* render_allocator = nullptr;
209  if (render_allocator_map) {
210  render_allocator = render_allocator_map->getRenderAllocator(device_id);
211  }
212 
213  CHECK(cu_functions);
214  const auto native_code = cu_functions->getNativeCode(device_id);
215  auto cu_func = static_cast<CUfunction>(native_code.first);
216  std::vector<int64_t*> out_vec;
217  uint32_t num_fragments = col_buffers.size();
218  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
219 
220  CUevent start0, stop0; // preparation
221  cuEventCreate(&start0, 0);
222  cuEventCreate(&stop0, 0);
223  CUevent start1, stop1; // cuLaunchKernel
224  cuEventCreate(&start1, 0);
225  cuEventCreate(&stop1, 0);
226  CUevent start2, stop2; // finish
227  cuEventCreate(&start2, 0);
228  cuEventCreate(&stop2, 0);
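 // The three event pairs time kernel preparation, the cuLaunchKernel call, and the
 // copy-back/finish phase; the elapsed times are reported via VLOG(1) below.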
229 
230  if (g_enable_dynamic_watchdog ||
231  (g_enable_runtime_query_interrupt && !render_allocator)) {
232  cuEventRecord(start0, 0);
233  }
234 
235  if (g_enable_dynamic_watchdog) {
236  initializeDynamicWatchdog(native_code.second, device_id);
237  }
238 
239  if (g_enable_runtime_query_interrupt && !render_allocator) {
240  initializeRuntimeInterrupter(native_code.second, device_id);
241  }
242 
243  auto kernel_params = prepareKernelParams(col_buffers,
244  literal_buff,
245  num_rows,
246  frag_offsets,
247  scan_limit,
248  init_agg_vals,
249  error_codes,
250  num_tables,
251  join_hash_tables,
252  data_mgr,
253  device_id,
254  hoist_literals,
255  is_group_by);
256 
257  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
258  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);
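 // GROUPBY_BUF is intentionally left null by prepareKernelParams(); it is filled in
 // below with either the GPU group-by buffers or the plain per-thread output vectors.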
259 
260  const unsigned block_size_y = 1;
261  const unsigned block_size_z = 1;
262  const unsigned grid_size_y = 1;
263  const unsigned grid_size_z = 1;
264  const auto total_thread_count = block_size_x * grid_size_x;
265  const auto err_desc = kernel_params[ERROR_CODE];
266 
267  if (is_group_by) {
268  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
269  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
270  auto gpu_group_by_buffers =
271  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
272  query_mem_desc_,
273  kernel_params[INIT_AGG_VALS],
274  device_id,
275  dispatch_mode_,
276  block_size_x,
277  grid_size_x,
278  executor_->warpSize(),
279  can_sort_on_gpu,
280  output_columnar_,
281  render_allocator);
282  if (ra_exe_unit.use_bump_allocator) {
283  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
284  copy_to_gpu(data_mgr,
285  kernel_params[MAX_MATCHED],
286  &max_matched,
287  sizeof(max_matched),
288  device_id);
289  }
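 // With the bump allocator, the entry count chosen when the group-by buffers were
 // created replaces the scan limit as the kernel's max_matched value.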
290 
291  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.first;
292  std::vector<void*> param_ptrs;
293  for (auto& param : kernel_params) {
294  param_ptrs.push_back(&param);
295  }
296 
297  if (g_enable_dynamic_watchdog ||
298  (g_enable_runtime_query_interrupt && !render_allocator)) {
299  cuEventRecord(stop0, 0);
300  cuEventSynchronize(stop0);
301  float milliseconds0 = 0;
302  cuEventElapsedTime(&milliseconds0, start0, stop0);
303  VLOG(1) << "Device " << std::to_string(device_id)
304  << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
305  << " ms";
306  cuEventRecord(start1, 0);
307  }
308 
309  if (hoist_literals) {
310  checkCudaErrors(cuLaunchKernel(cu_func,
311  grid_size_x,
312  grid_size_y,
313  grid_size_z,
314  block_size_x,
315  block_size_y,
316  block_size_z,
317  shared_memory_size,
318  nullptr,
319  &param_ptrs[0],
320  nullptr));
321  } else {
322  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
323  checkCudaErrors(cuLaunchKernel(cu_func,
324  grid_size_x,
325  grid_size_y,
326  grid_size_z,
327  block_size_x,
328  block_size_y,
329  block_size_z,
330  shared_memory_size,
331  nullptr,
332  &param_ptrs[0],
333  nullptr));
334  }
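 // The module stays registered while the kernel is in flight so the watchdog or the
 // runtime interrupter can abort it; cuEventSynchronize(stop1) blocks until the
 // kernel has finished, after which the module is unregistered.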
335  if (g_enable_dynamic_watchdog ||
336  (g_enable_runtime_query_interrupt && !render_allocator)) {
337  executor_->registerActiveModule(native_code.second, device_id);
338  cuEventRecord(stop1, 0);
339  cuEventSynchronize(stop1);
340  executor_->unregisterActiveModule(native_code.second, device_id);
341  float milliseconds1 = 0;
342  cuEventElapsedTime(&milliseconds1, start1, stop1);
343  VLOG(1) << "Device " << std::to_string(device_id)
344  << ": launchGpuCode: group-by cuLaunchKernel: "
345  << std::to_string(milliseconds1) << " ms";
346  cuEventRecord(start2, 0);
347  }
348 
349  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
350  reinterpret_cast<int8_t*>(err_desc),
351  error_codes.size() * sizeof(error_codes[0]));
352  *error_code = aggregate_error_codes(error_codes);
353  if (*error_code > 0) {
354  return {};
355  }
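 // A positive error code (overflow, division by zero, interrupt) aborts the query
 // here; negative codes (e.g. running out of group-by slots) are handled further below.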
356 
357  if (!render_allocator) {
358  if (query_mem_desc_.useStreamingTopN()) {
359  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
360  query_mem_desc_,
361  gpu_group_by_buffers,
362  ra_exe_unit,
363  total_thread_count,
364  device_id);
365  } else {
366  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
367  try {
368  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
369  query_mem_desc_,
370  gpu_group_by_buffers,
371  data_mgr,
372  device_id);
373  } catch (const std::bad_alloc&) {
374  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
375  }
376  }
377  if (query_mem_desc_.getQueryDescriptionType() ==
378  QueryDescriptionType::Projection) {
379  if (query_mem_desc_.didOutputColumnar()) {
380  query_buffers_->compactProjectionBuffersGpu(
381  query_mem_desc_,
382  data_mgr,
383  gpu_group_by_buffers,
384  get_num_allocated_rows_from_gpu(
385  data_mgr, kernel_params[TOTAL_MATCHED], device_id),
386  device_id);
387  } else {
388  size_t num_allocated_rows{0};
389  if (ra_exe_unit.use_bump_allocator) {
390  num_allocated_rows = get_num_allocated_rows_from_gpu(
391  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
392  // First, check the error code. If we ran out of slots, don't copy data back
393  // into the ResultSet or update ResultSet entry count
394  if (*error_code < 0) {
395  return {};
396  }
397  }
398  query_buffers_->copyGroupByBuffersFromGpu(
399  data_mgr,
400  query_mem_desc_,
401  ra_exe_unit.use_bump_allocator ? num_allocated_rows
402  : query_mem_desc_.getEntryCount(),
403  gpu_group_by_buffers,
404  &ra_exe_unit,
405  block_size_x,
406  grid_size_x,
407  device_id,
408  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
409  if (num_allocated_rows) {
410  CHECK(ra_exe_unit.use_bump_allocator);
411  CHECK(!query_buffers_->result_sets_.empty());
412  query_buffers_->result_sets_.front()->updateStorageEntryCount(
413  num_allocated_rows);
414  }
415  }
416  } else {
417  query_buffers_->copyGroupByBuffersFromGpu(
418  data_mgr,
419  query_mem_desc_,
420  query_mem_desc_.getEntryCount(),
421  gpu_group_by_buffers,
422  &ra_exe_unit,
423  block_size_x,
424  grid_size_x,
425  device_id,
426  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
427  }
428  }
429  }
430  } else {
431  std::vector<CUdeviceptr> out_vec_dev_buffers;
432  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
433  // By default, non-grouped aggregate queries generate one result per available thread
434  // over the lifetime of the (potentially multi-fragment) kernel execution.
435  // If GPU shared-memory optimizations are enabled, these intermediate results are
436  // reduced on the device, leaving a single result per device.
437  const auto num_results_per_agg_col =
438  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
439  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
440  if (ra_exe_unit.estimator) {
441  estimator_result_set_.reset(new ResultSet(
442  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
443  out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
444  estimator_result_set_->getDeviceEstimatorBuffer()));
445  } else {
446  for (size_t i = 0; i < agg_col_count; ++i) {
447  CUdeviceptr out_vec_dev_buffer =
448  num_fragments ? reinterpret_cast<CUdeviceptr>(
449  gpu_allocator_->alloc(output_buffer_size_per_agg))
450  : 0;
451  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
452  if (shared_memory_size) {
453  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
454  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
455  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
456  output_buffer_size_per_agg);
457  }
458  }
459  }
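 // The kernel writes non-grouped aggregates through an int64_t**: a device array of
 // per-column pointers, each addressing that column's per-thread (or, with shared
 // memory, per-device) result slots.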
460  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
461  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
462  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
463  agg_col_count * sizeof(CUdeviceptr));
464  kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
465  std::vector<void*> param_ptrs;
466  for (auto& param : kernel_params) {
467  param_ptrs.push_back(&param);
468  }
469 
470  if (g_enable_dynamic_watchdog ||
471  (g_enable_runtime_query_interrupt && !render_allocator)) {
472  cuEventRecord(stop0, 0);
473  cuEventSynchronize(stop0);
474  float milliseconds0 = 0;
475  cuEventElapsedTime(&milliseconds0, start0, stop0);
476  VLOG(1) << "Device " << std::to_string(device_id)
477  << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
478  cuEventRecord(start1, 0);
479  }
480 
481  if (hoist_literals) {
482  checkCudaErrors(cuLaunchKernel(cu_func,
483  grid_size_x,
484  grid_size_y,
485  grid_size_z,
486  block_size_x,
487  block_size_y,
488  block_size_z,
489  shared_memory_size,
490  nullptr,
491  &param_ptrs[0],
492  nullptr));
493  } else {
494  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
495  checkCudaErrors(cuLaunchKernel(cu_func,
496  grid_size_x,
497  grid_size_y,
498  grid_size_z,
499  block_size_x,
500  block_size_y,
501  block_size_z,
502  shared_memory_size,
503  nullptr,
504  &param_ptrs[0],
505  nullptr));
506  }
507 
508  if (g_enable_dynamic_watchdog ||
509  (g_enable_runtime_query_interrupt && !render_allocator)) {
510  executor_->registerActiveModule(native_code.second, device_id);
511  cuEventRecord(stop1, 0);
512  cuEventSynchronize(stop1);
513  executor_->unregisterActiveModule(native_code.second, device_id);
514  float milliseconds1 = 0;
515  cuEventElapsedTime(&milliseconds1, start1, stop1);
516  VLOG(1) << "Device " << std::to_string(device_id)
517  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
518  << " ms";
519  cuEventRecord(start2, 0);
520  }
521 
522  copy_from_gpu(data_mgr,
523  &error_codes[0],
524  err_desc,
525  error_codes.size() * sizeof(error_codes[0]),
526  device_id);
527  *error_code = aggregate_error_codes(error_codes);
528  if (*error_code > 0) {
529  return {};
530  }
531  if (ra_exe_unit.estimator) {
532  CHECK(estimator_result_set_);
533  estimator_result_set_->syncEstimatorBuffer();
534  return {};
535  }
536  for (size_t i = 0; i < agg_col_count; ++i) {
537  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
538  copy_from_gpu(data_mgr,
539  host_out_vec,
540  out_vec_dev_buffers[i],
541  output_buffer_size_per_agg,
542  device_id);
543  out_vec.push_back(host_out_vec);
544  }
545  }
546  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
547  if (count_distinct_bitmap_mem) {
548  copy_from_gpu(data_mgr,
549  query_buffers_->getCountDistinctHostPtr(),
550  count_distinct_bitmap_mem,
551  query_buffers_->getCountDistinctBitmapBytes(),
552  device_id);
553  }
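 // Count-distinct bitmaps live in their own device allocation; copy them back to the
 // host-side buffer so the CPU reduction can merge them.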
554 
555  if (g_enable_dynamic_watchdog ||
556  (g_enable_runtime_query_interrupt && !render_allocator)) {
557  cuEventRecord(stop2, 0);
558  cuEventSynchronize(stop2);
559  float milliseconds2 = 0;
560  cuEventElapsedTime(&milliseconds2, start2, stop2);
561  VLOG(1) << "Device " << std::to_string(device_id)
562  << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
563  }
564 
565  return out_vec;
566 #else
567  return {};
568 #endif
569 }
570 
571 std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
572  const RelAlgExecutionUnit& ra_exe_unit,
573  const CpuCompilationContext* native_code,
574  const bool hoist_literals,
575  const std::vector<int8_t>& literal_buff,
576  std::vector<std::vector<const int8_t*>> col_buffers,
577  const std::vector<std::vector<int64_t>>& num_rows,
578  const std::vector<std::vector<uint64_t>>& frag_offsets,
579  const int32_t scan_limit,
580  int32_t* error_code,
581  const uint32_t num_tables,
582  const std::vector<int64_t>& join_hash_tables) {
583  auto timer = DEBUG_TIMER(__func__);
584  INJECT_TIMER(launchCpuCode);
585 
587  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
588 
589  std::vector<const int8_t**> multifrag_col_buffers;
590  for (auto& col_buffer : col_buffers) {
591  multifrag_col_buffers.push_back(&col_buffer[0]);
592  }
593  const int8_t*** multifrag_cols_ptr{
594  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
595  const uint64_t num_fragments =
596  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
597  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
598 
599  const bool is_group_by{query_mem_desc_.isGroupBy()};
600  std::vector<int64_t*> out_vec;
601  if (ra_exe_unit.estimator) {
602  estimator_result_set_.reset(
603  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
604  out_vec.push_back(
605  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
606  } else {
607  if (!is_group_by) {
608  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
609  auto buff = new int64_t[num_out_frags];
610  out_vec.push_back(static_cast<int64_t*>(buff));
611  }
612  }
613  }
614 
615  CHECK_EQ(num_rows.size(), col_buffers.size());
616  std::vector<int64_t> flatened_num_rows;
617  for (auto& nums : num_rows) {
618  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
619  }
620  std::vector<uint64_t> flatened_frag_offsets;
621  for (auto& offsets : frag_offsets) {
622  flatened_frag_offsets.insert(
623  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
624  }
625  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
626  auto num_rows_ptr =
627  rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
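 // A non-zero *error_code on entry marks a row-id lookup: the kernel sees a single
 // num_rows value of *error_code + 1 (rowid_lookup_num_rows), and any negative error
 // produced by that truncated scan is cleared after the call.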
628  int32_t total_matched_init{0};
629 
630  std::vector<int64_t> cmpt_val_buff;
631  if (is_group_by) {
632  cmpt_val_buff =
633  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
634  init_agg_vals,
635  query_mem_desc_);
636  }
637 
638  CHECK(native_code);
639  const int64_t* join_hash_tables_ptr =
640  join_hash_tables.size() == 1
641  ? reinterpret_cast<int64_t*>(join_hash_tables[0])
642  : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
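 // A single hash table is passed by value reinterpreted as a pointer; multiple tables
 // are passed as a pointer to the array, mirroring the JOIN_HASH_TABLES handling in
 // prepareKernelParams().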
643  if (hoist_literals) {
644  using agg_query = void (*)(const int8_t***, // col_buffers
645  const uint64_t*, // num_fragments
646  const int8_t*, // literals
647  const int64_t*, // num_rows
648  const uint64_t*, // frag_row_offsets
649  const int32_t*, // max_matched
650  int32_t*, // total_matched
651  const int64_t*, // init_agg_value
652  int64_t**, // out
653  int32_t*, // error_code
654  const uint32_t*, // num_tables
655  const int64_t*); // join_hash_tables_ptr
656  if (is_group_by) {
657  reinterpret_cast<agg_query>(native_code->func())(
658  multifrag_cols_ptr,
659  &num_fragments,
660  &literal_buff[0],
661  num_rows_ptr,
662  &flatened_frag_offsets[0],
663  &scan_limit,
664  &total_matched_init,
665  &cmpt_val_buff[0],
666  query_buffers_->getGroupByBuffersPtr(),
667  error_code,
668  &num_tables,
669  join_hash_tables_ptr);
670  } else {
671  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
672  &num_fragments,
673  &literal_buff[0],
674  num_rows_ptr,
675  &flatened_frag_offsets[0],
676  &scan_limit,
677  &total_matched_init,
678  &init_agg_vals[0],
679  &out_vec[0],
680  error_code,
681  &num_tables,
682  join_hash_tables_ptr);
683  }
684  } else {
685  using agg_query = void (*)(const int8_t***, // col_buffers
686  const uint64_t*, // num_fragments
687  const int64_t*, // num_rows
688  const uint64_t*, // frag_row_offsets
689  const int32_t*, // max_matched
690  int32_t*, // total_matched
691  const int64_t*, // init_agg_value
692  int64_t**, // out
693  int32_t*, // error_code
694  const uint32_t*, // num_tables
695  const int64_t*); // join_hash_tables_ptr
696  if (is_group_by) {
697  reinterpret_cast<agg_query>(native_code->func())(
698  multifrag_cols_ptr,
699  &num_fragments,
700  num_rows_ptr,
701  &flatened_frag_offsets[0],
702  &scan_limit,
703  &total_matched_init,
704  &cmpt_val_buff[0],
705  query_buffers_->getGroupByBuffersPtr(),
706  error_code,
707  &num_tables,
708  join_hash_tables_ptr);
709  } else {
710  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
711  &num_fragments,
712  num_rows_ptr,
713  &flatened_frag_offsets[0],
714  &scan_limit,
715  &total_matched_init,
716  &init_agg_vals[0],
717  &out_vec[0],
718  error_code,
719  &num_tables,
720  join_hash_tables_ptr);
721  }
722  }
723 
724  if (ra_exe_unit.estimator) {
725  return {};
726  }
727 
728  if (rowid_lookup_num_rows && *error_code < 0) {
729  *error_code = 0;
730  }
731 
732  if (query_mem_desc_.useStreamingTopN()) {
733  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
734  }
735 
736  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
737  query_mem_desc_.didOutputColumnar()) {
738  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
739  }
740 
741  return out_vec;
742 }
743 
744 #ifdef HAVE_CUDA
745 void QueryExecutionContext::initializeDynamicWatchdog(void* native_module,
746  const int device_id) const {
747  auto cu_module = static_cast<CUmodule>(native_module);
748  CHECK(cu_module);
749  CUevent start, stop;
750  cuEventCreate(&start, 0);
751  cuEventCreate(&stop, 0);
752  cuEventRecord(start, 0);
753 
754  CUdeviceptr dw_cycle_budget;
755  size_t dw_cycle_budget_size;
756  // Translate milliseconds to device cycles
757  uint64_t cycle_budget = executor_->deviceCycles(g_dynamic_watchdog_time_limit);
758  if (device_id == 0) {
759  LOG(INFO) << "Dynamic Watchdog budget: GPU: "
760  << std::to_string(g_dynamic_watchdog_time_limit) << "ms, "
761  << std::to_string(cycle_budget) << " cycles";
762  }
763  checkCudaErrors(cuModuleGetGlobal(
764  &dw_cycle_budget, &dw_cycle_budget_size, cu_module, "dw_cycle_budget"));
765  CHECK_EQ(dw_cycle_budget_size, sizeof(uint64_t));
766  checkCudaErrors(cuMemcpyHtoD(
767  dw_cycle_budget, reinterpret_cast<void*>(&cycle_budget), sizeof(uint64_t)));
768 
769  CUdeviceptr dw_sm_cycle_start;
770  size_t dw_sm_cycle_start_size;
771  checkCudaErrors(cuModuleGetGlobal(
772  &dw_sm_cycle_start, &dw_sm_cycle_start_size, cu_module, "dw_sm_cycle_start"));
773  CHECK_EQ(dw_sm_cycle_start_size, 128 * sizeof(uint64_t));
774  checkCudaErrors(cuMemsetD32(dw_sm_cycle_start, 0, 128 * 2));
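 // dw_sm_cycle_start holds one 64-bit start-cycle slot per SM (128 entries), so the
 // memset clears 128 * 2 32-bit words.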
775 
776  if (!executor_->interrupted_.load()) {
777  // The executor is not marked as interrupted; make sure the dynamic watchdog
778  // doesn't block execution.
779  CUdeviceptr dw_abort;
780  size_t dw_abort_size;
781  checkCudaErrors(cuModuleGetGlobal(&dw_abort, &dw_abort_size, cu_module, "dw_abort"));
782  CHECK_EQ(dw_abort_size, sizeof(uint32_t));
783  checkCudaErrors(cuMemsetD32(dw_abort, 0, 1));
784  }
785 
786  cuEventRecord(stop, 0);
787  cuEventSynchronize(stop);
788  float milliseconds = 0;
789  cuEventElapsedTime(&milliseconds, start, stop);
790  VLOG(1) << "Device " << std::to_string(device_id)
791  << ": launchGpuCode: dynamic watchdog init: " << std::to_string(milliseconds)
792  << " ms\n";
793 }
794 
795 void QueryExecutionContext::initializeRuntimeInterrupter(void* native_module,
796  const int device_id) const {
797  if (!executor_->interrupted_.load()) {
798  // The executor is not marked as interrupted; make sure the interrupt flag
799  // doesn't block execution.
800  auto cu_module = static_cast<CUmodule>(native_module);
801  CHECK(cu_module);
802  CUevent start, stop;
803  cuEventCreate(&start, 0);
804  cuEventCreate(&stop, 0);
805  cuEventRecord(start, 0);
806 
807  CUdeviceptr runtime_interrupt_flag;
808  size_t runtime_interrupt_flag_size;
809  checkCudaErrors(cuModuleGetGlobal(&runtime_interrupt_flag,
810  &runtime_interrupt_flag_size,
811  cu_module,
812  "runtime_interrupt_flag"));
813  CHECK_EQ(runtime_interrupt_flag_size, sizeof(uint32_t));
814  checkCudaErrors(cuMemsetD32(runtime_interrupt_flag, 0, 1));
815 
816  cuEventRecord(stop, 0);
817  cuEventSynchronize(stop);
818  float milliseconds = 0;
819  cuEventElapsedTime(&milliseconds, start, stop);
820  VLOG(1) << "Device " << std::to_string(device_id)
821  << ": launchGpuCode: runtime query interrupter init: "
822  << std::to_string(milliseconds) << " ms\n";
823  }
824 }
825 
826 std::vector<CUdeviceptr> QueryExecutionContext::prepareKernelParams(
827  const std::vector<std::vector<const int8_t*>>& col_buffers,
828  const std::vector<int8_t>& literal_buff,
829  const std::vector<std::vector<int64_t>>& num_rows,
830  const std::vector<std::vector<uint64_t>>& frag_offsets,
831  const int32_t scan_limit,
832  const std::vector<int64_t>& init_agg_vals,
833  const std::vector<int32_t>& error_codes,
834  const uint32_t num_tables,
835  const std::vector<int64_t>& join_hash_tables,
836  Data_Namespace::DataMgr* data_mgr,
837  const int device_id,
838  const bool hoist_literals,
839  const bool is_group_by) const {
841  std::vector<CUdeviceptr> params(KERN_PARAM_COUNT, 0);
842  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
843  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
844  if (col_count) {
845  std::vector<CUdeviceptr> multifrag_col_dev_buffers;
846  for (auto frag_col_buffers : col_buffers) {
847  std::vector<CUdeviceptr> col_dev_buffers;
848  for (auto col_buffer : frag_col_buffers) {
849  col_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(col_buffer));
850  }
851  auto col_buffers_dev_ptr = reinterpret_cast<CUdeviceptr>(
852  gpu_allocator_->alloc(col_count * sizeof(CUdeviceptr)));
853  copy_to_gpu(data_mgr,
854  col_buffers_dev_ptr,
855  &col_dev_buffers[0],
856  col_count * sizeof(CUdeviceptr),
857  device_id);
858  multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
859  }
860  params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(
861  gpu_allocator_->alloc(num_fragments * sizeof(CUdeviceptr)));
862  copy_to_gpu(data_mgr,
863  params[COL_BUFFERS],
864  &multifrag_col_dev_buffers[0],
865  num_fragments * sizeof(CUdeviceptr),
866  device_id);
867  }
868  params[NUM_FRAGMENTS] =
869  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint64_t)));
870  copy_to_gpu(
871  data_mgr, params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t), device_id);
872  CUdeviceptr literals_and_addr_mapping = reinterpret_cast<CUdeviceptr>(
873  gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t)));
874  CHECK_EQ(CUdeviceptr{0}, literals_and_addr_mapping % 8);
875  std::vector<int64_t> additional_literal_bytes;
876  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
877  if (count_distinct_bitmap_mem) {
878  // Store host and device addresses
879  const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
880  CHECK(count_distinct_bitmap_host_mem);
881  additional_literal_bytes.push_back(
882  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
883  additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
884  copy_to_gpu(data_mgr,
885  literals_and_addr_mapping,
886  &additional_literal_bytes[0],
887  additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]),
888  device_id);
889  }
890  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
891  sizeof(additional_literal_bytes[0]);
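 // LITERALS points just past the optional (host, device) count-distinct address pair,
 // so the hoisted literal bytes land on the 8-byte-aligned offset checked above.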
892  if (!literal_buff.empty()) {
893  CHECK(hoist_literals);
894  copy_to_gpu(
895  data_mgr, params[LITERALS], &literal_buff[0], literal_buff.size(), device_id);
896  }
897  CHECK_EQ(num_rows.size(), col_buffers.size());
898  std::vector<int64_t> flatened_num_rows;
899  for (auto& nums : num_rows) {
900  CHECK_EQ(nums.size(), num_tables);
901  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
902  }
903  params[NUM_ROWS] = reinterpret_cast<CUdeviceptr>(
904  gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size()));
905  copy_to_gpu(data_mgr,
906  params[NUM_ROWS],
907  &flatened_num_rows[0],
908  sizeof(int64_t) * flatened_num_rows.size(),
909  device_id);
910 
911  CHECK_EQ(frag_offsets.size(), col_buffers.size());
912  std::vector<int64_t> flatened_frag_offsets;
913  for (auto& offsets : frag_offsets) {
914  CHECK_EQ(offsets.size(), num_tables);
915  flatened_frag_offsets.insert(
916  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
917  }
918  params[FRAG_ROW_OFFSETS] = reinterpret_cast<CUdeviceptr>(
919  gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size()));
920  copy_to_gpu(data_mgr,
921  params[FRAG_ROW_OFFSETS],
922  &flatened_frag_offsets[0],
923  sizeof(int64_t) * flatened_frag_offsets.size(),
924  device_id);
925 
926  // Note that this will be overwritten if we are setting the entry count during group by
927  // buffer allocation and initialization
928  const int32_t max_matched{scan_limit};
929  params[MAX_MATCHED] =
930  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(max_matched)));
931  copy_to_gpu(
932  data_mgr, params[MAX_MATCHED], &max_matched, sizeof(max_matched), device_id);
933 
934  int32_t total_matched{0};
935  params[TOTAL_MATCHED] =
936  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(total_matched)));
937  copy_to_gpu(
938  data_mgr, params[TOTAL_MATCHED], &total_matched, sizeof(total_matched), device_id);
939 
940  if (is_group_by && !output_columnar_) {
941  auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
942  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
943  params[INIT_AGG_VALS] =
944  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t)));
945  copy_to_gpu(data_mgr,
946  params[INIT_AGG_VALS],
947  &cmpt_val_buff[0],
948  cmpt_sz * sizeof(int64_t),
949  device_id);
950  } else {
951  params[INIT_AGG_VALS] = reinterpret_cast<CUdeviceptr>(
952  gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t)));
953  copy_to_gpu(data_mgr,
954  params[INIT_AGG_VALS],
955  &init_agg_vals[0],
956  init_agg_vals.size() * sizeof(int64_t),
957  device_id);
958  }
959 
960  params[ERROR_CODE] = reinterpret_cast<CUdeviceptr>(
961  gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0])));
962  copy_to_gpu(data_mgr,
963  params[ERROR_CODE],
964  &error_codes[0],
965  error_codes.size() * sizeof(error_codes[0]),
966  device_id);
967 
968  params[NUM_TABLES] =
969  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint32_t)));
970  copy_to_gpu(data_mgr, params[NUM_TABLES], &num_tables, sizeof(uint32_t), device_id);
971 
972  const auto hash_table_count = join_hash_tables.size();
973  switch (hash_table_count) {
974  case 0: {
975  params[JOIN_HASH_TABLES] = CUdeviceptr(0);
976  break;
977  }
978  case 1:
979  params[JOIN_HASH_TABLES] = static_cast<CUdeviceptr>(join_hash_tables[0]);
980  break;
981  default: {
982  params[JOIN_HASH_TABLES] = reinterpret_cast<CUdeviceptr>(
983  gpu_allocator_->alloc(hash_table_count * sizeof(int64_t)));
984  copy_to_gpu(data_mgr,
985  params[JOIN_HASH_TABLES],
986  &join_hash_tables[0],
987  hash_table_count * sizeof(int64_t),
988  device_id);
989  break;
990  }
991  }
992 
993  return params;
994 }
995 #endif