OmniSciDB  8fa3bf436f
QueryExecutionContext.cpp
1 /*
2  * Copyright 2018 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
20 #include "Execute.h"
21 #include "GpuInitGroups.h"
22 #include "InPlaceSort.h"
23 #include "QueryMemoryInitializer.h"
24 #include "RelAlgExecutionUnit.h"
25 #include "ResultSet.h"
26 #include "Shared/likely.h"
27 #include "SpeculativeTopN.h"
28 #include "StreamingTopN.h"
29 
30 QueryExecutionContext::QueryExecutionContext(
31  const RelAlgExecutionUnit& ra_exe_unit,
32  const QueryMemoryDescriptor& query_mem_desc,
33  const Executor* executor,
34  const ExecutorDeviceType device_type,
35  const ExecutorDispatchMode dispatch_mode,
36  const int device_id,
37  const int64_t num_rows,
38  const std::vector<std::vector<const int8_t*>>& col_buffers,
39  const std::vector<std::vector<uint64_t>>& frag_offsets,
40  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
41  const bool output_columnar,
42  const bool sort_on_gpu,
43  const size_t thread_idx,
44  RenderInfo* render_info)
45  : query_mem_desc_(query_mem_desc)
46  , executor_(executor)
47  , device_type_(device_type)
48  , dispatch_mode_(dispatch_mode)
49  , row_set_mem_owner_(row_set_mem_owner)
50  , output_columnar_(output_columnar) {
51  CHECK(executor);
52  auto& data_mgr = executor->catalog_->getDataMgr();
53  if (device_type == ExecutorDeviceType::GPU) {
54  gpu_allocator_ = std::make_unique<CudaAllocator>(&data_mgr, device_id);
55  }
56 
57  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
58  ? render_info->render_allocator_map_ptr.get()
59  : nullptr;
60  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
61  query_mem_desc,
62  device_id,
63  device_type,
64  dispatch_mode,
65  output_columnar,
66  sort_on_gpu,
67  num_rows,
68  col_buffers,
69  frag_offsets,
70  render_allocator_map,
71  render_info,
72  row_set_mem_owner,
73  gpu_allocator_.get(),
74  thread_idx,
75  executor);
76 }
77 
78 ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
79  const size_t i) const {
80  CHECK(!output_columnar_);
81  const auto& result_set = query_buffers_->getResultSet(i);
82  auto deinterleaved_query_mem_desc =
83  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
84  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
85  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
86 
87  auto deinterleaved_result_set =
88  std::make_shared<ResultSet>(result_set->getTargetInfos(),
89  std::vector<ColumnLazyFetchInfo>{},
90  std::vector<std::vector<const int8_t*>>{},
91  std::vector<std::vector<int64_t>>{},
92  std::vector<int64_t>{},
93  ExecutorDeviceType::CPU,
94  -1,
95  deinterleaved_query_mem_desc,
96  row_set_mem_owner_,
97  executor_->getCatalog(),
98  executor_->blockSize(),
99  executor_->gridSize());
100  auto deinterleaved_storage =
101  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
102  auto deinterleaved_buffer =
103  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
104  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
105  size_t deinterleaved_buffer_idx = 0;
106  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
107  auto do_work = [&](const size_t bin_base_off) {
108  std::vector<int64_t> agg_vals(agg_col_count, 0);
109  memcpy(&agg_vals[0],
110  &executor_->plan_state_->init_agg_vals_[0],
111  agg_col_count * sizeof(agg_vals[0]));
112  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
113  executor_->warpSize(),
114  false,
115  true,
116  agg_vals,
117  query_mem_desc_,
118  result_set->getTargetInfos(),
119  executor_->plan_state_->init_agg_vals_);
120  for (size_t agg_idx = 0; agg_idx < agg_col_count;
121  ++agg_idx, ++deinterleaved_buffer_idx) {
122  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
123  }
124  };
125  if (g_enable_non_kernel_time_query_interrupt) {
126  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
127  bin_idx < result_set->entryCount();
128  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
129  if (UNLIKELY((bin_idx & 0xFFFF) == 0 && check_interrupt())) {
130  throw std::runtime_error(
131  "Query execution was interrupted during result set reduction");
132  }
133  do_work(bin_base_off);
134  }
135  } else {
136  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
137  bin_idx < result_set->entryCount();
138  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
139  do_work(bin_base_off);
140  }
141  }
142  query_buffers_->resetResultSet(i);
143  return deinterleaved_result_set;
144 }
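
// --- Illustrative sketch (not part of the original source) -------------------
// groupBufferToDeinterleavedResults() above reduces warp-interleaved aggregate
// bins into one output row per bin. The toy below mimics that idea for a
// hypothetical layout in which each bin stores warp_size partial values per
// aggregate slot and the aggregate is a plain SUM; the real layout and
// reduction live in QueryMemoryDescriptor / ResultSetStorage::reduceSingleRow().
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

static std::vector<int64_t> deinterleave_sum(const std::vector<int64_t>& interleaved,
                                             const size_t warp_size,
                                             const size_t agg_col_count) {
  const size_t bin_count = interleaved.size() / (warp_size * agg_col_count);
  std::vector<int64_t> out;
  out.reserve(bin_count * agg_col_count);
  for (size_t bin = 0; bin < bin_count; ++bin) {
    for (size_t col = 0; col < agg_col_count; ++col) {
      int64_t acc = 0;
      for (size_t lane = 0; lane < warp_size; ++lane) {
        // one partial value per warp lane for this (bin, column) pair
        acc += interleaved[(bin * warp_size + lane) * agg_col_count + col];
      }
      out.push_back(acc);
    }
  }
  return out;
}

int main() {
  // one bin, one aggregate column, four warp lanes holding partial sums
  const auto reduced = deinterleave_sum({1, 2, 3, 4}, /*warp_size=*/4, /*agg_col_count=*/1);
  assert(reduced.size() == 1 && reduced[0] == 10);
  return 0;
}
// -----------------------------------------------------------------------------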
145 
146 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
148  return query_buffers_->getAggInitValForIndex(index);
149 }
150 
151 ResultSetPtr QueryExecutionContext::getRowSet(
152  const RelAlgExecutionUnit& ra_exe_unit,
153  const QueryMemoryDescriptor& query_mem_desc) const {
154  auto timer = DEBUG_TIMER(__func__);
155  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
157  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
158  if (device_type_ == ExecutorDeviceType::CPU) {
159  CHECK_EQ(size_t(1), group_by_buffers_size);
160  return groupBufferToResults(0);
161  }
162  size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
163  for (size_t i = 0; i < group_by_buffers_size; i += step) {
164  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
165  }
167  return executor_->reduceMultiDeviceResults(
168  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
169 }
170 
171 ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
172  if (query_mem_desc_.interleavedBins(device_type_)) {
173  return groupBufferToDeinterleavedResults(i);
174  }
175  return query_buffers_->getResultSetOwned(i);
176 }
177 
178 #ifdef HAVE_CUDA
179 namespace {
180 
181 int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
182  // Check overflow / division by zero / interrupt first
183  for (const auto err : error_codes) {
184  if (err > 0) {
185  return err;
186  }
187  }
188  for (const auto err : error_codes) {
189  if (err) {
190  return err;
191  }
192  }
193  return 0;
194 }
195 
196 } // namespace
197 #endif
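
// --- Illustrative sketch (not part of the original source) -------------------
// aggregate_error_codes() above gives positive codes (overflow, division by
// zero, interrupt) precedence over negative ones (e.g. the "out of slots"
// condition checked later in launchGpuCode), with zero meaning success. The
// standalone copy below just demonstrates that precedence.
#include <cassert>
#include <cstdint>
#include <vector>

namespace {
int32_t aggregate_error_codes_sketch(const std::vector<int32_t>& error_codes) {
  for (const auto err : error_codes) {
    if (err > 0) {
      return err;  // hard errors reported by any thread take precedence
    }
  }
  for (const auto err : error_codes) {
    if (err) {
      return err;  // otherwise surface the first non-zero (negative) code
    }
  }
  return 0;
}
}  // namespace

int main() {
  assert(aggregate_error_codes_sketch({0, 0, 0}) == 0);
  assert(aggregate_error_codes_sketch({0, -3, 0}) == -3);  // soft/negative code
  assert(aggregate_error_codes_sketch({-3, 0, 7}) == 7);   // positive code wins
  return 0;
}
// -----------------------------------------------------------------------------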
198 
199 std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
200  const RelAlgExecutionUnit& ra_exe_unit,
201  const GpuCompilationContext* cu_functions,
202  const bool hoist_literals,
203  const std::vector<int8_t>& literal_buff,
204  std::vector<std::vector<const int8_t*>> col_buffers,
205  const std::vector<std::vector<int64_t>>& num_rows,
206  const std::vector<std::vector<uint64_t>>& frag_offsets,
207  const int32_t scan_limit,
208  Data_Namespace::DataMgr* data_mgr,
209  const unsigned block_size_x,
210  const unsigned grid_size_x,
211  const int device_id,
212  const size_t shared_memory_size,
213  int32_t* error_code,
214  const uint32_t num_tables,
215  const bool allow_runtime_interrupt,
216  const std::vector<int64_t>& join_hash_tables,
217  RenderAllocatorMap* render_allocator_map) {
218  auto timer = DEBUG_TIMER(__func__);
219  INJECT_TIMER(launchGpuCode);
220 #ifdef HAVE_CUDA
223  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
224 
225  bool is_group_by{query_mem_desc_.isGroupBy()};
226 
227  RenderAllocator* render_allocator = nullptr;
228  if (render_allocator_map) {
229  render_allocator = render_allocator_map->getRenderAllocator(device_id);
230  }
231 
232  CHECK(cu_functions);
233  const auto native_code = cu_functions->getNativeCode(device_id);
234  auto cu_func = static_cast<CUfunction>(native_code.first);
235  std::vector<int64_t*> out_vec;
236  uint32_t num_fragments = col_buffers.size();
237  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
238 
239  CUevent start0, stop0; // preparation
240  cuEventCreate(&start0, 0);
241  cuEventCreate(&stop0, 0);
242  CUevent start1, stop1; // cuLaunchKernel
243  cuEventCreate(&start1, 0);
244  cuEventCreate(&stop1, 0);
245  CUevent start2, stop2; // finish
246  cuEventCreate(&start2, 0);
247  cuEventCreate(&stop2, 0);
248 
249  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
250  cuEventRecord(start0, 0);
251  }
252 
253  if (g_enable_dynamic_watchdog) {
254  initializeDynamicWatchdog(native_code.second, device_id);
255  }
256 
257  if (allow_runtime_interrupt && !render_allocator) {
258  initializeRuntimeInterrupter(native_code.second, device_id);
259  }
260 
261  auto kernel_params = prepareKernelParams(col_buffers,
262  literal_buff,
263  num_rows,
264  frag_offsets,
265  scan_limit,
266  init_agg_vals,
267  error_codes,
268  num_tables,
269  join_hash_tables,
270  data_mgr,
271  device_id,
272  hoist_literals,
273  is_group_by);
274 
275  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
276  CHECK_EQ(CUdeviceptr(0), kernel_params[GROUPBY_BUF]);
277 
278  const unsigned block_size_y = 1;
279  const unsigned block_size_z = 1;
280  const unsigned grid_size_y = 1;
281  const unsigned grid_size_z = 1;
282  const auto total_thread_count = block_size_x * grid_size_x;
283  const auto err_desc = kernel_params[ERROR_CODE];
284 
285  if (is_group_by) {
286  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
287  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
288  auto gpu_group_by_buffers =
289  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
290  query_mem_desc_,
291  kernel_params[INIT_AGG_VALS],
292  device_id,
293  dispatch_mode_,
294  block_size_x,
295  grid_size_x,
296  executor_->warpSize(),
297  can_sort_on_gpu,
298  output_columnar_,
299  render_allocator);
300  if (ra_exe_unit.use_bump_allocator) {
301  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
302  copy_to_gpu(data_mgr,
303  kernel_params[MAX_MATCHED],
304  &max_matched,
305  sizeof(max_matched),
306  device_id);
307  }
308 
309  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.first;
310  std::vector<void*> param_ptrs;
311  for (auto& param : kernel_params) {
312  param_ptrs.push_back(&param);
313  }
314 
315  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
316  cuEventRecord(stop0, 0);
317  cuEventSynchronize(stop0);
318  float milliseconds0 = 0;
319  cuEventElapsedTime(&milliseconds0, start0, stop0);
320  VLOG(1) << "Device " << std::to_string(device_id)
321  << ": launchGpuCode: group-by prepare: " << std::to_string(milliseconds0)
322  << " ms";
323  cuEventRecord(start1, 0);
324  }
325 
326  if (hoist_literals) {
327  checkCudaErrors(cuLaunchKernel(cu_func,
328  grid_size_x,
329  grid_size_y,
330  grid_size_z,
331  block_size_x,
332  block_size_y,
333  block_size_z,
334  shared_memory_size,
335  nullptr,
336  &param_ptrs[0],
337  nullptr));
338  } else {
339  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
340  checkCudaErrors(cuLaunchKernel(cu_func,
341  grid_size_x,
342  grid_size_y,
343  grid_size_z,
344  block_size_x,
345  block_size_y,
346  block_size_z,
347  shared_memory_size,
348  nullptr,
349  &param_ptrs[0],
350  nullptr));
351  }
352  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
353  executor_->registerActiveModule(native_code.second, device_id);
354  cuEventRecord(stop1, 0);
355  cuEventSynchronize(stop1);
356  executor_->unregisterActiveModule(native_code.second, device_id);
357  float milliseconds1 = 0;
358  cuEventElapsedTime(&milliseconds1, start1, stop1);
359  VLOG(1) << "Device " << std::to_string(device_id)
360  << ": launchGpuCode: group-by cuLaunchKernel: "
361  << std::to_string(milliseconds1) << " ms";
362  cuEventRecord(start2, 0);
363  }
364 
365  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
366  reinterpret_cast<int8_t*>(err_desc),
367  error_codes.size() * sizeof(error_codes[0]));
368  *error_code = aggregate_error_codes(error_codes);
369  if (*error_code > 0) {
370  return {};
371  }
372 
373  if (!render_allocator) {
374  if (query_mem_desc_.useStreamingTopN()) {
375  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
376  query_mem_desc_,
377  gpu_group_by_buffers,
378  ra_exe_unit,
379  total_thread_count,
380  device_id);
381  } else {
382  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
383  try {
384  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
385  query_mem_desc_,
386  gpu_group_by_buffers,
387  data_mgr,
388  device_id);
389  } catch (const std::bad_alloc&) {
390  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
391  }
392  }
393  if (query_mem_desc_.getQueryDescriptionType() ==
394  QueryDescriptionType::Projection) {
395  if (query_mem_desc_.didOutputColumnar()) {
396  query_buffers_->compactProjectionBuffersGpu(
397  query_mem_desc_,
398  data_mgr,
399  gpu_group_by_buffers,
400  get_num_allocated_rows_from_gpu(
401  data_mgr, kernel_params[TOTAL_MATCHED], device_id),
402  device_id);
403  } else {
404  size_t num_allocated_rows{0};
405  if (ra_exe_unit.use_bump_allocator) {
406  num_allocated_rows = get_num_allocated_rows_from_gpu(
407  data_mgr, kernel_params[TOTAL_MATCHED], device_id);
408  // First, check the error code. If we ran out of slots, don't copy data back
409  // into the ResultSet or update ResultSet entry count
410  if (*error_code < 0) {
411  return {};
412  }
413  }
414  query_buffers_->copyGroupByBuffersFromGpu(
415  data_mgr,
416  query_mem_desc_,
417  ra_exe_unit.use_bump_allocator ? num_allocated_rows
418  : query_mem_desc_.getEntryCount(),
419  gpu_group_by_buffers,
420  &ra_exe_unit,
421  block_size_x,
422  grid_size_x,
423  device_id,
424  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
425  if (num_allocated_rows) {
426  CHECK(ra_exe_unit.use_bump_allocator);
427  CHECK(!query_buffers_->result_sets_.empty());
428  query_buffers_->result_sets_.front()->updateStorageEntryCount(
429  num_allocated_rows);
430  }
431  }
432  } else {
433  query_buffers_->copyGroupByBuffersFromGpu(
434  data_mgr,
435  query_mem_desc_,
436  query_mem_desc_.getEntryCount(),
437  gpu_group_by_buffers,
438  &ra_exe_unit,
439  block_size_x,
440  grid_size_x,
441  device_id,
442  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
443  }
444  }
445  }
446  } else {
447  std::vector<CUdeviceptr> out_vec_dev_buffers;
448  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
449  // By default, non-grouped aggregate queries generate one result per available
450  // thread over the lifetime of the (potentially multi-fragment) kernel execution.
451  // If GPU shared-memory optimizations are enabled, these intermediate results are
452  // reduced on the device, leaving a single result per aggregate column per device.
453  const auto num_results_per_agg_col =
454  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
455  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
456  if (ra_exe_unit.estimator) {
457  estimator_result_set_.reset(new ResultSet(
458  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
459  out_vec_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(
460  estimator_result_set_->getDeviceEstimatorBuffer()));
461  } else {
462  for (size_t i = 0; i < agg_col_count; ++i) {
463  CUdeviceptr out_vec_dev_buffer =
464  num_fragments ? reinterpret_cast<CUdeviceptr>(
465  gpu_allocator_->alloc(output_buffer_size_per_agg))
466  : 0;
467  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
468  if (shared_memory_size) {
469  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
470  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
471  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
472  output_buffer_size_per_agg);
473  }
474  }
475  }
476  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(CUdeviceptr));
477  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
478  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
479  agg_col_count * sizeof(CUdeviceptr));
480  kernel_params[GROUPBY_BUF] = reinterpret_cast<CUdeviceptr>(out_vec_dev_ptr);
481  std::vector<void*> param_ptrs;
482  for (auto& param : kernel_params) {
483  param_ptrs.push_back(&param);
484  }
485 
486  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
487  cuEventRecord(stop0, 0);
488  cuEventSynchronize(stop0);
489  float milliseconds0 = 0;
490  cuEventElapsedTime(&milliseconds0, start0, stop0);
491  VLOG(1) << "Device " << std::to_string(device_id)
492  << ": launchGpuCode: prepare: " << std::to_string(milliseconds0) << " ms";
493  cuEventRecord(start1, 0);
494  }
495 
496  if (hoist_literals) {
497  checkCudaErrors(cuLaunchKernel(cu_func,
498  grid_size_x,
499  grid_size_y,
500  grid_size_z,
501  block_size_x,
502  block_size_y,
503  block_size_z,
504  shared_memory_size,
505  nullptr,
506  &param_ptrs[0],
507  nullptr));
508  } else {
509  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
510  checkCudaErrors(cuLaunchKernel(cu_func,
511  grid_size_x,
512  grid_size_y,
513  grid_size_z,
514  block_size_x,
515  block_size_y,
516  block_size_z,
517  shared_memory_size,
518  nullptr,
519  &param_ptrs[0],
520  nullptr));
521  }
522 
523  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
524  executor_->registerActiveModule(native_code.second, device_id);
525  cuEventRecord(stop1, 0);
526  cuEventSynchronize(stop1);
527  executor_->unregisterActiveModule(native_code.second, device_id);
528  float milliseconds1 = 0;
529  cuEventElapsedTime(&milliseconds1, start1, stop1);
530  VLOG(1) << "Device " << std::to_string(device_id)
531  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(milliseconds1)
532  << " ms";
533  cuEventRecord(start2, 0);
534  }
535 
536  copy_from_gpu(data_mgr,
537  &error_codes[0],
538  err_desc,
539  error_codes.size() * sizeof(error_codes[0]),
540  device_id);
541  *error_code = aggregate_error_codes(error_codes);
542  if (*error_code > 0) {
543  return {};
544  }
545  if (ra_exe_unit.estimator) {
546  CHECK(estimator_result_set_);
547  estimator_result_set_->syncEstimatorBuffer();
548  return {};
549  }
550  for (size_t i = 0; i < agg_col_count; ++i) {
551  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
552  copy_from_gpu(data_mgr,
553  host_out_vec,
554  out_vec_dev_buffers[i],
555  output_buffer_size_per_agg,
556  device_id);
557  out_vec.push_back(host_out_vec);
558  }
559  }
560  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
561  if (count_distinct_bitmap_mem) {
562  copy_from_gpu(data_mgr,
563  query_buffers_->getCountDistinctHostPtr(),
564  count_distinct_bitmap_mem,
565  query_buffers_->getCountDistinctBitmapBytes(),
566  device_id);
567  }
568 
569  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
570  cuEventRecord(stop2, 0);
571  cuEventSynchronize(stop2);
572  float milliseconds2 = 0;
573  cuEventElapsedTime(&milliseconds2, start2, stop2);
574  VLOG(1) << "Device " << std::to_string(device_id)
575  << ": launchGpuCode: finish: " << std::to_string(milliseconds2) << " ms";
576  }
577 
578  return out_vec;
579 #else
580  return {};
581 #endif
582 }
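
// --- Illustrative sketch (not part of the original source) -------------------
// launchGpuCode() drives the CUDA driver API directly: every kernel argument is
// a CUdeviceptr stored in kernel_params, and cuLaunchKernel() receives an array
// of void*, each pointing at one of those CUdeviceptr values — exactly how
// param_ptrs is built above. The snippet below shows that pattern in isolation.
// "query_kernel.ptx" and "multifrag_query" are hypothetical names, and error
// checking (checkCudaErrors) is omitted for brevity.
#include <cuda.h>
#include <vector>

void launch_pattern_sketch() {
  cuInit(0);
  CUdevice dev;
  cuDeviceGet(&dev, 0);
  CUcontext ctx;
  cuCtxCreate(&ctx, 0, dev);

  CUmodule mod;
  cuModuleLoad(&mod, "query_kernel.ptx");            // hypothetical PTX module
  CUfunction fn;
  cuModuleGetFunction(&fn, mod, "multifrag_query");  // hypothetical kernel name

  CUdeviceptr out_buf;
  cuMemAlloc(&out_buf, 1024);

  // one CUdeviceptr per kernel argument, then one void* per argument slot
  std::vector<CUdeviceptr> kernel_params{out_buf};
  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }

  cuLaunchKernel(fn,
                 /*gridDim=*/16, 1, 1,
                 /*blockDim=*/128, 1, 1,
                 /*sharedMemBytes=*/0,
                 /*stream=*/nullptr,
                 param_ptrs.data(),
                 nullptr);
  cuCtxSynchronize();

  cuMemFree(out_buf);
  cuCtxDestroy(ctx);
}
// -----------------------------------------------------------------------------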
583 
584 std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
585  const RelAlgExecutionUnit& ra_exe_unit,
586  const CpuCompilationContext* native_code,
587  const bool hoist_literals,
588  const std::vector<int8_t>& literal_buff,
589  std::vector<std::vector<const int8_t*>> col_buffers,
590  const std::vector<std::vector<int64_t>>& num_rows,
591  const std::vector<std::vector<uint64_t>>& frag_offsets,
592  const int32_t scan_limit,
593  int32_t* error_code,
594  const uint32_t num_tables,
595  const std::vector<int64_t>& join_hash_tables) {
596  auto timer = DEBUG_TIMER(__func__);
597  INJECT_TIMER(launchCpuCode);
598 
600  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
601 
602  std::vector<const int8_t**> multifrag_col_buffers;
603  for (auto& col_buffer : col_buffers) {
604  multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
605  }
606  const int8_t*** multifrag_cols_ptr{
607  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
608  const uint64_t num_fragments =
609  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
610  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
611 
612  const bool is_group_by{query_mem_desc_.isGroupBy()};
613  std::vector<int64_t*> out_vec;
614  if (ra_exe_unit.estimator) {
615  estimator_result_set_.reset(
616  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
617  out_vec.push_back(
618  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
619  } else {
620  if (!is_group_by) {
621  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
622  auto buff = new int64_t[num_out_frags];
623  out_vec.push_back(static_cast<int64_t*>(buff));
624  }
625  }
626  }
627 
628  CHECK_EQ(num_rows.size(), col_buffers.size());
629  std::vector<int64_t> flatened_num_rows;
630  for (auto& nums : num_rows) {
631  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
632  }
633  std::vector<uint64_t> flatened_frag_offsets;
634  for (auto& offsets : frag_offsets) {
635  flatened_frag_offsets.insert(
636  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
637  }
638  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
639  auto num_rows_ptr =
640  rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
641  int32_t total_matched_init{0};
642 
643  std::vector<int64_t> cmpt_val_buff;
644  if (is_group_by) {
645  cmpt_val_buff =
646  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
647  init_agg_vals,
648  query_mem_desc_);
649  }
650 
651  CHECK(native_code);
652  const int64_t* join_hash_tables_ptr =
653  join_hash_tables.size() == 1
654  ? reinterpret_cast<int64_t*>(join_hash_tables[0])
655  : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr);
656  if (hoist_literals) {
657  using agg_query = void (*)(const int8_t***, // col_buffers
658  const uint64_t*, // num_fragments
659  const int8_t*, // literals
660  const int64_t*, // num_rows
661  const uint64_t*, // frag_row_offsets
662  const int32_t*, // max_matched
663  int32_t*, // total_matched
664  const int64_t*, // init_agg_value
665  int64_t**, // out
666  int32_t*, // error_code
667  const uint32_t*, // num_tables
668  const int64_t*); // join_hash_tables_ptr
669  if (is_group_by) {
670  reinterpret_cast<agg_query>(native_code->func())(
671  multifrag_cols_ptr,
672  &num_fragments,
673  literal_buff.data(),
674  num_rows_ptr,
675  flatened_frag_offsets.data(),
676  &scan_limit,
677  &total_matched_init,
678  cmpt_val_buff.data(),
679  query_buffers_->getGroupByBuffersPtr(),
680  error_code,
681  &num_tables,
682  join_hash_tables_ptr);
683  } else {
684  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
685  &num_fragments,
686  literal_buff.data(),
687  num_rows_ptr,
688  flatened_frag_offsets.data(),
689  &scan_limit,
690  &total_matched_init,
691  init_agg_vals.data(),
692  out_vec.data(),
693  error_code,
694  &num_tables,
695  join_hash_tables_ptr);
696  }
697  } else {
698  using agg_query = void (*)(const int8_t***, // col_buffers
699  const uint64_t*, // num_fragments
700  const int64_t*, // num_rows
701  const uint64_t*, // frag_row_offsets
702  const int32_t*, // max_matched
703  int32_t*, // total_matched
704  const int64_t*, // init_agg_value
705  int64_t**, // out
706  int32_t*, // error_code
707  const uint32_t*, // num_tables
708  const int64_t*); // join_hash_tables_ptr
709  if (is_group_by) {
710  reinterpret_cast<agg_query>(native_code->func())(
711  multifrag_cols_ptr,
712  &num_fragments,
713  num_rows_ptr,
714  flatened_frag_offsets.data(),
715  &scan_limit,
716  &total_matched_init,
717  cmpt_val_buff.data(),
718  query_buffers_->getGroupByBuffersPtr(),
719  error_code,
720  &num_tables,
721  join_hash_tables_ptr);
722  } else {
723  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
724  &num_fragments,
725  num_rows_ptr,
726  flatened_frag_offsets.data(),
727  &scan_limit,
728  &total_matched_init,
729  init_agg_vals.data(),
730  out_vec.data(),
731  error_code,
732  &num_tables,
733  join_hash_tables_ptr);
734  }
735  }
736 
737  if (ra_exe_unit.estimator) {
738  return {};
739  }
740 
741  if (rowid_lookup_num_rows && *error_code < 0) {
742  *error_code = 0;
743  }
744 
745  if (query_mem_desc_.useStreamingTopN()) {
746  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
747  }
748 
749  if (query_mem_desc_.didOutputColumnar() &&
750  query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
751  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
752  }
753 
754  return out_vec;
755 }
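
// --- Illustrative sketch (not part of the original source) -------------------
// launchCpuCode() above receives an opaque entry point from the JIT compiler
// and casts it to the matching agg_query signature before calling it. The toy
// below (with made-up names and a simplified two-argument signature standing in
// for the real twelve-parameter one) shows the same cast-and-call pattern.
#include <cassert>
#include <cstdint>

static void toy_kernel(const int64_t* in, int64_t* out) {
  *out = *in * 2;
}

int main() {
  using toy_query = void (*)(const int64_t*, int64_t*);
  // stand-in for the void* entry point handed back by the JIT
  void* compiled_entry_point = reinterpret_cast<void*>(&toy_kernel);
  int64_t in{21};
  int64_t out{0};
  reinterpret_cast<toy_query>(compiled_entry_point)(&in, &out);
  assert(out == 42);
  return 0;
}
// -----------------------------------------------------------------------------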
756 
757 #ifdef HAVE_CUDA
758 void QueryExecutionContext::initializeDynamicWatchdog(void* native_module,
759  const int device_id) const {
760  auto cu_module = static_cast<CUmodule>(native_module);
761  CHECK(cu_module);
762  CUevent start, stop;
763  cuEventCreate(&start, 0);
764  cuEventCreate(&stop, 0);
765  cuEventRecord(start, 0);
766 
767  CUdeviceptr dw_cycle_budget;
768  size_t dw_cycle_budget_size;
769  // Translate milliseconds to device cycles
770  uint64_t cycle_budget = executor_->deviceCycles(g_dynamic_watchdog_time_limit);
771  if (device_id == 0) {
772  LOG(INFO) << "Dynamic Watchdog budget: GPU: "
773  << std::to_string(g_dynamic_watchdog_time_limit) << "ms, "
774  << std::to_string(cycle_budget) << " cycles";
775  }
776  checkCudaErrors(cuModuleGetGlobal(
777  &dw_cycle_budget, &dw_cycle_budget_size, cu_module, "dw_cycle_budget"));
778  CHECK_EQ(dw_cycle_budget_size, sizeof(uint64_t));
779  checkCudaErrors(cuMemcpyHtoD(
780  dw_cycle_budget, reinterpret_cast<void*>(&cycle_budget), sizeof(uint64_t)));
781 
782  CUdeviceptr dw_sm_cycle_start;
783  size_t dw_sm_cycle_start_size;
784  checkCudaErrors(cuModuleGetGlobal(
785  &dw_sm_cycle_start, &dw_sm_cycle_start_size, cu_module, "dw_sm_cycle_start"));
786  CHECK_EQ(dw_sm_cycle_start_size, 128 * sizeof(uint64_t));
787  checkCudaErrors(cuMemsetD32(dw_sm_cycle_start, 0, 128 * 2));
788 
789  if (!executor_->interrupted_.load()) {
790  // Executor is not marked as interrupted, make sure dynamic watchdog doesn't block
791  // execution
792  CUdeviceptr dw_abort;
793  size_t dw_abort_size;
794  checkCudaErrors(cuModuleGetGlobal(&dw_abort, &dw_abort_size, cu_module, "dw_abort"));
795  CHECK_EQ(dw_abort_size, sizeof(uint32_t));
796  checkCudaErrors(cuMemsetD32(dw_abort, 0, 1));
797  }
798 
799  cuEventRecord(stop, 0);
800  cuEventSynchronize(stop);
801  float milliseconds = 0;
802  cuEventElapsedTime(&milliseconds, start, stop);
803  VLOG(1) << "Device " << std::to_string(device_id)
804  << ": launchGpuCode: dynamic watchdog init: " << std::to_string(milliseconds)
805  << " ms\n";
806 }
807 
808 void QueryExecutionContext::initializeRuntimeInterrupter(void* native_module,
809  const int device_id) const {
810  if (!executor_->interrupted_.load()) {
811  // Executor is not marked as interrupted, make sure interrupt flag doesn't block
812  // execution
813  auto cu_module = static_cast<CUmodule>(native_module);
814  CHECK(cu_module);
815  CUevent start, stop;
816  cuEventCreate(&start, 0);
817  cuEventCreate(&stop, 0);
818  cuEventRecord(start, 0);
819 
820  CUdeviceptr runtime_interrupt_flag;
821  size_t runtime_interrupt_flag_size;
822  checkCudaErrors(cuModuleGetGlobal(&runtime_interrupt_flag,
823  &runtime_interrupt_flag_size,
824  cu_module,
825  "runtime_interrupt_flag"));
826  CHECK_EQ(runtime_interrupt_flag_size, sizeof(uint32_t));
827  checkCudaErrors(cuMemsetD32(runtime_interrupt_flag, 0, 1));
828 
829  cuEventRecord(stop, 0);
830  cuEventSynchronize(stop);
831  float milliseconds = 0;
832  cuEventElapsedTime(&milliseconds, start, stop);
833  VLOG(1) << "Device " << std::to_string(device_id)
834  << ": launchGpuCode: runtime query interrupter init: "
835  << std::to_string(milliseconds) << " ms";
836  }
837 }
838 
839 std::vector<CUdeviceptr> QueryExecutionContext::prepareKernelParams(
840  const std::vector<std::vector<const int8_t*>>& col_buffers,
841  const std::vector<int8_t>& literal_buff,
842  const std::vector<std::vector<int64_t>>& num_rows,
843  const std::vector<std::vector<uint64_t>>& frag_offsets,
844  const int32_t scan_limit,
845  const std::vector<int64_t>& init_agg_vals,
846  const std::vector<int32_t>& error_codes,
847  const uint32_t num_tables,
848  const std::vector<int64_t>& join_hash_tables,
849  Data_Namespace::DataMgr* data_mgr,
850  const int device_id,
851  const bool hoist_literals,
852  const bool is_group_by) const {
854  std::vector<CUdeviceptr> params(KERN_PARAM_COUNT, 0);
855  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
856  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
857  if (col_count) {
858  std::vector<CUdeviceptr> multifrag_col_dev_buffers;
859  for (auto frag_col_buffers : col_buffers) {
860  std::vector<CUdeviceptr> col_dev_buffers;
861  for (auto col_buffer : frag_col_buffers) {
862  col_dev_buffers.push_back(reinterpret_cast<CUdeviceptr>(col_buffer));
863  }
864  auto col_buffers_dev_ptr = reinterpret_cast<CUdeviceptr>(
865  gpu_allocator_->alloc(col_count * sizeof(CUdeviceptr)));
866  copy_to_gpu(data_mgr,
867  col_buffers_dev_ptr,
868  &col_dev_buffers[0],
869  col_count * sizeof(CUdeviceptr),
870  device_id);
871  multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
872  }
873  params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(
874  gpu_allocator_->alloc(num_fragments * sizeof(CUdeviceptr)));
875  copy_to_gpu(data_mgr,
876  params[COL_BUFFERS],
877  &multifrag_col_dev_buffers[0],
878  num_fragments * sizeof(CUdeviceptr),
879  device_id);
880  }
881  params[NUM_FRAGMENTS] =
882  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint64_t)));
883  copy_to_gpu(
884  data_mgr, params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t), device_id);
885  CUdeviceptr literals_and_addr_mapping = reinterpret_cast<CUdeviceptr>(
886  gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t)));
887  CHECK_EQ(CUdeviceptr{0}, literals_and_addr_mapping % 8);
888  std::vector<int64_t> additional_literal_bytes;
889  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
890  if (count_distinct_bitmap_mem) {
891  // Store host and device addresses
892  const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
893  CHECK(count_distinct_bitmap_host_mem);
894  additional_literal_bytes.push_back(
895  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
896  additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
897  copy_to_gpu(data_mgr,
898  literals_and_addr_mapping,
899  &additional_literal_bytes[0],
900  additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]),
901  device_id);
902  }
903  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
904  sizeof(additional_literal_bytes[0]);
905  if (!literal_buff.empty()) {
906  CHECK(hoist_literals);
907  copy_to_gpu(
908  data_mgr, params[LITERALS], &literal_buff[0], literal_buff.size(), device_id);
909  }
910  CHECK_EQ(num_rows.size(), col_buffers.size());
911  std::vector<int64_t> flatened_num_rows;
912  for (auto& nums : num_rows) {
913  CHECK_EQ(nums.size(), num_tables);
914  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
915  }
916  params[NUM_ROWS] = reinterpret_cast<CUdeviceptr>(
917  gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size()));
918  copy_to_gpu(data_mgr,
919  params[NUM_ROWS],
920  &flatened_num_rows[0],
921  sizeof(int64_t) * flatened_num_rows.size(),
922  device_id);
923 
924  CHECK_EQ(frag_offsets.size(), col_buffers.size());
925  std::vector<int64_t> flatened_frag_offsets;
926  for (auto& offsets : frag_offsets) {
927  CHECK_EQ(offsets.size(), num_tables);
928  flatened_frag_offsets.insert(
929  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
930  }
931  params[FRAG_ROW_OFFSETS] = reinterpret_cast<CUdeviceptr>(
932  gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size()));
933  copy_to_gpu(data_mgr,
934  params[FRAG_ROW_OFFSETS],
935  &flatened_frag_offsets[0],
936  sizeof(int64_t) * flatened_frag_offsets.size(),
937  device_id);
938 
939  // Note that this will be overwritten if we are setting the entry count during group by
940  // buffer allocation and initialization
941  const int32_t max_matched{scan_limit};
942  params[MAX_MATCHED] =
943  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(max_matched)));
944  copy_to_gpu(
945  data_mgr, params[MAX_MATCHED], &max_matched, sizeof(max_matched), device_id);
946 
947  int32_t total_matched{0};
948  params[TOTAL_MATCHED] =
949  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(total_matched)));
950  copy_to_gpu(
951  data_mgr, params[TOTAL_MATCHED], &total_matched, sizeof(total_matched), device_id);
952 
953  if (is_group_by && !output_columnar_) {
954  auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
955  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
956  params[INIT_AGG_VALS] =
957  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t)));
958  copy_to_gpu(data_mgr,
959  params[INIT_AGG_VALS],
960  &cmpt_val_buff[0],
961  cmpt_sz * sizeof(int64_t),
962  device_id);
963  } else {
964  params[INIT_AGG_VALS] = reinterpret_cast<CUdeviceptr>(
965  gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t)));
966  copy_to_gpu(data_mgr,
967  params[INIT_AGG_VALS],
968  &init_agg_vals[0],
969  init_agg_vals.size() * sizeof(int64_t),
970  device_id);
971  }
972 
973  params[ERROR_CODE] = reinterpret_cast<CUdeviceptr>(
974  gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0])));
975  copy_to_gpu(data_mgr,
976  params[ERROR_CODE],
977  &error_codes[0],
978  error_codes.size() * sizeof(error_codes[0]),
979  device_id);
980 
981  params[NUM_TABLES] =
982  reinterpret_cast<CUdeviceptr>(gpu_allocator_->alloc(sizeof(uint32_t)));
983  copy_to_gpu(data_mgr, params[NUM_TABLES], &num_tables, sizeof(uint32_t), device_id);
984 
985  const auto hash_table_count = join_hash_tables.size();
986  switch (hash_table_count) {
987  case 0: {
988  params[JOIN_HASH_TABLES] = CUdeviceptr(0);
989  break;
990  }
991  case 1:
992  params[JOIN_HASH_TABLES] = static_cast<CUdeviceptr>(join_hash_tables[0]);
993  break;
994  default: {
995  params[JOIN_HASH_TABLES] = reinterpret_cast<CUdeviceptr>(
996  gpu_allocator_->alloc(hash_table_count * sizeof(int64_t)));
997  copy_to_gpu(data_mgr,
998  params[JOIN_HASH_TABLES],
999  &join_hash_tables[0],
1000  hash_table_count * sizeof(int64_t),
1001  device_id);
1002  break;
1003  }
1004  }
1005 
1006  return params;
1007 }
1008 #endif
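
// --- Illustrative sketch (not part of the original source) -------------------
// Both launchCpuCode() and prepareKernelParams() hand the generated code a
// two-level pointer structure for the multi-fragment column buffers: one
// pointer per column within each fragment, and one pointer per fragment on top
// of that. The host-only example below (with hypothetical data) builds the
// const int8_t*** shape consumed as "col_buffers" on the CPU path; the GPU path
// builds the same shape out of CUdeviceptr arrays copied to the device.
#include <cstdint>
#include <vector>

int main() {
  // two fragments, two columns each (the contents are irrelevant to the layout)
  std::vector<std::vector<int8_t>> frag0_cols(2, std::vector<int8_t>(16));
  std::vector<std::vector<int8_t>> frag1_cols(2, std::vector<int8_t>(16));

  std::vector<std::vector<const int8_t*>> col_buffers{
      {frag0_cols[0].data(), frag0_cols[1].data()},
      {frag1_cols[0].data(), frag1_cols[1].data()}};

  // level 1: per-fragment arrays of column pointers
  std::vector<const int8_t**> multifrag_col_buffers;
  for (auto& frag : col_buffers) {
    multifrag_col_buffers.push_back(frag.data());
  }
  // level 2: pointer to the array of fragments, as passed to the query kernel
  const int8_t*** multifrag_cols_ptr =
      multifrag_col_buffers.empty() ? nullptr : multifrag_col_buffers.data();

  return multifrag_cols_ptr == nullptr;  // 0 on success
}
// -----------------------------------------------------------------------------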