OmniSciDB  91042dcc5b
QueryExecutionContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryExecutionContext.h"
18 #include "AggregateUtils.h"
20 #include "DeviceKernel.h"
21 #include "Execute.h"
22 #include "GpuInitGroups.h"
23 #include "InPlaceSort.h"
24 #include "QueryMemoryInitializer.h"
25 #include "RelAlgExecutionUnit.h"
26 #include "ResultSet.h"
27 #include "Shared/likely.h"
28 #include "SpeculativeTopN.h"
29 #include "StreamingTopN.h"
30 
31 QueryExecutionContext::QueryExecutionContext(
32  const RelAlgExecutionUnit& ra_exe_unit,
33  const QueryMemoryDescriptor& query_mem_desc,
34  const Executor* executor,
35  const ExecutorDeviceType device_type,
36  const ExecutorDispatchMode dispatch_mode,
37  const int device_id,
38  const int outer_table_id,
39  const int64_t num_rows,
40  const std::vector<std::vector<const int8_t*>>& col_buffers,
41  const std::vector<std::vector<uint64_t>>& frag_offsets,
42  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
43  const bool output_columnar,
44  const bool sort_on_gpu,
45  const size_t thread_idx,
46  RenderInfo* render_info)
47  : query_mem_desc_(query_mem_desc)
48  , executor_(executor)
49  , device_type_(device_type)
50  , dispatch_mode_(dispatch_mode)
51  , row_set_mem_owner_(row_set_mem_owner)
52  , output_columnar_(output_columnar) {
53  CHECK(executor);
54  auto data_mgr = executor->getDataMgr();
55  if (device_type == ExecutorDeviceType::GPU) {
56  gpu_allocator_ = std::make_unique<CudaAllocator>(data_mgr, device_id);
57  }
58 
59  auto render_allocator_map = render_info && render_info->isPotentialInSituRender()
60  ? render_info->render_allocator_map_ptr.get()
61  : nullptr;
62  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
63  query_mem_desc,
64  device_id,
65  device_type,
66  dispatch_mode,
67  output_columnar,
68  sort_on_gpu,
69  outer_table_id,
70  num_rows,
71  col_buffers,
72  frag_offsets,
73  render_allocator_map,
74  render_info,
75  row_set_mem_owner,
76  gpu_allocator_.get(),
77  thread_idx,
78  executor);
79 }
80 
81 ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
82  const size_t i) const {
83  CHECK(!output_columnar_);
84  const auto& result_set = query_buffers_->getResultSet(i);
85  auto deinterleaved_query_mem_desc =
86  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc_);
87  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
88  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);
89 
90  auto deinterleaved_result_set =
91  std::make_shared<ResultSet>(result_set->getTargetInfos(),
92  std::vector<ColumnLazyFetchInfo>{},
93  std::vector<std::vector<const int8_t*>>{},
94  std::vector<std::vector<int64_t>>{},
95  std::vector<int64_t>{},
96  ExecutorDeviceType::CPU,
97  -1,
98  deinterleaved_query_mem_desc,
99  row_set_mem_owner_,
100  executor_->getCatalog(),
101  executor_->blockSize(),
102  executor_->gridSize());
103  auto deinterleaved_storage =
104  deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
105  auto deinterleaved_buffer =
106  reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
107  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
108  size_t deinterleaved_buffer_idx = 0;
109  const size_t agg_col_count{query_mem_desc_.getSlotCount()};
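  // Each interleaved bin holds one copy of the aggregate slots per warp lane;
  // reduceSingleRow collapses those copies into a single row, which do_work then
  // appends to the flat deinterleaved output buffer.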
110  auto do_work = [&](const size_t bin_base_off) {
111  std::vector<int64_t> agg_vals(agg_col_count, 0);
112  memcpy(&agg_vals[0],
113  &executor_->plan_state_->init_agg_vals_[0],
114  agg_col_count * sizeof(agg_vals[0]));
115  ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
116  executor_->warpSize(),
117  false,
118  true,
119  agg_vals,
120  query_mem_desc_,
121  result_set->getTargetInfos(),
122  executor_->plan_state_->init_agg_vals_);
123  for (size_t agg_idx = 0; agg_idx < agg_col_count;
124  ++agg_idx, ++deinterleaved_buffer_idx) {
125  deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
126  }
127  };
128  if (g_enable_non_kernel_time_query_interrupt) {
129  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
130  bin_idx < result_set->entryCount();
131  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
132  if (UNLIKELY((bin_idx & 0xFFFF) == 0 &&
133  executor_->checkNonKernelTimeInterrupted())) {
134  throw std::runtime_error(
135  "Query execution was interrupted during result set reduction");
136  }
137  do_work(bin_base_off);
138  }
139  } else {
140  for (size_t bin_base_off = query_mem_desc_.getColOffInBytes(0), bin_idx = 0;
141  bin_idx < result_set->entryCount();
142  ++bin_idx, bin_base_off += query_mem_desc_.getColOffInBytesInNextBin(0)) {
143  do_work(bin_base_off);
144  }
145  }
146  query_buffers_->resetResultSet(i);
147  return deinterleaved_result_set;
148 }
149 
150 int64_t QueryExecutionContext::getAggInitValForIndex(const size_t index) const {
151  CHECK(query_buffers_);
152  return query_buffers_->getAggInitValForIndex(index);
153 }
154 
155 ResultSetPtr QueryExecutionContext::getRowSet(
156  const RelAlgExecutionUnit& ra_exe_unit,
157  const QueryMemoryDescriptor& query_mem_desc) const {
158  auto timer = DEBUG_TIMER(__func__);
159  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
160  CHECK(query_buffers_);
161  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
162  if (device_type_ == ExecutorDeviceType::CPU) {
163  const size_t expected_num_buffers = query_mem_desc.hasVarlenOutput() ? 2 : 1;
164  CHECK_EQ(expected_num_buffers, group_by_buffers_size);
165  return groupBufferToResults(0);
166  }
167  const size_t step{query_mem_desc_.threadsShareMemory() ? executor_->blockSize() : 1};
168  const size_t group_by_output_buffers_size =
169  group_by_buffers_size - (query_mem_desc.hasVarlenOutput() ? 1 : 0);
170  for (size_t i = 0; i < group_by_output_buffers_size; i += step) {
171  results_per_sm.emplace_back(groupBufferToResults(i), std::vector<size_t>{});
172  }
173  CHECK(device_type_ == ExecutorDeviceType::GPU);
174  return executor_->reduceMultiDeviceResults(
175  ra_exe_unit, results_per_sm, row_set_mem_owner_, query_mem_desc);
176 }
177 
178 ResultSetPtr QueryExecutionContext::groupBufferToResults(const size_t i) const {
179  if (query_mem_desc_.interleavedBins(device_type_)) {
180  return groupBufferToDeinterleavedResults(i);
181  }
182  return query_buffers_->getResultSetOwned(i);
183 }
184 
185 namespace {
186 
187 int32_t aggregate_error_codes(const std::vector<int32_t>& error_codes) {
188  // Check overflow / division by zero / interrupt first
189  for (const auto err : error_codes) {
190  if (err > 0) {
191  return err;
192  }
193  }
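  // Otherwise surface any remaining (negative) code, e.g. an out-of-slots signal
  // that the caller may handle by retrying with a larger buffer.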
194  for (const auto err : error_codes) {
195  if (err) {
196  return err;
197  }
198  }
199  return 0;
200 }
201 
202 } // namespace
203 
204 std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
205  const RelAlgExecutionUnit& ra_exe_unit,
206  const CompilationContext* compilation_context,
207  const bool hoist_literals,
208  const std::vector<int8_t>& literal_buff,
209  std::vector<std::vector<const int8_t*>> col_buffers,
210  const std::vector<std::vector<int64_t>>& num_rows,
211  const std::vector<std::vector<uint64_t>>& frag_offsets,
212  const int32_t scan_limit,
213  Data_Namespace::DataMgr* data_mgr,
214  const unsigned block_size_x,
215  const unsigned grid_size_x,
216  const int device_id,
217  const size_t shared_memory_size,
218  int32_t* error_code,
219  const uint32_t num_tables,
220  const bool allow_runtime_interrupt,
221  const std::vector<int8_t*>& join_hash_tables,
222  RenderAllocatorMap* render_allocator_map) {
223  auto timer = DEBUG_TIMER(__func__);
224  INJECT_TIMER(launchGpuCode);
225  CHECK(gpu_allocator_);
226  CHECK(query_buffers_);
227  CHECK(compilation_context);
228  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
229 
230  bool is_group_by{query_mem_desc_.isGroupBy()};
231 
232  RenderAllocator* render_allocator = nullptr;
233  if (render_allocator_map) {
234  render_allocator = render_allocator_map->getRenderAllocator(device_id);
235  }
236 
237  auto kernel = create_device_kernel(compilation_context, device_id);
238 
239  std::vector<int64_t*> out_vec;
240  uint32_t num_fragments = col_buffers.size();
241  std::vector<int32_t> error_codes(grid_size_x * block_size_x);
242 
243  auto prepareClock = kernel->make_clock();
244  auto launchClock = kernel->make_clock();
245  auto finishClock = kernel->make_clock();
246 
247  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
248  prepareClock->start();
249  }
250 
251  if (g_enable_dynamic_watchdog) {
252  kernel->initializeDynamicWatchdog(
253  executor_->interrupted_.load(),
254  g_dynamic_watchdog_time_limit);
255  }
256 
257  if (allow_runtime_interrupt && !render_allocator) {
258  kernel->initializeRuntimeInterrupter();
259  }
260 
261  auto kernel_params = prepareKernelParams(col_buffers,
262  literal_buff,
263  num_rows,
264  frag_offsets,
265  scan_limit,
266  init_agg_vals,
267  error_codes,
268  num_tables,
269  join_hash_tables,
270  data_mgr,
271  device_id,
272  hoist_literals,
273  is_group_by);
274 
275  CHECK_EQ(static_cast<size_t>(KERN_PARAM_COUNT), kernel_params.size());
276  CHECK(!kernel_params[GROUPBY_BUF]);
277 
278  const unsigned block_size_y = 1;
279  const unsigned block_size_z = 1;
280  const unsigned grid_size_y = 1;
281  const unsigned grid_size_z = 1;
282  const auto total_thread_count = block_size_x * grid_size_x;
283  const auto err_desc = kernel_params[ERROR_CODE];
284 
285  if (is_group_by) {
286  CHECK(!(query_buffers_->getGroupByBuffersSize() == 0) || render_allocator);
287  bool can_sort_on_gpu = query_mem_desc_.sortOnGpu();
288  auto gpu_group_by_buffers =
289  query_buffers_->createAndInitializeGroupByBufferGpu(ra_exe_unit,
290  query_mem_desc_,
291  kernel_params[INIT_AGG_VALS],
292  device_id,
293  dispatch_mode_,
294  block_size_x,
295  grid_size_x,
296  executor_->warpSize(),
297  can_sort_on_gpu,
298  output_columnar_,
299  render_allocator);
300  const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
301  gpu_allocator_->copyToDevice(
302  kernel_params[MAX_MATCHED], &max_matched, sizeof(max_matched));
303 
304  kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.ptrs;
305  std::vector<void*> param_ptrs;
306  for (auto& param : kernel_params) {
307  param_ptrs.push_back(&param);
308  }
309 
310  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
311  auto prepareTime = prepareClock->stop();
312  VLOG(1) << "Device " << std::to_string(device_id)
313  << ": launchGpuCode: group-by prepare: " << std::to_string(prepareTime)
314  << " ms";
315  launchClock->start();
316  }
317 
318  if (hoist_literals) {
319  kernel->launch(grid_size_x,
320  grid_size_y,
321  grid_size_z,
322  block_size_x,
323  block_size_y,
324  block_size_z,
325  shared_memory_size,
326  &param_ptrs[0]);
327  } else {
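  // When literals are not hoisted the generated kernel takes no literal-buffer
  // argument, so that slot is dropped from the launch parameters.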
328  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
329  kernel->launch(grid_size_x,
330  grid_size_y,
331  grid_size_z,
332  block_size_x,
333  block_size_y,
334  block_size_z,
335  shared_memory_size,
336  &param_ptrs[0]);
337  }
338  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
339  auto launchTime = launchClock->stop();
340  VLOG(1) << "Device " << std::to_string(device_id)
341  << ": launchGpuCode: group-by cuLaunchKernel: "
342  << std::to_string(launchTime) << " ms";
343  finishClock->start();
344  }
345 
346  gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
347  reinterpret_cast<int8_t*>(err_desc),
348  error_codes.size() * sizeof(error_codes[0]));
349  *error_code = aggregate_error_codes(error_codes);
350  if (*error_code > 0) {
351  return {};
352  }
353 
354  if (!render_allocator) {
355  if (query_mem_desc_.useStreamingTopN()) {
356  query_buffers_->applyStreamingTopNOffsetGpu(data_mgr,
357  query_mem_desc_,
358  gpu_group_by_buffers,
359  ra_exe_unit,
360  total_thread_count,
361  device_id);
362  } else {
363  if (use_speculative_top_n(ra_exe_unit, query_mem_desc_)) {
364  try {
365  inplace_sort_gpu(ra_exe_unit.sort_info.order_entries,
366  query_mem_desc_,
367  gpu_group_by_buffers,
368  data_mgr,
369  device_id);
370  } catch (const std::bad_alloc&) {
371  throw SpeculativeTopNFailed("Failed during in-place GPU sort.");
372  }
373  }
374  if (query_mem_desc_.getQueryDescriptionType() ==
375  QueryDescriptionType::Projection) {
376  if (query_mem_desc_.didOutputColumnar()) {
377  query_buffers_->compactProjectionBuffersGpu(
378  query_mem_desc_,
379  data_mgr,
380  gpu_group_by_buffers,
381  get_num_allocated_rows_from_gpu(
382  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id),
383  device_id);
384  } else {
385  size_t num_allocated_rows{0};
386  if (ra_exe_unit.use_bump_allocator) {
387  num_allocated_rows = get_num_allocated_rows_from_gpu(
388  *gpu_allocator_, kernel_params[TOTAL_MATCHED], device_id);
389  // First, check the error code. If we ran out of slots, don't copy data back
390  // into the ResultSet or update ResultSet entry count
391  if (*error_code < 0) {
392  return {};
393  }
394  }
395  query_buffers_->copyGroupByBuffersFromGpu(
396  *gpu_allocator_,
397  query_mem_desc_,
398  ra_exe_unit.use_bump_allocator ? num_allocated_rows
399  : query_mem_desc_.getEntryCount(),
400  gpu_group_by_buffers,
401  &ra_exe_unit,
402  block_size_x,
403  grid_size_x,
404  device_id,
405  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
406  if (num_allocated_rows) {
407  CHECK(ra_exe_unit.use_bump_allocator);
408  CHECK(!query_buffers_->result_sets_.empty());
409  query_buffers_->result_sets_.front()->updateStorageEntryCount(
410  num_allocated_rows);
411  }
412  }
413  } else {
414  query_buffers_->copyGroupByBuffersFromGpu(
415  *gpu_allocator_,
416  query_mem_desc_,
417  query_mem_desc_.getEntryCount(),
418  gpu_group_by_buffers,
419  &ra_exe_unit,
420  block_size_x,
421  grid_size_x,
422  device_id,
423  can_sort_on_gpu && query_mem_desc_.hasKeylessHash());
424  }
425  }
426  }
427  } else {
428  std::vector<int8_t*> out_vec_dev_buffers;
429  const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
430  // By default, non-grouped aggregate queries generate one result per available thread
431  // over the lifetime of the (potentially multi-fragment) kernel execution.
432  // If GPU shared memory optimizations are enabled, these intermediate results are
433  // reduced on the device, leaving only one result per device.
434  const auto num_results_per_agg_col =
435  shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
436  const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
437  if (ra_exe_unit.estimator) {
438  estimator_result_set_.reset(new ResultSet(
439  ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
440  out_vec_dev_buffers.push_back(estimator_result_set_->getDeviceEstimatorBuffer());
441  } else {
442  for (size_t i = 0; i < agg_col_count; ++i) {
443  int8_t* out_vec_dev_buffer =
444  num_fragments ? gpu_allocator_->alloc(output_buffer_size_per_agg) : nullptr;
445  out_vec_dev_buffers.push_back(out_vec_dev_buffer);
446  if (shared_memory_size) {
447  CHECK_EQ(output_buffer_size_per_agg, size_t(8));
448  gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
449  reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
450  output_buffer_size_per_agg);
451  }
452  }
453  }
454  auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(int8_t*));
455  gpu_allocator_->copyToDevice(out_vec_dev_ptr,
456  reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
457  agg_col_count * sizeof(int8_t*));
458  kernel_params[GROUPBY_BUF] = out_vec_dev_ptr;
459  std::vector<void*> param_ptrs;
460  for (auto& param : kernel_params) {
461  param_ptrs.push_back(&param);
462  }
463 
464  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
465  auto prepareTime = prepareClock->stop();
466 
467  VLOG(1) << "Device " << std::to_string(device_id)
468  << ": launchGpuCode: prepare: " << std::to_string(prepareTime) << " ms";
469  launchClock->start();
470  }
471 
472  if (hoist_literals) {
473  kernel->launch(grid_size_x,
474  grid_size_y,
475  grid_size_z,
476  block_size_x,
477  block_size_y,
478  block_size_z,
479  shared_memory_size,
480  &param_ptrs[0]);
481  } else {
482  param_ptrs.erase(param_ptrs.begin() + LITERALS); // TODO(alex): remove
483  kernel->launch(grid_size_x,
484  grid_size_y,
485  grid_size_z,
486  block_size_x,
487  block_size_y,
488  block_size_z,
489  shared_memory_size,
490  &param_ptrs[0]);
491  }
492 
493  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
494  auto launchTime = launchClock->stop();
495  VLOG(1) << "Device " << std::to_string(device_id)
496  << ": launchGpuCode: cuLaunchKernel: " << std::to_string(launchTime)
497  << " ms";
498  finishClock->start();
499  }
500 
501  gpu_allocator_->copyFromDevice(
502  &error_codes[0], err_desc, error_codes.size() * sizeof(error_codes[0]));
503  *error_code = aggregate_error_codes(error_codes);
504  if (*error_code > 0) {
505  return {};
506  }
507  if (ra_exe_unit.estimator) {
508  CHECK(estimator_result_set_);
509  estimator_result_set_->syncEstimatorBuffer();
510  return {};
511  }
512  for (size_t i = 0; i < agg_col_count; ++i) {
513  int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
514  gpu_allocator_->copyFromDevice(
515  host_out_vec, out_vec_dev_buffers[i], output_buffer_size_per_agg);
516  out_vec.push_back(host_out_vec);
517  }
518  }
519  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
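  // Copy any device-side count distinct bitmaps back into the host-side buffer
  // so the result set can read them during reduction.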
520  if (count_distinct_bitmap_mem) {
521  gpu_allocator_->copyFromDevice(query_buffers_->getCountDistinctHostPtr(),
522  reinterpret_cast<void*>(count_distinct_bitmap_mem),
523  query_buffers_->getCountDistinctBitmapBytes());
524  }
525 
526  const auto varlen_output_gpu_buf = query_buffers_->getVarlenOutputPtr();
527  if (varlen_output_gpu_buf) {
528  CHECK(query_mem_desc_.varlenOutputBufferElemSize());
529  const size_t varlen_output_buf_bytes =
530  query_mem_desc_.getEntryCount() *
531  query_mem_desc_.varlenOutputBufferElemSize().value();
532  CHECK(query_buffers_->getVarlenOutputHostPtr());
533  gpu_allocator_->copyFromDevice(query_buffers_->getVarlenOutputHostPtr(),
534  reinterpret_cast<void*>(varlen_output_gpu_buf),
535  varlen_output_buf_bytes);
536  }
537 
538  if (g_enable_dynamic_watchdog || (allow_runtime_interrupt && !render_allocator)) {
539  auto finishTime = finishClock->stop();
540  VLOG(1) << "Device " << std::to_string(device_id)
541  << ": launchGpuCode: finish: " << std::to_string(finishTime) << " ms";
542  }
543 
544  return out_vec;
545 }
546 
547 std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
548  const RelAlgExecutionUnit& ra_exe_unit,
549  const CpuCompilationContext* native_code,
550  const bool hoist_literals,
551  const std::vector<int8_t>& literal_buff,
552  std::vector<std::vector<const int8_t*>> col_buffers,
553  const std::vector<std::vector<int64_t>>& num_rows,
554  const std::vector<std::vector<uint64_t>>& frag_offsets,
555  const int32_t scan_limit,
556  int32_t* error_code,
557  const uint32_t num_tables,
558  const std::vector<int8_t*>& join_hash_tables,
559  const int64_t num_rows_to_process) {
560  auto timer = DEBUG_TIMER(__func__);
561  INJECT_TIMER(launchCpuCode);
562 
563  CHECK(query_buffers_);
564  const auto& init_agg_vals = query_buffers_->init_agg_vals_;
565 
566  std::vector<const int8_t**> multifrag_col_buffers;
567  for (auto& col_buffer : col_buffers) {
568  multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
569  }
570  const int8_t*** multifrag_cols_ptr{
571  multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
572  const uint64_t num_fragments =
573  multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
574  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);
575 
576  const bool is_group_by{query_mem_desc_.isGroupBy()};
577  std::vector<int64_t*> out_vec;
578  if (ra_exe_unit.estimator) {
579  // Subfragments collect the result from multiple runs in a single
580  // result set.
581  if (!estimator_result_set_) {
582  estimator_result_set_.reset(
583  new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
584  }
585  out_vec.push_back(
586  reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
587  } else {
588  if (!is_group_by) {
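  // Non-grouped aggregates produce one partial value per output fragment for each
  // target; the buffers allocated here receive those per-fragment partials.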
589  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
590  auto buff = new int64_t[num_out_frags];
591  out_vec.push_back(static_cast<int64_t*>(buff));
592  }
593  }
594  }
595 
596  CHECK_EQ(num_rows.size(), col_buffers.size());
597  std::vector<int64_t> flatened_num_rows;
598  for (auto& nums : num_rows) {
599  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
600  }
601  std::vector<uint64_t> flatened_frag_offsets;
602  for (auto& offsets : frag_offsets) {
603  flatened_frag_offsets.insert(
604  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
605  }
606  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
607  int64_t* num_rows_ptr;
608  if (num_rows_to_process > 0) {
609  flatened_num_rows[0] = num_rows_to_process;
610  num_rows_ptr = flatened_num_rows.data();
611  } else {
612  num_rows_ptr =
613  rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
614  }
615  int32_t total_matched_init{0};
616 
617  std::vector<int64_t> cmpt_val_buff;
618  if (is_group_by) {
619  cmpt_val_buff =
620  compact_init_vals(align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t),
621  init_agg_vals,
622  query_mem_desc_);
623  }
624 
625  CHECK(native_code);
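  // A single join hash table is passed directly as its raw pointer; multiple tables
  // are passed as a pointer to the first element of the vector of table pointers.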
626  const int64_t* join_hash_tables_ptr =
627  join_hash_tables.size() == 1
628  ? reinterpret_cast<const int64_t*>(join_hash_tables[0])
629  : (join_hash_tables.size() > 1
630  ? reinterpret_cast<const int64_t*>(&join_hash_tables[0])
631  : nullptr);
632  if (hoist_literals) {
633  using agg_query = void (*)(const int8_t***, // col_buffers
634  const uint64_t*, // num_fragments
635  const int8_t*, // literals
636  const int64_t*, // num_rows
637  const uint64_t*, // frag_row_offsets
638  const int32_t*, // max_matched
639  int32_t*, // total_matched
640  const int64_t*, // init_agg_value
641  int64_t**, // out
642  int32_t*, // error_code
643  const uint32_t*, // num_tables
644  const int64_t*); // join_hash_tables_ptr
645  if (is_group_by) {
646  reinterpret_cast<agg_query>(native_code->func())(
647  multifrag_cols_ptr,
648  &num_fragments,
649  literal_buff.data(),
650  num_rows_ptr,
651  flatened_frag_offsets.data(),
652  &scan_limit,
653  &total_matched_init,
654  cmpt_val_buff.data(),
655  query_buffers_->getGroupByBuffersPtr(),
656  error_code,
657  &num_tables,
658  join_hash_tables_ptr);
659  } else {
660  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
661  &num_fragments,
662  literal_buff.data(),
663  num_rows_ptr,
664  flatened_frag_offsets.data(),
665  &scan_limit,
666  &total_matched_init,
667  init_agg_vals.data(),
668  out_vec.data(),
669  error_code,
670  &num_tables,
671  join_hash_tables_ptr);
672  }
673  } else {
674  using agg_query = void (*)(const int8_t***, // col_buffers
675  const uint64_t*, // num_fragments
676  const int64_t*, // num_rows
677  const uint64_t*, // frag_row_offsets
678  const int32_t*, // max_matched
679  int32_t*, // total_matched
680  const int64_t*, // init_agg_value
681  int64_t**, // out
682  int32_t*, // error_code
683  const uint32_t*, // num_tables
684  const int64_t*); // join_hash_tables_ptr
685  if (is_group_by) {
686  reinterpret_cast<agg_query>(native_code->func())(
687  multifrag_cols_ptr,
688  &num_fragments,
689  num_rows_ptr,
690  flatened_frag_offsets.data(),
691  &scan_limit,
692  &total_matched_init,
693  cmpt_val_buff.data(),
694  query_buffers_->getGroupByBuffersPtr(),
695  error_code,
696  &num_tables,
697  join_hash_tables_ptr);
698  } else {
699  reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
700  &num_fragments,
701  num_rows_ptr,
702  flatened_frag_offsets.data(),
703  &scan_limit,
704  &total_matched_init,
705  init_agg_vals.data(),
706  out_vec.data(),
707  error_code,
708  &num_tables,
709  join_hash_tables_ptr);
710  }
711  }
712 
713  if (ra_exe_unit.estimator) {
714  return {};
715  }
716 
717  if (rowid_lookup_num_rows && *error_code < 0) {
718  *error_code = 0;
719  }
720 
721  if (query_mem_desc_.useStreamingTopN()) {
722  query_buffers_->applyStreamingTopNOffsetCpu(query_mem_desc_, ra_exe_unit);
723  }
724 
725  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
726  query_mem_desc_.didOutputColumnar()) {
727  query_buffers_->compactProjectionBuffersCpu(query_mem_desc_, total_matched_init);
728  }
729  return out_vec;
730 }
731 
732 std::vector<int8_t*> QueryExecutionContext::prepareKernelParams(
733  const std::vector<std::vector<const int8_t*>>& col_buffers,
734  const std::vector<int8_t>& literal_buff,
735  const std::vector<std::vector<int64_t>>& num_rows,
736  const std::vector<std::vector<uint64_t>>& frag_offsets,
737  const int32_t scan_limit,
738  const std::vector<int64_t>& init_agg_vals,
739  const std::vector<int32_t>& error_codes,
740  const uint32_t num_tables,
741  const std::vector<int8_t*>& join_hash_tables,
742  Data_Namespace::DataMgr* data_mgr,
743  const int device_id,
744  const bool hoist_literals,
745  const bool is_group_by) const {
746  CHECK(gpu_allocator_);
747  std::vector<int8_t*> params(KERN_PARAM_COUNT, 0);
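  // One device pointer per kernel argument slot; every slot except GROUPBY_BUF
  // (filled in later by launchGpuCode) is allocated and populated below.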
748  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
749  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
750  if (col_count) {
751  std::vector<int8_t*> multifrag_col_dev_buffers;
752  for (auto frag_col_buffers : col_buffers) {
753  std::vector<const int8_t*> col_dev_buffers;
754  for (auto col_buffer : frag_col_buffers) {
755  col_dev_buffers.push_back((int8_t*)col_buffer);
756  }
757  auto col_buffers_dev_ptr = gpu_allocator_->alloc(col_count * sizeof(int8_t*));
758  gpu_allocator_->copyToDevice(
759  col_buffers_dev_ptr, &col_dev_buffers[0], col_count * sizeof(int8_t*));
760  multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
761  }
762  params[COL_BUFFERS] = gpu_allocator_->alloc(num_fragments * sizeof(int8_t*));
763 
764  gpu_allocator_->copyToDevice(params[COL_BUFFERS],
765  &multifrag_col_dev_buffers[0],
766  num_fragments * sizeof(int8_t*));
767  }
768  params[NUM_FRAGMENTS] = gpu_allocator_->alloc(sizeof(uint64_t));
769  gpu_allocator_->copyToDevice(params[NUM_FRAGMENTS], &num_fragments, sizeof(uint64_t));
770 
771  int8_t* literals_and_addr_mapping =
772  gpu_allocator_->alloc(literal_buff.size() + 2 * sizeof(int64_t));
773  CHECK_EQ(0, (int64_t)literals_and_addr_mapping % 8);
774  std::vector<int64_t> additional_literal_bytes;
775  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
776  if (count_distinct_bitmap_mem) {
777  // Store host and device addresses
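  // of the count distinct bitmap buffer immediately before the literal block, i.e.
  // at negative offsets from the LITERALS pointer computed below.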
778  const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
779  CHECK(count_distinct_bitmap_host_mem);
780  additional_literal_bytes.push_back(
781  reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
782  additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
783  gpu_allocator_->copyToDevice(
784  literals_and_addr_mapping,
785  &additional_literal_bytes[0],
786  additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]));
787  }
788  params[LITERALS] = literals_and_addr_mapping + additional_literal_bytes.size() *
789  sizeof(additional_literal_bytes[0]);
790  if (!literal_buff.empty()) {
791  CHECK(hoist_literals);
792  gpu_allocator_->copyToDevice(params[LITERALS], &literal_buff[0], literal_buff.size());
793  }
794  CHECK_EQ(num_rows.size(), col_buffers.size());
795  std::vector<int64_t> flatened_num_rows;
796  for (auto& nums : num_rows) {
797  CHECK_EQ(nums.size(), num_tables);
798  flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
799  }
800  params[NUM_ROWS] = gpu_allocator_->alloc(sizeof(int64_t) * flatened_num_rows.size());
801  gpu_allocator_->copyToDevice(params[NUM_ROWS],
802  &flatened_num_rows[0],
803  sizeof(int64_t) * flatened_num_rows.size());
804 
805  CHECK_EQ(frag_offsets.size(), col_buffers.size());
806  std::vector<int64_t> flatened_frag_offsets;
807  for (auto& offsets : frag_offsets) {
808  CHECK_EQ(offsets.size(), num_tables);
809  flatened_frag_offsets.insert(
810  flatened_frag_offsets.end(), offsets.begin(), offsets.end());
811  }
812  params[FRAG_ROW_OFFSETS] =
813  gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size());
814  gpu_allocator_->copyToDevice(params[FRAG_ROW_OFFSETS],
815  &flatened_frag_offsets[0],
816  sizeof(int64_t) * flatened_frag_offsets.size());
817 
818  // Note that this will be overwritten if we are setting the entry count during group by
819  // buffer allocation and initialization
820  int32_t max_matched{scan_limit};
821  params[MAX_MATCHED] = gpu_allocator_->alloc(sizeof(max_matched));
822  gpu_allocator_->copyToDevice(params[MAX_MATCHED], &max_matched, sizeof(max_matched));
823 
824  int32_t total_matched{0};
825  params[TOTAL_MATCHED] = gpu_allocator_->alloc(sizeof(total_matched));
826  gpu_allocator_->copyToDevice(
827  params[TOTAL_MATCHED], &total_matched, sizeof(total_matched));
828 
829  if (is_group_by && !output_columnar_) {
830  auto cmpt_sz = align_to_int64(query_mem_desc_.getColsSize()) / sizeof(int64_t);
831  auto cmpt_val_buff = compact_init_vals(cmpt_sz, init_agg_vals, query_mem_desc_);
832  params[INIT_AGG_VALS] = gpu_allocator_->alloc(cmpt_sz * sizeof(int64_t));
833  gpu_allocator_->copyToDevice(
834  params[INIT_AGG_VALS], &cmpt_val_buff[0], cmpt_sz * sizeof(int64_t));
835  } else {
836  params[INIT_AGG_VALS] = gpu_allocator_->alloc(init_agg_vals.size() * sizeof(int64_t));
837  gpu_allocator_->copyToDevice(
838  params[INIT_AGG_VALS], &init_agg_vals[0], init_agg_vals.size() * sizeof(int64_t));
839  }
840 
841  params[ERROR_CODE] = gpu_allocator_->alloc(error_codes.size() * sizeof(error_codes[0]));
842  gpu_allocator_->copyToDevice(
843  params[ERROR_CODE], &error_codes[0], error_codes.size() * sizeof(error_codes[0]));
844 
845  params[NUM_TABLES] = gpu_allocator_->alloc(sizeof(uint32_t));
846  gpu_allocator_->copyToDevice(params[NUM_TABLES], &num_tables, sizeof(uint32_t));
847 
848  const auto hash_table_count = join_hash_tables.size();
849  switch (hash_table_count) {
850  case 0: {
851  params[JOIN_HASH_TABLES] = 0;
852  break;
853  }
854  case 1:
855  params[JOIN_HASH_TABLES] = join_hash_tables[0];
856  break;
857  default: {
858  params[JOIN_HASH_TABLES] =
859  gpu_allocator_->alloc(hash_table_count * sizeof(int64_t));
860  gpu_allocator_->copyToDevice(params[JOIN_HASH_TABLES],
861  &join_hash_tables[0],
862  hash_table_count * sizeof(int64_t));
863  break;
864  }
865  }
866 
867  return params;
868 }