OmniSciDB  cde582ebc3
QueryExecutionContext.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
19 #include "CudaAllocator.h"
20 #include "DeviceKernel.h"
21 #include "Execute.h"
22 #include "GpuInitGroups.h"
23 #include "InPlaceSort.h"
25 #include "QueryMemoryInitializer.h"
26 #include "RelAlgExecutionUnit.h"
27 #include "ResultSet.h"
28 #include "Shared/likely.h"
29 #include "SpeculativeTopN.h"
30 #include "StreamingTopN.h"
31 
32 QueryExecutionContext::QueryExecutionContext(
33  const RelAlgExecutionUnit& ra_exe_unit,
34  const QueryMemoryDescriptor& query_mem_desc,
35  const Executor* executor,
36  const ExecutorDeviceType device_type,
37  const ExecutorDispatchMode dispatch_mode,
38  const int device_id,
39  const int outer_table_id,
40  const int64_t num_rows,
41  const std::vector<std::vector<const int8_t*>>& col_buffers,
42  const std::vector<std::vector<uint64_t>>& frag_offsets,
43  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
44  const bool output_columnar,
45  const bool sort_on_gpu,
46  const size_t thread_idx,
47  RenderInfo* render_info)
48  : query_mem_desc_(query_mem_desc)
49  , executor_(executor)
50  , device_type_(device_type)
51  , dispatch_mode_(dispatch_mode)
52  , row_set_mem_owner_(row_set_mem_owner)
53  , output_columnar_(output_columnar) {
54  CHECK(executor);
55  auto data_mgr = executor->getDataMgr();
56  if (device_type == ExecutorDeviceType::GPU) {
57  gpu_allocator_ = std::make_unique<CudaAllocator>(
58  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
59  }
60 
61  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
62  ? render_info->render_allocator_map_ptr.get()
63  : nullptr;
64  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
65  query_mem_desc,
66  device_id,
67  device_type,
68  dispatch_mode,
69  output_columnar,
70  sort_on_gpu,
71  outer_table_id,
72  num_rows,
73  col_buffers,
74  frag_offsets,
75  render_allocator_map,
76  render_info,
77  row_set_mem_owner,
78  gpu_allocator_.get(),
79  thread_idx,
80  executor);
81 }
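// Illustrative usage sketch (not part of the original file): a caller constructs one
// context per device and then runs the generated code against it, roughly
//   QueryExecutionContext ctx(ra_exe_unit, query_mem_desc, executor, device_type,
//                             dispatch_mode, device_id, outer_table_id, num_rows,
//                             col_buffers, frag_offsets, row_set_mem_owner,
//                             output_columnar, sort_on_gpu, thread_idx, render_info);
// followed by ctx.launchCpuCode(...) or ctx.launchGpuCode(...) and finally
// ctx.getRowSet(...) to collect the reduced results.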
82 
83 ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
84  const size_t i) const {
85  CHECK(!output_columnar_);
86  const auto& result_set = query_buffers_->getResultSet(i);
87  auto deinterleaved_query_mem_desc =
88  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
89  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
90  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
91 
92  auto deinterleaved_result_set =
93  std::make_shared<ResultSet>(result_set->getTargetInfos(),
94  std::vector<ColumnLazyFetchInfo>{},
95  std::vector<std::vector<const int8_t*>>{},
96  std::vector<std::vector<int64_t>>{},
97  std::vector<int64_t>{},
98  ExecutorDeviceType::CPU,
99  -1,
100  deinterleaved_query_mem_desc,
101  row_set_mem_owner_,
102  executor_->getCatalog(),
103  executor_->blockSize(),
104  executor_->gridSize());
105  auto deinterleaved_storage =
106  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
107  auto deinterleaved_buffer =
108  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
109  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
110  size_t deinterleaved_buffer_idx = 0;
111  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
112  auto do_work = [&](const size_t bin_base_off) {
113  std::vector<int64_t> agg_vals(agg_col_count, 0);
114  memcpy(&agg_vals[0],
115  &executor_->plan_state_->init_agg_vals_[0],
116  agg_col_count * sizeof(agg_vals[0]));
117  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
118  executor_->warpSize(),
119  false,
120  true,
121  agg_vals,
122  query_mem_desc_,
123  result_set->getTargetInfos(),
124  executor_->plan_state_->init_agg_vals_);
125  for (size_t agg_idx = 0; agg_idx < agg_col_count;
126  ++agg_idx, ++deinterleaved_buffer_idx) {
127  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
128  }
129  };
130  if (g_enable_non_kernel_time_query_interrupt) {
131  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
132  bin_idx < result_set->entryCount();
133  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
134  if (UNLIKELY((bin_idx & 0xFFFF) == 0 &&
135  executor_->checkNonKernelTimeInterrupted())) {
136  throw std::runtime_error(
137  "Query execution has interrupted during result set reduction");
138  }
139  do_work(bin_base_off);
140  }
141  } else {
142  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
143  bin_idx < result_set->entryCount();
144  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
145  do_work(bin_base_off);
146  }
147  }
148  query_buffers_->resetResultSet(i);
149  return deinterleaved_result_set;
150 }
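// Note on the loops above: with interleaved bins on GPU, every output bin is
// replicated executor_->warpSize() times on the device. reduceSingleRow() folds the
// replicas into a single vector of agg_col_count values, which is then appended to
// the flat, 8-byte-per-slot deinterleaved buffer.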
151 
152 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
154  return query_buffers_->getAggInitValForIndex(index);
155 }
156 
157 ResultSetPtr QueryExecutionContext::getRowSet(
158  const RelAlgExecutionUnit& ra_exe_unit,
159  const QueryMemoryDescriptor& query_mem_desc) const {
160  auto timer = DEBUG_TIMER(__func__);
161  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
163  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
164  if (device_type_ == ExecutorDeviceType::CPU) {
165  const size_t expected_num_buffers = query_mem_desc.hasVarlenOutput() ? 2 : 1;
166  CHECK_EQ(expected_num_buffers, group_by_buffers_size);
167  return groupBufferToResults(0);
168  }
169  const size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
170  const size_t group_by_output_buffers_size =
171  group_by_buffers_size - (query_mem_desc.hasVarlenOutput() ? 1 : 0);
172  for (size_t i = 0; i < group_by_output_buffers_size; i += step) {
173  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
174  }
176  return executor_->reduceMultiDeviceResults(
177  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
178 }
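// Reduction note: on CPU there is a single group-by buffer (plus an optional varlen
// output buffer), so it is converted and returned directly. On GPU, buffers are
// collected with a stride of executor_->blockSize() when threads share memory, and
// the per-SM partial results are merged by reduceMultiDeviceResults().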
179 
180 ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
181  if (query_mem_desc_.interleavedBins(device_type_)) {
182  return groupBufferToDeinterleavedResults(i);
183  }
184  return query_buffers_->getResultSetOwned(i);
185 }
186 
187 namespace {
188 
189 int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
190  // Check overflow / division by zero / interrupt first
191  for (const auto err : error_codes) {
192  if (err > 0) {
193  return err;
194  }
195  }
196  for (const auto err : error_codes) {
197  if (err) {
198  return err;
199  }
200  }
201  return 0;
202 }
203 
204 } // namespace
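// Example of the convention above: positive codes (overflow, division by zero,
// interrupt) win over negative ones such as running out of output slots, so
// aggregate_error_codes({0, -3, 2}) yields 2 while aggregate_error_codes({0, -3})
// yields -3.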
205 
206 std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
207  const RelAlgExecutionUnit& ra_exe_unit,
208  const CompilationContext* compilation_context,
209  const bool hoist_literals,
210  const std::vector<int8_t>& literal_buff,
211  std::vector<std::vector<const int8_t*>> col_buffers,
212  const std::vector<std::vector<int64_t>>& num_rows,
213  const std::vector<std::vector<uint64_t>>& frag_offsets,
214  const int32_t scan_limit,
215  Data_Namespace::DataMgr* data_mgr,
216  const unsigned block_size_x,
217  const unsigned grid_size_x,
218  const int device_id,
219  const size_t shared_memory_size,
220  int32_t* error_code,
221  const uint32_t num_tables,
222  const bool allow_runtime_interrupt,
223  const std::vector<int8_t*>& join_hash_tables,
224  RenderAllocatorMap* render_allocator_map) {
225  auto timer = DEBUG_TIMER(__func__);
226  INJECT_TIMER(lauchGpuCode);
229  CHECK(compilation_context);
230  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
231 
232  bool is_group_by{query_mem_desc_.isGroupBy()};
233 
234  RenderAllocator* render_allocator = nullptr;
235  if (render_allocator_map) {
236  render_allocator = render_allocator_map->getRenderAllocator(device_id);
237  }
238 
239  auto kernel = create_device_kernel(compilation_context, device_id);
240 
241  std::vector<int64_t*> out_vec;
242  uint32_t num_fragments = col_buffers.size();
243  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
244 
245  auto prepareClock = kernel->make_clock();
246  auto launchClock = kernel->make_clock();
247  auto finishClock = kernel->make_clock();
248 
249  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
250  prepareClock->start();
251  }
252 
253  if (g_enable_dynamic_watchdog) {
254  kernel->initializeDynamicWatchdog(
255  executor_->interrupted_.load(),
256  g_dynamic_watchdog_time_limit);
257  }
258 
259  if (allow_runtime_interrupt && !render_allocator) {
260  kernel->initializeRuntimeInterrupter(device_id);
261  }
262 
263  auto kernel_params = prepareKernelParams(col_buffers,
264  literal_buff,
265  num_rows,
266  frag_offsets,
267  scan_limit,
268  init_agg_vals,
269  error_codes,
270  num_tables,
271  join_hash_tables,
272  data_mgr,
273  device_id,
274  hoist_literals,
275  is_group_by);
276 
277  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
278  CHECK(!kernel_params[GROUPBY_BUF]);
279 
280  const unsigned block_size_y = 1;
281  const unsigned block_size_z = 1;
282  const unsigned grid_size_y = 1;
283  const unsigned grid_size_z = 1;
284  const auto total_thread_count = block_size_x * grid_size_x;
285  const auto err_desc = kernel_params[ERROR_CODE];
286 
287  if (is_group_by) {
288  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
289  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
290  auto gpu_group_by_buffers =
291  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
292  query_mem_desc_,
293  kernel_params[INIT_AGG_VALS],
294  device_id,
295  dispatch_mode_,
296  block_size_x,
297  grid_size_x,
298  executor_->warpSize(),
299  can_sort_on_gpu,
300  output_columnar_,
301  render_allocator);
302  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
303  gpu_allocator_->copyToDevice(
304  kernel_params[MAX_MATCHED], &max_matched, sizeof(max_matched));
305 
306  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.ptrs;
307  std::vector<void*> param_ptrs;
308  for (auto& param : kernel_params) {
309  param_ptrs.push_back(&param);
310  }
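// The device launch follows the cuLaunchKernel convention: it takes an array of
// pointers to the argument values, so param_ptrs holds the address of each entry in
// kernel_params rather than the device pointer stored there.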
311 
312  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
313  auto prepareTime = prepareClock->stop();
314  VLOG(1) << "Device " << std::to_string(device_id)
315  << ": launchGpuCode: group-by prepare: " << std::to_string(prepareTime)
316  << " ms";
317  launchClock->start();
318  }
319 
320  if (hoist_literals) {
321  kernel->launch(grid_size_x,
322  grid_size_y,
323  grid_size_z,
324  block_size_x,
325  block_size_y,
326  block_size_z,
327  shared_memory_size,
328  &param_ptrs[0]);
329  } else {
330  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
331  kernel->launch(grid_size_x,
332  grid_size_y,
333  grid_size_z,
334  block_size_x,
335  block_size_y,
336  block_size_z,
337  shared_memory_size,
338  &param_ptrs[0]);
339  }
340  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
341  auto launchTime = launchClock->stop();
342  VLOG(1) << "Device " << std::to_string(device_id)
343  << ": launchGpuCode: group-by cuLaunchKernel: "
344  << std::to_string(launchTime) << " ms";
345  finishClock->start();
346  }
347 
348  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
349  reinterpret_cast<int8_t*>(err_desc),
350  error_codes.size() * sizeof(error_codes[0]));
351  *error_code = aggregate_error_codes(error_codes);
352  if (*error_code > 0) {
353  return {};
354  }
355 
356  if (!render_allocator) {
357  if (query_mem_desc_.useStreamingTopN()) {
358  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
359  query_mem_desc_,
360  gpu_group_by_buffers,
361  ra_exe_unit,
362  total_thread_count,
363  device_id);
364  } else {
365  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
366  try {
367  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
368  query_mem_desc_,
369  gpu_group_by_buffers,
370  data_mgr,
371  device_id);
372  } catch (const std::bad_alloc&) {
373  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
374  }
375  }
376  if (query_mem_desc_.getQueryDescriptionType() ==
377  QueryDescriptionType::Projection) {
378  if (query_mem_desc_.didOutputColumnar()) {
379  query_buffers_->compactProjectionBuffersGpu(
380  query_mem_desc_,
381  data_mgr,
382  gpu_group_by_buffers,
383  get_num_allocated_rows_from_gpu(
384  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id),
385  device_id);
386  } else {
387  size_t num_allocated_rows{0};
388  if (ra_exe_unit.use_bump_allocator) {
389  num_allocated_rows = get_num_allocated_rows_from_gpu(
390  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id);
391  // First, check the error code. If we ran out of slots, don't copy data back
392  // into the ResultSet or update ResultSet entry count
393  if (*error_code < 0) {
394  return {};
395  }
396  }
397  query_buffers_->copyGroupByBuffersFromGpu(
398  *gpu_allocator_,
399  query_mem_desc_,
400  ra_exe_unit.use_bump_allocator ? num_allocated_rows
401  : query_mem_desc_.getEntryCount(),
402  gpu_group_by_buffers,
403  &ra_exe_unit,
404  block_size_x,
405  grid_size_x,
406  device_id,
407  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
408  if (num_allocated_rows) {
409  CHECK(ra_exe_unit.use_bump_allocator);
410  CHECK(!query_buffers_->result_sets_.empty());
411  query_buffers_->result_sets_.front()->updateStorageEntryCount(
412  num_allocated_rows);
413  }
414  }
415  } else {
416  query_buffers_->copyGroupByBuffersFromGpu(
417  *gpu_allocator_,
418  query_mem_desc_,
419  query_mem_desc_.getEntryCount(),
420  gpu_group_by_buffers,
421  &ra_exe_unit,
422  block_size_x,
423  grid_size_x,
424  device_id,
425  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
426  }
427  }
428  }
429  } else {
430  std::vector<int8_t*> out_vec_dev_buffers;
431  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
432  // by default, non-grouped aggregate queries generate one result per available thread
433  // in the lifetime of (potentially multi-fragment) kernel execution.
434  // We can reduce these intermediate results internally in the device and hence have
435  // only one result per device, if GPU shared memory optimizations are enabled.
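 // For example (illustrative numbers only): block_size_x = 128, grid_size_x = 16 and
 // num_fragments = 2 give 4096 partial values per aggregate column without shared
 // memory, versus a single 8-byte value per column when shared_memory_size > 0.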
436  const auto num_results_per_agg_col =
437  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
438  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
439  if (ra_exe_unit.estimator) {
440  estimator_result_set_.reset(new ResultSet(
441  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
442  out_vec_dev_buffers.push_back(estimator_result_set_->getDeviceEstimatorBuffer());
443  } else {
444  for (size_t i = 0; i < agg_col_count; ++i) {
445  int8_t* out_vec_dev_buffer =
446  num_fragments ? gpu_allocator_->alloc(output_buffer_size_per_agg) : nullptr;
447  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
448  if (shared_memory_size) {
449  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
450  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
451  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
452  output_buffer_size_per_agg);
453  }
454  }
455  }
456  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(int8_t*));
457  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
458  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
459  agg_col_count * sizeof(int8_t*));
460  kernel_params[GROUPBY_BUF] = out_vec_dev_ptr;
461  std::vector<void*> param_ptrs;
462  for (auto& param : kernel_params) {
463  param_ptrs.push_back(&param);
464  }
465 
466  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
467  auto prepareTime = prepareClock->stop();
468 
469  VLOG(1) << "Device " << std::to_string(device_id)
470  << ": launchGpuCode: prepare: " << std::to_string(prepareTime) << " ms";
471  launchClock->start();
472  }
473 
474  if (hoist_literals) {
475  kernel->launch(grid_size_x,
476  grid_size_y,
477  grid_size_z,
478  block_size_x,
479  block_size_y,
480  block_size_z,
481  shared_memory_size,
482  &param_ptrs[0]);
483  } else {
484  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
485  kernel->launch(grid_size_x,
486  grid_size_y,
487  grid_size_z,
488  block_size_x,
489  block_size_y,
490  block_size_z,
491  shared_memory_size,
492  &param_ptrs[0]);
493  }
494 
495  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
496  auto launchTime = launchClock->stop();
497  VLOG(1) << "Device " << std::to_string(device_id)
498  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(launchTime)
499  << " ms";
500  finishClock->start();
501  }
502 
503  gpu_allocator_->copyFromDevice(
504  &error_codes[0], err_desc, error_codes.size() * sizeof(error_codes[0]));
505  *error_code = aggregate_error_codes(error_codes);
506  if (*error_code > 0) {
507  return {};
508  }
509  if (ra_exe_unit.estimator) {
511  estimator_result_set_->syncEstimatorBuffer();
512  return {};
513  }
514  for (size_t i = 0; i < agg_col_count; ++i) {
515  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
516  gpu_allocator_->copyFromDevice(
517  host_out_vec, out_vec_dev_buffers[i], output_buffer_size_per_agg);
518  out_vec.push_back(host_out_vec);
519  }
520  }
521  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
522  if (count_distinct_bitmap_mem) {
523  gpu_allocator_->copyFromDevice(query_buffers_->getCountDistinctHostPtr(),
524  reinterpret_cast<void*>(count_distinct_bitmap_mem),
525  query_buffers_->getCountDistinctBitmapBytes());
526  }
527 
528  const auto varlen_output_gpu_buf = query_buffers_->getVarlenOutputPtr();
529  if (varlen_output_gpu_buf) {
530  CHECK(query_mem_desc_.varlenOutputBufferElemSize());
531  const size_t varlen_output_buf_bytes =
532  query_mem_desc_.getEntryCount() *
533  query_mem_desc_.varlenOutputBufferElemSize().value();
534  CHECK(query_buffers_->getVarlenOutputHostPtr());
535  gpu_allocator_->copyFromDevice(query_buffers_->getVarlenOutputHostPtr(),
536  reinterpret_cast<void*>(varlen_output_gpu_buf),
537  varlen_output_buf_bytes);
538  }
539 
540  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
541  if (allow_runtime_interrupt) {
542  kernel->resetRuntimeInterrupter(device_id);
543  }
544  auto finishTime = finishClock->stop();
545  VLOG(1) << "Device " << std::to_string(device_id)
546  << ": launchGpuCode: finish: " << std::to_string(finishTime) << " ms";
547  }
548 
549  return out_vec;
550 }
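// Recap of the GPU path: prepareKernelParams() stages every input on the device, the
// group-by branch additionally creates and initializes the output buffers, the kernel
// is launched on a one-dimensional grid of grid_size_x blocks with block_size_x
// threads, and error codes, count-distinct bitmaps and varlen buffers are copied back
// before out_vec is assembled for non-grouped aggregates.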
551 
552 std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
553  const RelAlgExecutionUnit& ra_exe_unit,
554  const CpuCompilationContext* native_code,
555  const bool hoist_literals,
556  const std::vector<int8_t>& literal_buff,
557  std::vector<std::vector<const int8_t*>> col_buffers,
558  const std::vector<std::vector<int64_t>>& num_rows,
559  const std::vector<std::vector<uint64_t>>& frag_offsets,
560  const int32_t scan_limit,
561  int32_t* error_code,
562  const uint32_t num_tables,
563  const std::vector<int8_t*>& join_hash_tables,
564  const int64_t num_rows_to_process) {
565  auto timer = DEBUG_TIMER(__func__);
566  INJECT_TIMER(lauchCpuCode);
567 
569  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
570 
571  std::vector<const int8_t**> multifrag_col_buffers;
572  for (auto& col_buffer : col_buffers) {
573  multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
574  }
575  const int8_t*** multifrag_cols_ptr{
576  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
577  const uint64_t num_fragments =
578  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
579  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
580 
581  const bool is_group_by{query_mem_desc_.isGroupBy()};
582  std::vector<int64_t*> out_vec;
583  if (ra_exe_unit.estimator) {
584  // Subfragments collect the result from multiple runs in a single
585  // result set.
586  if (!estimator_result_set_) {
587  estimator_result_set_.reset(
588  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
589  }
590  out_vec.push_back(
591  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
592  } else {
593  if (!is_group_by) {
594  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
595  auto buff = new int64_t[num_out_frags];
596  out_vec.push_back(static_cast<int64_t*>(buff));
597  }
598  }
599  }
600 
601  CHECK_EQ(num_rows.size(), col_buffers.size());
602  std::vector<int64_t> flatened_num_rows;
603  for (auto& nums : num_rows) {
604  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
605  }
606  std::vector<uint64_t> flatened_frag_offsets;
607  for (auto& offsets : frag_offsets) {
608  flatened_frag_offsets.insert(
609  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
610  }
611  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
612  int64_t* num_rows_ptr;
613  if (num_rows_to_process > 0) {
614  flatened_num_rows[0] = num_rows_to_process;
615  num_rows_ptr = flatened_num_rows.data();
616  } else {
617  num_rows_ptr =
618  rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
619  }
620  int32_t total_matched_init{0};
621 
622  std::vector<int64_t> cmpt_val_buff;
623  if (is_group_by) {
624  cmpt_val_buff =
625  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
626  init_agg_vals,
627  query_mem_desc_);
628  }
629 
630  CHECK(native_code);
631  const int64_t* join_hash_tables_ptr =
632  join_hash_tables.size() == 1
633  ? reinterpret_cast<const int64_t*>(join_hash_tables[0])
634  : (join_hash_tables.size() > 1
635  ? reinterpret_cast<const int64_t*>(&join_hash_tables[0])
636  : nullptr);
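 // With exactly one join hash table its pointer is handed to the generated code
 // directly; with more than one, a pointer to the host-side array of table pointers
 // is passed instead, and the generated code is expected to index into that array.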
637  if (hoist_literals) {
638  using agg_query = void (*)(const int8_t***, // col_buffers
639  const uint64_t*, // num_fragments
640  const int8_t*, // literals
641  const int64_t*, // num_rows
642  const uint64_t*, // frag_row_offsets
643  const int32_t*, // max_matched
644  int32_t*, // total_matched
645  const int64_t*, // init_agg_value
646  int64_t**, // out
647  int32_t*, // error_code
648  const uint32_t*, // num_tables
649  const int64_t*); // join_hash_tables_ptr
650  if (is_group_by) {
651  reinterpret_cast<agg_query>(native_code->func())(
652  multifrag_cols_ptr,
653  &num_fragments,
654  literal_buff.data(),
655  num_rows_ptr,
656  flatened_frag_offsets.data(),
657  &scan_limit,
658  &total_matched_init,
659  cmpt_val_buff.data(),
660  query_buffers_->getGroupByBuffersPtr(),
661  error_code,
662  &num_tables,
663  join_hash_tables_ptr);
664  } else {
665  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
666  &num_fragments,
667  literal_buff.data(),
668  num_rows_ptr,
669  flatened_frag_offsets.data(),
670  &scan_limit,
671  &total_matched_init,
672  init_agg_vals.data(),
673  out_vec.data(),
674  error_code,
675  &num_tables,
676  join_hash_tables_ptr);
677  }
678  } else {
679  using agg_query = void (*)(const int8_t***, // col_buffers
680  const uint64_t*, // num_fragments
681  const int64_t*, // num_rows
682  const uint64_t*, // frag_row_offsets
683  const int32_t*, // max_matched
684  int32_t*, // total_matched
685  const int64_t*, // init_agg_value
686  int64_t**, // out
687  int32_t*, // error_code
688  const uint32_t*, // num_tables
689  const int64_t*); // join_hash_tables_ptr
690  if (is_group_by) {
691  reinterpret_cast<agg_query>(native_code->func())(
692  multifrag_cols_ptr,
693  &num_fragments,
694  num_rows_ptr,
695  flatened_frag_offsets.data(),
696  &scan_limit,
697  &total_matched_init,
698  cmpt_val_buff.data(),
699  query_buffers_->getGroupByBuffersPtr(),
700  error_code,
701  &num_tables,
702  join_hash_tables_ptr);
703  } else {
704  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
705  &num_fragments,
706  num_rows_ptr,
707  flatened_frag_offsets.data(),
708  &scan_limit,
709  &total_matched_init,
710  init_agg_vals.data(),
711  out_vec.data(),
712  error_code,
713  &num_tables,
714  join_hash_tables_ptr);
715  }
716  }
717 
718  if (ra_exe_unit.estimator) {
719  return {};
720  }
721 
722  if (rowid_lookup_num_rows && *error_code < 0) {
723  *error_code = 0;
724  }
725 
726  if (query_mem_desc_.useStreamingTopN()) {
727  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
728  }
729 
730  if (query_mem_desc_.didOutputColumnar() &&
731  query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
732  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
733  }
734  return out_vec;
735 }
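// Note on the two agg_query signatures above: when literals are hoisted the generated
// entry point takes an extra `const int8_t*` literal buffer right after num_fragments;
// otherwise that parameter is absent, which mirrors the GPU path erasing the LITERALS
// slot from param_ptrs when hoist_literals is false.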
736 
737 std::vector<int8_t*> QueryExecutionContext::prepareKernelParams(
738  const std::vector<std::vector<const int8_t*>>& col_buffers,
739  const std::vector<int8_t>& literal_buff,
740  const std::vector<std::vector<int64_t>>& num_rows,
741  const std::vector<std::vector<uint64_t>>& frag_offsets,
742  const int32_t scan_limit,
743  const std::vector<int64_t>& init_agg_vals,
744  const std::vector<int32_t>& error_codes,
745  const uint32_t num_tables,
746  const std::vector<int8_t*>& join_hash_tables,
747  Data_Namespace::DataMgr* data_mgr,
748  const int device_id,
749  const bool hoist_literals,
750  const bool is_group_by) const {
752  std::vector<int8_t*> params(KERN_PARAM_COUNT, 0);
753  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
754  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
755  if (col_count) {
756  std::vector<int8_t*> multifrag_col_dev_buffers;
757  for (auto frag_col_buffers : col_buffers) {
758  std::vector<const int8_t*> col_dev_buffers;
759  for (auto col_buffer : frag_col_buffers) {
760  col_dev_buffers.push_back((int8_t*)col_buffer);
761  }
762  auto col_buffers_dev_ptr = gpu_allocator_->alloc(col_count * sizeof(int8_t*));
763  gpu_allocator_->copyToDevice(
764  col_buffers_dev_ptr, &col_dev_buffers[0], col_count * sizeof(int8_t*));
765  multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
766  }
767  params[COL_BUFFERS] = gpu_allocator_->alloc(num_fragments * sizeof(int8_t*));
768 
769  gpu_allocator_->copyToDevice(params[COL_BUFFERS],
770  &multifrag_col_dev_buffers[0],
771  num_fragments * sizeof(int8_t*));
772  }
773  params[NUM_FRAGMENTS] = gpu_allocator_->alloc(sizeof(uint64_t));
774  gpu_allocator_->copyToDevice(params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t));
775 
776  int8_t* literals_and_addr_mapping =
777  gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t));
778  CHECK_EQ(0, (int64_t)literals_and_addr_mapping % 8);
779  std::vector<int64_t> additional_literal_bytes;
780  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
781  if (count_distinct_bitmap_mem) {
782  // Store host and device addresses
783  const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
784  CHECK(count_distinct_bitmap_host_mem);
785  additional_literal_bytes.push_back(
786  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
787  additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
788  gpu_allocator_->copyToDevice(
789  literals_and_addr_mapping,
790  &additional_literal_bytes[0],
791  additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]));
792  }
793  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
794  sizeof(additional_literal_bytes[0]);
795  if (!literal_buff.empty()) {
796  CHECK(hoist_literals);
797  gpu_allocator_->copyToDevice(params[LITERALS], &literal_buff[0], literal_buff.size());
798  }
799  CHECK_EQ(num_rows.size(), col_buffers.size());
800  std::vector<int64_t> flatened_num_rows;
801  for (auto& nums : num_rows) {
802  CHECK_EQ(nums.size(), num_tables);
803  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
804  }
805  params[NUM_ROWS] = gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size());
806  gpu_allocator_->copyToDevice(params[NUM_ROWS],
807  &flatened_num_rows[0],
808  sizeof(int64_t) * flatened_num_rows.size());
809 
810  CHECK_EQ(frag_offsets.size(), col_buffers.size());
811  std::vector<int64_t> flatened_frag_offsets;
812  for (auto& offsets : frag_offsets) {
813  CHECK_EQ(offsets.size(), num_tables);
814  flatened_frag_offsets.insert(
815  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
816  }
817  params[FRAG_ROW_OFFSETS] =
818  gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size());
819  gpu_allocator_->copyToDevice(params[FRAG_ROW_OFFSETS],
820  &flatened_frag_offsets[0],
821  sizeof(int64_t) * flatened_num_rows.size());
822 
823  // Note that this will be overwritten if we are setting the entry count during group by
824  // buffer allocation and initialization
825  int32_t max_matched{scan_limit};
826  params[MAX_MATCHED] = gpu_allocator_->alloc(sizeof(max_matched));
827  gpu_allocator_->copyToDevice(params[MAX_MATCHED], &max_matched, sizeof(max_matched));
828 
829  int32_t total_matched{0};
830  params[TOTAL_MATCHED] = gpu_allocator_->alloc(sizeof(total_matched));
831  gpu_allocator_->copyToDevice(
832  params[TOTAL_MATCHED], &total_matched, sizeof(total_matched));
833 
834  if (is_group_by && !output_columnar_) {
835  auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
836  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
837  params[INIT_AGG_VALS] = gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t));
838  gpu_allocator_->copyToDevice(
839  params[INIT_AGG_VALS], &cmpt_val_buff[0], cmpt_sz * sizeof(int64_t));
840  } else {
841  params[INIT_AGG_VALS] = gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t));
842  gpu_allocator_->copyToDevice(
843  params[INIT_AGG_VALS], &init_agg_vals[0], init_agg_vals.size() * sizeof(int64_t));
844  }
845 
846  params[ERROR_CODE] = gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0]));
847  gpu_allocator_->copyToDevice(
848  params[ERROR_CODE], &error_codes[0], error_codes.size() * sizeof(error_codes[0]));
849 
850  params[NUM_TABLES] = gpu_allocator_->alloc(sizeof(uint32_t));
851  gpu_allocator_->copyToDevice(params[NUM_TABLES], &num_tables, sizeof(uint32_t));
852 
853  const auto hash_table_count = join_hash_tables.size();
854  switch (hash_table_count) {
855  case 0: {
856  params[JOIN_HASH_TABLES] = 0;
857  break;
858  }
859  case 1:
860  params[JOIN_HASH_TABLES] = join_hash_tables[0];
861  break;
862  default: {
863  params[JOIN_HASH_TABLES] =
864  gpu_allocator_->alloc(hash_table_count * sizeof(int64_t));
865  gpu_allocator_->copyToDevice(params[JOIN_HASH_TABLES],
866  &join_hash_tables[0],
867  hash_table_count * sizeof(int64_t));
868  break;
869  }
870  }
871 
872  return params;
873 }
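// Parameter layout note: the indices used above (COL_BUFFERS, NUM_FRAGMENTS, LITERALS,
// NUM_ROWS, FRAG_ROW_OFFSETS, MAX_MATCHED, TOTAL_MATCHED, INIT_AGG_VALS, GROUPBY_BUF,
// ERROR_CODE, NUM_TABLES, JOIN_HASH_TABLES) come from an enum declared in the header
// (not shown here). GROUPBY_BUF is deliberately left unset and is filled in by
// launchGpuCode() once the output buffers have been created.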