OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryExecutionContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
20 #include "DeviceKernel.h"
21 #include "Execute.h"
22 #include "GpuInitGroups.h"
23 #include "InPlaceSort.h"
26 #include "QueryMemoryInitializer.h"
27 #include "RelAlgExecutionUnit.h"
28 #include "ResultSet.h"
29 #include "Shared/likely.h"
30 #include "SpeculativeTopN.h"
31 #include "StreamingTopN.h"
32 
34  const RelAlgExecutionUnit& ra_exe_unit,
36  const Executor* executor,
37  const ExecutorDeviceType device_type,
38  const ExecutorDispatchMode dispatch_mode,
39  const int device_id,
40  const shared::TableKey& outer_table_key,
41  const int64_t num_rows,
42  const std::vector<std::vector<const int8_t*>>& col_buffers,
43  const std::vector<std::vector<uint64_t>>& frag_offsets,
44  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
45  const bool output_columnar,
46  const bool sort_on_gpu,
47  const size_t thread_idx,
48  RenderInfo* render_info)
49  : query_mem_desc_(query_mem_desc)
50  , executor_(executor)
51  , device_type_(device_type)
52  , dispatch_mode_(dispatch_mode)
53  , row_set_mem_owner_(row_set_mem_owner)
54  , output_columnar_(output_columnar) {
55  CHECK(executor);
56  auto data_mgr = executor->getDataMgr();
57  if (device_type == ExecutorDeviceType::GPU) {
58  gpu_allocator_ = std::make_unique<CudaAllocator>(
59  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
60  }
61 
62  auto render_allocator_map = render_info && render_info->isInSitu()
63  ? render_info->render_allocator_map_ptr.get()
64  : nullptr;
65  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
67  device_id,
68  device_type,
69  dispatch_mode,
70  output_columnar,
72  outer_table_key,
73  num_rows,
74  col_buffers,
75  frag_offsets,
76  render_allocator_map,
77  render_info,
78  row_set_mem_owner,
79  gpu_allocator_.get(),
80  thread_idx,
81  executor);
82 }
83 
85  const size_t i) const {
87  const auto& result_set = query_buffers_->getResultSet(i);
88  auto deinterleaved_query_mem_desc =
90  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
91  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
92 
93  auto deinterleaved_result_set =
94  std::make_shared<ResultSet>(result_set->getTargetInfos(),
95  std::vector<ColumnLazyFetchInfo>{},
96  std::vector<std::vector<const int8_t*>>{},
97  std::vector<std::vector<int64_t>>{},
98  std::vector<int64_t>{},
100  -1,
101  -1,
102  deinterleaved_query_mem_desc,
104  executor_->blockSize(),
105  executor_->gridSize());
106  auto deinterleaved_storage =
107  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
108  auto deinterleaved_buffer =
109  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
110  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
111  size_t deinterleaved_buffer_idx = 0;
112  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
113  auto do_work = [&](const size_t bin_base_off) {
114  std::vector<int64_t> agg_vals(agg_col_count, 0);
115  memcpy(&agg_vals[0],
116  &executor_->plan_state_->init_agg_vals_[0],
117  agg_col_count * sizeof(agg_vals[0]));
118  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
119  executor_->warpSize(),
120  false,
121  true,
122  agg_vals,
124  result_set->getTargetInfos(),
125  executor_->plan_state_->init_agg_vals_);
126  for (size_t agg_idx = 0; agg_idx < agg_col_count;
127  ++agg_idx, ++deinterleaved_buffer_idx) {
128  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
129  }
130  };
132  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
133  bin_idx < result_set->entryCount();
134  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
135  if (UNLIKELY((bin_idx & 0xFFFF) == 0 &&
136  executor_->checkNonKernelTimeInterrupted())) {
137  throw std::runtime_error(
138  "Query execution has interrupted during result set reduction");
139  }
140  do_work(bin_base_off);
141  }
142  } else {
143  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
144  bin_idx < result_set->entryCount();
145  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
146  do_work(bin_base_off);
147  }
148  }
149  query_buffers_->resetResultSet(i);
150  return deinterleaved_result_set;
151 }
152 
153 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
155  return query_buffers_->getAggInitValForIndex(index);
156 }
157 
159  const RelAlgExecutionUnit& ra_exe_unit,
160  const QueryMemoryDescriptor& query_mem_desc) const {
161  auto timer = DEBUG_TIMER(__func__);
162  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
164  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
166  const size_t expected_num_buffers = query_mem_desc.hasVarlenOutput() ? 2 : 1;
167  CHECK_EQ(expected_num_buffers, group_by_buffers_size);
168  return groupBufferToResults(0);
169  }
170  const size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
171  const size_t group_by_output_buffers_size =
172  group_by_buffers_size - (query_mem_desc.hasVarlenOutput() ? 1 : 0);
173  for (size_t i = 0; i < group_by_output_buffers_size; i += step) {
174  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
175  }
177  return executor_->reduceMultiDeviceResults(
178  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
179 }
180 
184  }
185  return query_buffers_->getResultSetOwned(i);
186 }
187 
namespace {

// Collapses a list of error codes into a single code.
//
// Priority order:
//   1. the first strictly positive code (the original note describes these as
//      overflow / division by zero / interrupt),
//   2. otherwise the first non-zero (i.e. negative) code,
//   3. otherwise 0, meaning no error was reported.
int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
  int32_t first_negative = 0;
  for (size_t idx = 0; idx < error_codes.size(); ++idx) {
    const int32_t code = error_codes[idx];
    if (code > 0) {
      // A positive code always wins, regardless of earlier negative codes.
      return code;
    }
    if (code < 0 && first_negative == 0) {
      // Remember only the earliest negative code as the fallback result.
      first_negative = code;
    }
  }
  return first_negative;
}

}  // namespace
206 
208  const RelAlgExecutionUnit& ra_exe_unit,
209  const CompilationContext* compilation_context,
210  const bool hoist_literals,
211  const std::vector<int8_t>& literal_buff,
212  std::vector<std::vector<const int8_t*>> col_buffers,
213  const std::vector<std::vector<int64_t>>& num_rows,
214  const std::vector<std::vector<uint64_t>>& frag_offsets,
215  const int32_t scan_limit,
216  Data_Namespace::DataMgr* data_mgr,
217  const unsigned block_size_x,
218  const unsigned grid_size_x,
219  const int device_id,
220  const size_t shared_memory_size,
221  int32_t* error_code,
222  const uint32_t num_tables,
223  const bool allow_runtime_interrupt,
224  const std::vector<int8_t*>& join_hash_tables,
225  RenderAllocatorMap* render_allocator_map,
226  bool optimize_cuda_block_and_grid_sizes) {
227  auto timer = DEBUG_TIMER(__func__);
228  INJECT_TIMER(lauchGpuCode);
231  CHECK(compilation_context);
232  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
233 
234  bool is_group_by{query_mem_desc_.isGroupBy()};
235 
236  RenderAllocator* render_allocator = nullptr;
237  if (render_allocator_map) {
238  render_allocator = render_allocator_map->getRenderAllocator(device_id);
239  }
240 
241  auto kernel = create_device_kernel(compilation_context, device_id);
242 
243  std::vector<int64_t*> out_vec;
244  uint32_t num_fragments = col_buffers.size();
245  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
246 
247  auto prepareClock = kernel->make_clock();
248  auto launchClock = kernel->make_clock();
249  auto finishClock = kernel->make_clock();
250 
251  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
252  prepareClock->start();
253  }
254 
256  kernel->initializeDynamicWatchdog(
257  executor_->interrupted_.load(),
259  }
260 
261  if (allow_runtime_interrupt && !render_allocator) {
262  kernel->initializeRuntimeInterrupter(device_id);
263  }
264 
265  auto kernel_params = prepareKernelParams(col_buffers,
266  literal_buff,
267  num_rows,
268  frag_offsets,
269  scan_limit,
270  init_agg_vals,
271  error_codes,
272  num_tables,
273  join_hash_tables,
274  data_mgr,
275  device_id,
276  hoist_literals,
277  is_group_by);
278 
279  static_assert(size_t(KERN_PARAM_COUNT) == kernel_params.size());
280  CHECK(!kernel_params[GROUPBY_BUF]);
281 
282  const unsigned block_size_y = 1;
283  const unsigned block_size_z = 1;
284  const unsigned grid_size_y = 1;
285  const unsigned grid_size_z = 1;
286  const auto total_thread_count = block_size_x * grid_size_x;
287  const auto err_desc = kernel_params[ERROR_CODE];
288  if (is_group_by) {
289  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
290  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
291  auto gpu_group_by_buffers =
292  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
294  kernel_params[INIT_AGG_VALS],
295  device_id,
297  block_size_x,
298  grid_size_x,
299  executor_->warpSize(),
300  can_sort_on_gpu,
302  render_allocator);
303  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
304  gpu_allocator_->copyToDevice(
305  kernel_params[MAX_MATCHED], &max_matched, sizeof(max_matched));
306 
307  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.ptrs;
308  std::vector<void*> param_ptrs;
309  for (auto& param : kernel_params) {
310  param_ptrs.push_back(&param);
311  }
312 
313  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
314  auto prepareTime = prepareClock->stop();
315  VLOG(1) << "Device " << std::to_string(device_id)
316  << ": launchGpuCode: group-by prepare: " << std::to_string(prepareTime)
317  << " ms";
318  launchClock->start();
319  }
320 
321  if (hoist_literals) {
322  VLOG(1) << "Launching(" << kernel->name() << ") on device_id(" << device_id << ')';
323  kernel->launch(grid_size_x,
324  grid_size_y,
325  grid_size_z,
326  block_size_x,
327  block_size_y,
328  block_size_z,
329  shared_memory_size,
330  &param_ptrs[0],
331  optimize_cuda_block_and_grid_sizes);
332  } else {
333  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
334  VLOG(1) << "Launching(" << kernel->name() << ") on device_id(" << device_id << ')';
335  kernel->launch(grid_size_x,
336  grid_size_y,
337  grid_size_z,
338  block_size_x,
339  block_size_y,
340  block_size_z,
341  shared_memory_size,
342  &param_ptrs[0],
343  optimize_cuda_block_and_grid_sizes);
344  }
345  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
346  auto launchTime = launchClock->stop();
347  VLOG(1) << "Device " << std::to_string(device_id)
348  << ": launchGpuCode: group-by cuLaunchKernel: "
349  << std::to_string(launchTime) << " ms";
350  finishClock->start();
351  }
352 
353  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
354  reinterpret_cast<int8_t*>(err_desc),
355  error_codes.size() * sizeof(error_codes[0]));
356  *error_code = aggregate_error_codes(error_codes);
357  if (*error_code > 0) {
358  return {};
359  }
360 
361  if (!render_allocator) {
363  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
365  gpu_group_by_buffers,
366  ra_exe_unit,
367  total_thread_count,
368  device_id);
369  } else {
370  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
371  try {
374  gpu_group_by_buffers,
375  data_mgr,
376  device_id);
377  } catch (const std::bad_alloc&) {
378  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
379  }
380  }
384  query_buffers_->compactProjectionBuffersGpu(
386  data_mgr,
387  gpu_group_by_buffers,
389  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id),
390  device_id);
391  } else {
392  size_t num_allocated_rows{0};
393  if (ra_exe_unit.use_bump_allocator) {
394  num_allocated_rows = get_num_allocated_rows_from_gpu(
395  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id);
396  // First, check the error code. If we ran out of slots, don't copy data back
397  // into the ResultSet or update ResultSet entry count
398  if (*error_code < 0) {
399  return {};
400  }
401  }
402  query_buffers_->copyGroupByBuffersFromGpu(
405  ra_exe_unit.use_bump_allocator ? num_allocated_rows
407  gpu_group_by_buffers,
408  &ra_exe_unit,
409  block_size_x,
410  grid_size_x,
411  device_id,
412  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
413  if (num_allocated_rows) {
414  CHECK(ra_exe_unit.use_bump_allocator);
415  CHECK(!query_buffers_->result_sets_.empty());
416  query_buffers_->result_sets_.front()->updateStorageEntryCount(
417  num_allocated_rows);
418  }
419  }
420  } else {
421  query_buffers_->copyGroupByBuffersFromGpu(
425  gpu_group_by_buffers,
426  &ra_exe_unit,
427  block_size_x,
428  grid_size_x,
429  device_id,
430  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
431  }
432  }
433  }
434  } else {
435  std::vector<int8_t*> out_vec_dev_buffers;
436  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
437  // by default, non-grouped aggregate queries generate one result per available thread
438  // in the lifetime of (potentially multi-fragment) kernel execution.
439  // We can reduce these intermediate results internally in the device and hence have
440  // only one result per device, if GPU shared memory optimizations are enabled.
441  const auto num_results_per_agg_col =
442  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
443  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
444  if (ra_exe_unit.estimator) {
445  estimator_result_set_.reset(new ResultSet(
446  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
447  out_vec_dev_buffers.push_back(estimator_result_set_->getDeviceEstimatorBuffer());
448  } else {
449  for (size_t i = 0; i < agg_col_count; ++i) {
450  int8_t* out_vec_dev_buffer =
451  num_fragments ? gpu_allocator_->alloc(output_buffer_size_per_agg) : nullptr;
452  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
453  if (shared_memory_size) {
454  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
455  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
456  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
457  output_buffer_size_per_agg);
458  }
459  }
460  }
461  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(int8_t*));
462  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
463  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
464  agg_col_count * sizeof(int8_t*));
465  kernel_params[GROUPBY_BUF] = out_vec_dev_ptr;
466  std::vector<void*> param_ptrs;
467  for (auto& param : kernel_params) {
468  param_ptrs.push_back(&param);
469  }
470 
471  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
472  auto prepareTime = prepareClock->stop();
473 
474  VLOG(1) << "Device " << std::to_string(device_id)
475  << ": launchGpuCode: prepare: " << std::to_string(prepareTime) << " ms";
476  launchClock->start();
477  }
478 
479  if (hoist_literals) {
480  VLOG(1) << "Launching(" << kernel->name() << ") on device_id(" << device_id << ')';
481  kernel->launch(grid_size_x,
482  grid_size_y,
483  grid_size_z,
484  block_size_x,
485  block_size_y,
486  block_size_z,
487  shared_memory_size,
488  &param_ptrs[0],
489  optimize_cuda_block_and_grid_sizes);
490  } else {
491  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
492  VLOG(1) << "Launching(" << kernel->name() << ") on device_id(" << device_id << ')';
493  kernel->launch(grid_size_x,
494  grid_size_y,
495  grid_size_z,
496  block_size_x,
497  block_size_y,
498  block_size_z,
499  shared_memory_size,
500  &param_ptrs[0],
501  optimize_cuda_block_and_grid_sizes);
502  }
503 
504  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
505  auto launchTime = launchClock->stop();
506  VLOG(1) << "Device " << std::to_string(device_id)
507  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(launchTime)
508  << " ms";
509  finishClock->start();
510  }
511 
512  gpu_allocator_->copyFromDevice(
513  &error_codes[0], err_desc, error_codes.size() * sizeof(error_codes[0]));
514  *error_code = aggregate_error_codes(error_codes);
515  if (*error_code > 0) {
516  return {};
517  }
518  if (ra_exe_unit.estimator) {
520  estimator_result_set_->syncEstimatorBuffer();
521  return {};
522  }
523  for (size_t i = 0; i < agg_col_count; ++i) {
524  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
525  gpu_allocator_->copyFromDevice(
526  host_out_vec, out_vec_dev_buffers[i], output_buffer_size_per_agg);
527  out_vec.push_back(host_out_vec);
528  }
529  }
530  const auto count_distinct_bitmap_device_mem =
531  query_buffers_->getCountDistinctBitmapDevicePtr();
532  if (count_distinct_bitmap_device_mem) {
533  gpu_allocator_->copyFromDevice(
534  query_buffers_->getCountDistinctBitmapHostPtr(),
535  reinterpret_cast<void*>(count_distinct_bitmap_device_mem),
536  query_buffers_->getCountDistinctBitmapBytes());
537  }
538 
539  const auto varlen_output_gpu_buf = query_buffers_->getVarlenOutputPtr();
540  if (varlen_output_gpu_buf) {
542  const size_t varlen_output_buf_bytes =
545  CHECK(query_buffers_->getVarlenOutputHostPtr());
546  gpu_allocator_->copyFromDevice(query_buffers_->getVarlenOutputHostPtr(),
547  reinterpret_cast<void*>(varlen_output_gpu_buf),
548  varlen_output_buf_bytes);
549  }
550 
551  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
552  if (allow_runtime_interrupt) {
553  kernel->resetRuntimeInterrupter(device_id);
554  }
555  auto finishTime = finishClock->stop();
556  VLOG(1) << "Device " << std::to_string(device_id)
557  << ": launchGpuCode: finish: " << std::to_string(finishTime) << " ms";
558  }
559 
560  return out_vec;
561 }
562 
564  const RelAlgExecutionUnit& ra_exe_unit,
565  const CpuCompilationContext* native_code,
566  const bool hoist_literals,
567  const std::vector<int8_t>& literal_buff,
568  std::vector<std::vector<const int8_t*>> col_buffers,
569  const std::vector<std::vector<int64_t>>& num_rows,
570  const std::vector<std::vector<uint64_t>>& frag_offsets,
571  const int32_t scan_limit,
572  int32_t* error_code,
573  const uint32_t start_rowid,
574  const uint32_t num_tables,
575  const std::vector<int8_t*>& join_hash_tables,
576  const int64_t num_rows_to_process) {
577  auto timer = DEBUG_TIMER(__func__);
578  INJECT_TIMER(lauchCpuCode);
579 
581  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
582 
583  std::vector<const int8_t**> multifrag_col_buffers;
584  for (auto& col_buffer : col_buffers) {
585  multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
586  }
587  const int8_t*** multifrag_cols_ptr{
588  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
589  const uint32_t num_fragments =
590  multifrag_cols_ptr ? static_cast<uint32_t>(col_buffers.size()) : uint32_t(0);
591  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint32_t(0);
592 
593  const bool is_group_by{query_mem_desc_.isGroupBy()};
594  std::vector<int64_t*> out_vec;
595  if (ra_exe_unit.estimator) {
596  // Subfragments collect the result from multiple runs in a single
597  // result set.
598  if (!estimator_result_set_) {
599  estimator_result_set_.reset(
600  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
601  }
602  out_vec.push_back(
603  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
604  } else {
605  if (!is_group_by) {
606  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
607  auto buff = new int64_t[num_out_frags];
608  out_vec.push_back(static_cast<int64_t*>(buff));
609  }
610  }
611  }
612 
613  CHECK_EQ(num_rows.size(), col_buffers.size());
614  std::vector<int64_t> flatened_num_rows;
615  for (auto& nums : num_rows) {
616  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
617  }
618  std::vector<uint64_t> flatened_frag_offsets;
619  for (auto& offsets : frag_offsets) {
620  flatened_frag_offsets.insert(
621  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
622  }
623  const int64_t rowid_lookup_num_rows =
624  start_rowid ? static_cast<int64_t>(start_rowid) + 1 : 0;
625  int64_t const* num_rows_ptr;
626  if (num_rows_to_process > 0) {
627  flatened_num_rows[0] = num_rows_to_process;
628  num_rows_ptr = flatened_num_rows.data();
629  } else {
630  num_rows_ptr =
631  rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
632  }
633  int32_t total_matched_init{0};
634 
635  std::vector<int64_t> cmpt_val_buff;
636  if (is_group_by) {
637  cmpt_val_buff =
639  init_agg_vals,
641  }
642 
643  RowFunctionManager mgr(executor_, ra_exe_unit);
644  int8_t* row_func_mgr_ptr = reinterpret_cast<int8_t*>(&mgr);
645 
646  CHECK(native_code);
647  const int64_t* join_hash_tables_ptr =
648  join_hash_tables.size() == 1
649  ? reinterpret_cast<const int64_t*>(join_hash_tables[0])
650  : (join_hash_tables.size() > 1
651  ? reinterpret_cast<const int64_t*>(&join_hash_tables[0])
652  : nullptr);
653  VLOG(1) << "Calling " << native_code->name() << " hoist_literals(" << hoist_literals
654  << ')';
655  const int64_t* const init_agg_value =
656  is_group_by ? cmpt_val_buff.data() : init_agg_vals.data();
657  int64_t** const out =
658  is_group_by ? query_buffers_->getGroupByBuffersPtr() : out_vec.data();
659  if (hoist_literals) {
660  native_code->call(
661  error_code, // int32_t*, // error_code
662  &total_matched_init, // int32_t*, // total_matched
663  out, // int64_t**, // out
664  &num_fragments, // const uint32_t*, // num_fragments
665  &num_tables, // const uint32_t*, // num_tables
666  &start_rowid, // const uint32_t*, // start_rowid aka row_index_resume
667  multifrag_cols_ptr, // const int8_t***, // col_buffers
668  literal_buff.data(), // const int8_t*, // literals
669  num_rows_ptr, // const int64_t*, // num_rows
670  flatened_frag_offsets.data(), // const uint64_t*, // frag_row_offsets
671  &scan_limit, // const int32_t*, // max_matched
672  init_agg_value, // const int64_t*, // init_agg_value
673  join_hash_tables_ptr, // const int64_t*, // join_hash_tables_ptr
674  row_func_mgr_ptr); // const int8_t*); // row_func_mgr
675  } else {
676  native_code->call(
677  error_code, // int32_t*, // error_code
678  &total_matched_init, // int32_t*, // total_matched
679  out, // int64_t**, // out
680  &num_fragments, // const uint32_t*, // num_fragments
681  &num_tables, // const uint32_t*, // num_tables
682  &start_rowid, // const uint32_t*, // start_rowid aka row_index_resume
683  multifrag_cols_ptr, // const int8_t***, // col_buffers
684  num_rows_ptr, // const int64_t*, // num_rows
685  flatened_frag_offsets.data(), // const uint64_t*, // frag_row_offsets
686  &scan_limit, // const int32_t*, // max_matched
687  init_agg_value, // const int64_t*, // init_agg_value
688  join_hash_tables_ptr, // const int64_t*, // join_hash_tables_ptr
689  row_func_mgr_ptr); // const int8_t*); // row_func_mgr
690  }
691 
692  if (ra_exe_unit.estimator) {
693  return {};
694  }
695 
696  if (rowid_lookup_num_rows && *error_code < 0) {
697  *error_code = 0;
698  }
699 
701  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
702  }
703 
706  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
707  }
708  return out_vec;
709 }
710 
712  std::vector<std::vector<int8_t const*>> const& col_buffers) const {
713  if (size_t const num_fragments = col_buffers.size()) {
714  size_t const col_bytes = col_buffers.front().size() * sizeof(int8_t const*);
715  // num_fragments pointers, each pointing to a chunk of size col_bytes
716  return num_fragments * sizeof(int8_t*) + num_fragments * col_bytes;
717  }
718  return 0;
719 }
720 // Assumes all vectors in col_buffers are the same size.
721 // Copy 2d vector to device without flattening.
723  int8_t* device_ptr,
724  std::vector<std::vector<int8_t const*>> const& col_buffers) const {
725  if (size_t const num_fragments = col_buffers.size()) {
726  size_t const col_bytes = col_buffers.front().size() * sizeof(int8_t const*);
727  int8_t* col_buffer_ptr = device_ptr + num_fragments * sizeof(int8_t*);
728  // The code could be shorter w/ one for loop, but the memory access is linear w/ two.
729  for (size_t i = 0; i < num_fragments; ++i) {
730  gpu_allocator_->copyToDevice(device_ptr, &col_buffer_ptr, sizeof(int8_t*));
731  device_ptr += sizeof(int8_t*);
732  col_buffer_ptr += col_bytes;
733  }
734  col_buffer_ptr = device_ptr;
735  for (size_t i = 0; i < num_fragments; ++i) {
736  CHECK_EQ(col_buffers.front().size(), col_buffers[i].size()) << i;
737  gpu_allocator_->copyToDevice(col_buffer_ptr, col_buffers[i].data(), col_bytes);
738  col_buffer_ptr += col_bytes;
739  }
740  }
741 }
742 
743 template <typename T>
745  uint32_t const expected_subvector_size,
746  std::vector<std::vector<T>> const& vec2d) const {
747  return expected_subvector_size * vec2d.size() * sizeof(T);
748 }
749 template <typename T>
751  int8_t* device_ptr,
752  uint32_t const expected_subvector_size,
753  std::vector<std::vector<T>> const& vec2d) const {
754  size_t const bytes_per_subvector = expected_subvector_size * sizeof(T);
755  for (size_t i = 0; i < vec2d.size(); ++i) {
756  CHECK_EQ(expected_subvector_size, vec2d[i].size()) << i << '/' << vec2d.size();
757  gpu_allocator_->copyToDevice(device_ptr, vec2d[i].data(), bytes_per_subvector);
758  device_ptr += bytes_per_subvector;
759  }
760 }
761 
763  bool const is_group_by,
764  std::vector<int64_t> const& init_agg_vals) const {
765  if (is_group_by && !output_columnar_) {
766  auto cmpt_sz = align_to<8>(query_mem_desc_.getColsSize()) / sizeof(int64_t);
767  return cmpt_sz * sizeof(int64_t);
768  } else {
769  return init_agg_vals.size() * sizeof(int64_t);
770  }
771 }
773  int8_t* device_ptr,
774  bool const is_group_by,
775  std::vector<int64_t> const& init_agg_vals) const {
776  if (is_group_by && !output_columnar_) {
777  auto cmpt_sz = align_to<8>(query_mem_desc_.getColsSize()) / sizeof(int64_t);
778  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
779  copyVectorToDevice(device_ptr, cmpt_val_buff);
780  } else if (init_agg_vals.size()) {
781  copyVectorToDevice(device_ptr, init_agg_vals);
782  }
783 }
784 
786  std::vector<int8_t*> const& join_hash_tables) const {
787  return join_hash_tables.size() < 2u ? 0u : join_hash_tables.size() * sizeof(int8_t*);
788 }
790  int8_t* device_ptr,
791  std::vector<int8_t*> const& join_hash_tables) const {
792  switch (join_hash_tables.size()) {
793  case 0u:
794  return nullptr;
795  case 1u:
796  return join_hash_tables[0];
797  default:
798  copyVectorToDevice(device_ptr, join_hash_tables);
799  return device_ptr;
800  }
801 }
802 
804  std::vector<int8_t> const& literal_buff) const {
805  size_t const count_distinct_bytes =
806  query_buffers_->getCountDistinctBitmapDevicePtr() ? 2 * sizeof(int64_t) : 0u;
807  return count_distinct_bytes + literal_buff.size();
808 }
809 // The count_distinct_addresses are considered "additional literals"
810 // and are retrieved as negative offsets relative to the "literals" pointer
811 // via GroupByAndAggregate::getAdditionalLiteral().
813  int8_t* device_ptr,
814  std::vector<int8_t> const& literal_buff) const {
815  // Calculate additional space
816  // * Count Distinct Bitmap
817  int64_t count_distinct_addresses[2];
818  size_t const count_distinct_bytes = query_buffers_->getCountDistinctBitmapDevicePtr()
819  ? sizeof count_distinct_addresses
820  : 0u;
821  CHECK_EQ(0u, uint64_t(device_ptr) % 8);
822  // Copy to device, literals last.
823  if (count_distinct_bytes) {
824  // Store host and device addresses
825  auto const count_distinct_bitmap_host_mem =
826  query_buffers_->getCountDistinctBitmapHostPtr();
827  CHECK(count_distinct_bitmap_host_mem);
828  count_distinct_addresses[0] = // getAdditionalLiteral(-2) in codegenCountDistinct()
829  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem);
830  count_distinct_addresses[1] = // getAdditionalLiteral(-1) in codegenCountDistinct()
831  static_cast<int64_t>(query_buffers_->getCountDistinctBitmapDevicePtr());
832  gpu_allocator_->copyToDevice(
833  device_ptr, count_distinct_addresses, count_distinct_bytes);
834  device_ptr += count_distinct_bytes;
835  }
836  if (!literal_buff.empty()) {
837  gpu_allocator_->copyToDevice(device_ptr, literal_buff.data(), literal_buff.size());
838  }
839  return device_ptr;
840 }
841 
842 template <typename T>
843 void QueryExecutionContext::copyValueToDevice(int8_t* device_ptr, T const value) const {
844  gpu_allocator_->copyToDevice(device_ptr, &value, sizeof(T));
845 }
846 
847 template <typename T>
848 size_t QueryExecutionContext::sizeofVector(std::vector<T> const& vec) const {
849  return vec.size() * sizeof(T);
850 }
851 template <typename T>
853  std::vector<T> const& vec) const {
854  gpu_allocator_->copyToDevice(device_ptr, vec.data(), vec.size() * sizeof(T));
855 }
856 
858  const std::vector<std::vector<const int8_t*>>& col_buffers,
859  const std::vector<int8_t>& literal_buff,
860  const std::vector<std::vector<int64_t>>& num_rows,
861  const std::vector<std::vector<uint64_t>>& frag_offsets,
862  const int32_t scan_limit,
863  const std::vector<int64_t>& init_agg_vals,
864  const std::vector<int32_t>& error_codes,
865  const uint32_t num_tables,
866  const std::vector<int8_t*>& join_hash_tables,
867  Data_Namespace::DataMgr* data_mgr,
868  const int device_id,
869  const bool hoist_literals,
870  const bool is_group_by) const {
872  CHECK(literal_buff.empty() || hoist_literals) << literal_buff.size();
873  CHECK_EQ(num_rows.size(), col_buffers.size());
874  CHECK_EQ(frag_offsets.size(), col_buffers.size());
875 
876  // All sizes are in number of bytes and divisible by 8 (int64_t-aligned).
877  KernelParamSizes param_sizes;
878  param_sizes[ERROR_CODE] = align_to<8>(sizeofVector(error_codes));
879  param_sizes[TOTAL_MATCHED] = align_to<8>(sizeof(uint32_t));
880  param_sizes[GROUPBY_BUF] = 0u;
881  param_sizes[NUM_FRAGMENTS] = align_to<8>(sizeof(uint32_t));
882  param_sizes[NUM_TABLES] = align_to<8>(sizeof(num_tables));
883  param_sizes[ROW_INDEX_RESUME] = align_to<8>(sizeof(uint32_t));
884  param_sizes[COL_BUFFERS] = sizeofColBuffers(col_buffers);
885  param_sizes[LITERALS] = align_to<8>(sizeofLiterals(literal_buff));
886  param_sizes[NUM_ROWS] = sizeofFlattened2dVec(num_tables, num_rows);
887  param_sizes[FRAG_ROW_OFFSETS] = sizeofFlattened2dVec(num_tables, frag_offsets);
888  param_sizes[MAX_MATCHED] = align_to<8>(sizeof(scan_limit));
889  param_sizes[INIT_AGG_VALS] = sizeofInitAggVals(is_group_by, init_agg_vals);
890  param_sizes[JOIN_HASH_TABLES] = sizeofJoinHashTables(join_hash_tables);
891  param_sizes[ROW_FUNC_MGR] = 0u;
892  auto const nbytes = std::accumulate(param_sizes.begin(), param_sizes.end(), size_t(0));
893 
895  // Allocate one block for all kernel params and set pointers based on param_sizes.
896  params[ERROR_CODE] = gpu_allocator_->alloc(nbytes);
897  static_assert(ERROR_CODE == 0);
898  for (size_t i = 1; i < params.size(); ++i) {
899  params[i] = params[i - 1] + param_sizes[i - 1];
900  }
901  // Copy data to device based on params w/ adjustments to LITERALS and JOIN_HASH_TABLES.
902  copyVectorToDevice(params[ERROR_CODE], error_codes);
903  copyValueToDevice(params[TOTAL_MATCHED], int32_t(0));
904  params[GROUPBY_BUF] = nullptr;
905  copyValueToDevice(params[NUM_FRAGMENTS], uint32_t(col_buffers.size()));
906  copyValueToDevice(params[NUM_TABLES], num_tables);
907  copyValueToDevice(params[ROW_INDEX_RESUME], uint32_t(0));
908  copyColBuffersToDevice(params[COL_BUFFERS], col_buffers);
909  params[LITERALS] = copyLiteralsToDevice(params[LITERALS], literal_buff);
910  copyFlattened2dVecToDevice(params[NUM_ROWS], num_tables, num_rows);
911  copyFlattened2dVecToDevice(params[FRAG_ROW_OFFSETS], num_tables, frag_offsets);
912  // Note that this will be overwritten if we are setting the entry count during group by
913  // buffer allocation and initialization
914  copyValueToDevice(params[MAX_MATCHED], scan_limit);
915  copyInitAggValsToDevice(params[INIT_AGG_VALS], is_group_by, init_agg_vals);
916  params[JOIN_HASH_TABLES] =
917  copyJoinHashTablesToDevice(params[JOIN_HASH_TABLES], join_hash_tables);
918 
919  // RowFunctionManager is not supported in GPU. We just keep the argument
920  // to avoid diverging from CPU generated code
921  params[ROW_FUNC_MGR] = nullptr;
922 
923  return params;
924 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
RenderAllocator * getRenderAllocator(size_t device_id)
void call(Ts...args) const
int8_t * copyJoinHashTablesToDevice(int8_t *device_ptr, std::vector< int8_t * > const &join_hash_tables) const
std::unique_ptr< DeviceAllocator > gpu_allocator_
size_t get_num_allocated_rows_from_gpu(DeviceAllocator &device_allocator, int8_t *projection_size_gpu, const int device_id)
QueryExecutionContext(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const shared::TableKey &outer_table_key, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const bool output_columnar, const bool sort_on_gpu, const size_t thread_idx, RenderInfo *)
void sort_on_gpu(int64_t *val_buff, int32_t *idx_buff, const uint64_t entry_count, const bool desc, const uint32_t chosen_bytes, ThrustAllocator &alloc, const int device_id)
const std::string & name() const
Streaming Top N algorithm.
std::vector< int64_t * > launchCpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CpuCompilationContext *fn_ptrs, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, int32_t *error_code, const uint32_t start_rowid, const uint32_t num_tables, const std::vector< int8_t * > &join_hash_tables, const int64_t num_rows_to_process=-1)
bool use_speculative_top_n(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc)
const ExecutorDispatchMode dispatch_mode_
size_t num_rows_to_process(const size_t start_row_index, const size_t max_fragment_size, const size_t rows_remaining)
std::shared_ptr< ResultSet > ResultSetPtr
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:81
std::array< int8_t *, KERN_PARAM_COUNT > KernelParams
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:134
void copyColBuffersToDevice(int8_t *device_ptr, std::vector< std::vector< int8_t const * >> const &col_buffers) const
void inplace_sort_gpu(const std::list< Analyzer::OrderEntry > &order_entries, const QueryMemoryDescriptor &query_mem_desc, const GpuGroupByBuffers &group_by_buffers, Data_Namespace::DataMgr *data_mgr, const int device_id)
const ExecutorDeviceType device_type_
ExecutorDeviceType
std::string to_string(char const *&&v)
ExecutorDispatchMode
std::unique_ptr< QueryMemoryInitializer > query_buffers_
ResultSetPtr getRowSet(const RelAlgExecutionUnit &ra_exe_unit, const QueryMemoryDescriptor &query_mem_desc) const
int8_t * copyLiteralsToDevice(int8_t *device_ptr, std::vector< int8_t > const &literal_buff) const
#define INJECT_TIMER(DESC)
Definition: measure.h:96
std::vector< int64_t > compact_init_vals(const size_t cmpt_size, const std::vector< int64_t > &init_vec, const QueryMemoryDescriptor &query_mem_desc)
std::list< Analyzer::OrderEntry > order_entries
void copyValueToDevice(int8_t *device_ptr, T const value) const
executor_(executor)
size_t sizeofFlattened2dVec(uint32_t const expected_subvector_size, std::vector< std::vector< T >> const &vec2d) const
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
void copyFlattened2dVecToDevice(int8_t *device_ptr, uint32_t const expected_subvector_size, std::vector< std::vector< T >> const &vec2d) const
int64_t getAggInitValForIndex(const size_t index) const
size_t sizeofInitAggVals(bool const is_group_by, std::vector< int64_t > const &init_agg_vals) const
const std::shared_ptr< Analyzer::Estimator > estimator
void copyVectorToDevice(int8_t *device_ptr, std::vector< T > const &vec) const
size_t sizeofJoinHashTables(std::vector< int8_t * > const &join_hash_tables) const
QueryDescriptionType getQueryDescriptionType() const
QueryMemoryDescriptor query_mem_desc_
#define UNLIKELY(x)
Definition: likely.h:25
std::optional< size_t > varlenOutputBufferElemSize() const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Speculative top N algorithm.
ResultSetPtr groupBufferToDeinterleavedResults(const size_t i) const
std::array< size_t, KERN_PARAM_COUNT > KernelParamSizes
ResultSetPtr groupBufferToResults(const size_t i) const
std::unique_ptr< RenderAllocatorMap > render_allocator_map_ptr
Definition: RenderInfo.h:33
std::unique_ptr< DeviceKernel > create_device_kernel(const CompilationContext *ctx, int device_id)
Descriptor for the result set buffer layout.
static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor &)
Definition: ResultSet.cpp:766
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
KernelParams prepareKernelParams(const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< int8_t > &literal_buff, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_offsets, const int32_t scan_limit, const std::vector< int64_t > &init_agg_vals, const std::vector< int32_t > &error_codes, const uint32_t num_tables, const std::vector< int8_t * > &join_hash_tables, Data_Namespace::DataMgr *data_mgr, const int device_id, const bool hoist_literals, const bool is_group_by) const
dictionary params
Definition: report.py:27
bool interleavedBins(const ExecutorDeviceType) const
def error_code
Definition: report.py:244
void copyInitAggValsToDevice(int8_t *device_ptr, bool const is_group_by, std::vector< int64_t > const &init_agg_vals) const
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
int32_t aggregate_error_codes(const std::vector< int32_t > &error_codes)
Basic constructors and methods of the row set interface.
Execution unit for relational algebra. It's a low-level description of any relational algebra operati...
std::unique_ptr< ResultSet > estimator_result_set_
unsigned g_dynamic_watchdog_time_limit
Definition: Execute.cpp:88
size_t getColOffInBytes(const size_t col_idx) const
static bool reduceSingleRow(const int8_t *row_ptr, const int8_t warp_count, const bool is_columnar, const bool replace_bitmap_ptr_with_bitmap_sz, std::vector< int64_t > &agg_vals, const QueryMemoryDescriptor &query_mem_desc, const std::vector< TargetInfo > &targets, const std::vector< int64_t > &agg_init_vals)
size_t getColOffInBytesInNextBin(const size_t col_idx) const
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
size_t sizeofVector(std::vector< T > const &vec) const
#define VLOG(n)
Definition: Logger.h:388
size_t sizeofColBuffers(std::vector< std::vector< int8_t const * >> const &col_buffers) const
std::vector< int64_t * > launchGpuCode(const RelAlgExecutionUnit &ra_exe_unit, const CompilationContext *compilation_context, const bool hoist_literals, const std::vector< int8_t > &literal_buff, std::vector< std::vector< const int8_t * >> col_buffers, const std::vector< std::vector< int64_t >> &num_rows, const std::vector< std::vector< uint64_t >> &frag_row_offsets, const int32_t scan_limit, Data_Namespace::DataMgr *data_mgr, const unsigned block_size_x, const unsigned grid_size_x, const int device_id, const size_t shared_memory_size, int32_t *error_code, const uint32_t num_tables, const bool allow_runtime_interrupt, const std::vector< int8_t * > &join_hash_tables, RenderAllocatorMap *render_allocator_map, bool optimize_cuda_block_and_grid_sizes)
size_t sizeofLiterals(std::vector< int8_t > const &literal_buff) const