OmniSciDB  cde582ebc3
TableFunctionExecutionContext Class Reference

#include <TableFunctionExecutionContext.h>

Public Member Functions

 TableFunctionExecutionContext (std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
 TableFunctionExecutionContext (const TableFunctionExecutionContext &)=delete
 
TableFunctionExecutionContext & operator= (const TableFunctionExecutionContext &)=delete
 
ResultSetPtr execute (const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
 

Private Member Functions

void launchPreCodeOnCpu (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, const size_t elem_count, Executor *executor)
 
ResultSetPtr launchCpuCode (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
 
ResultSetPtr launchGpuCode (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
 

Private Attributes

std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 

Detailed Description

Definition at line 27 of file TableFunctionExecutionContext.h.
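
A TableFunctionExecutionContext owns the RowSetMemoryOwner used for output buffers; its only public entry point is execute(), which gathers the input column buffers and dispatches to launchCpuCode() or launchGpuCode(), optionally running launchPreCodeOnCpu() first for pre-flight output sizing. Below is a minimal usage sketch; the surrounding objects (exe_unit, table_infos, compilation context, column fetcher, executor) are assumed to be prepared elsewhere, as Executor::executeTableFunction() does in the real code path, and the relevant engine headers are assumed to be included.

#include "TableFunctionExecutionContext.h"
#include <memory>
#include <vector>

ResultSetPtr run_udtf_on_cpu(const TableFunctionExecutionUnit& exe_unit,
                             const std::vector<InputTableInfo>& table_infos,
                             const std::shared_ptr<CompilationContext>& ctx,
                             const ColumnFetcher& column_fetcher,
                             Executor* executor,
                             std::shared_ptr<RowSetMemoryOwner> mem_owner) {
  TableFunctionExecutionContext exe_context(mem_owner);
  // Pre-flight pass (is_pre_launch_udtf = true): only needed when the table
  // function declares a pre-flight output sizer; it returns nullptr.
  exe_context.execute(exe_unit, table_infos, ctx, column_fetcher,
                      ExecutorDeviceType::CPU, executor,
                      /*is_pre_launch_udtf=*/true);
  // Main launch: dispatches to launchCpuCode() for the CPU device type.
  return exe_context.execute(exe_unit, table_infos, ctx, column_fetcher,
                             ExecutorDeviceType::CPU, executor,
                             /*is_pre_launch_udtf=*/false);
}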

Constructor & Destructor Documentation

TableFunctionExecutionContext::TableFunctionExecutionContext ( std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner)
inline

Definition at line 29 of file TableFunctionExecutionContext.h.

30  : row_set_mem_owner_(row_set_mem_owner) {}
TableFunctionExecutionContext::TableFunctionExecutionContext ( const TableFunctionExecutionContext &  )
delete

Member Function Documentation

ResultSetPtr TableFunctionExecutionContext::execute ( const TableFunctionExecutionUnit &  exe_unit,
const std::vector< InputTableInfo > &  table_infos,
const std::shared_ptr< CompilationContext > &  compilation_context,
const ColumnFetcher &  column_fetcher,
const ExecutorDeviceType  device_type,
Executor *  executor,
bool  is_pre_launch_udtf 
)

Definition at line 119 of file TableFunctionExecutionContext.cpp.

References CHECK, CHECK_EQ, ColumnFetcher::columnarized_table_cache_, table_functions::TableFunction::containsPreFlightFn(), CPU, Data_Namespace::CPU_LEVEL, anonymous_namespace{TableFunctionExecutionContext.cpp}::create_literal_buffer(), DEBUG_TIMER, get_bit_width(), ColumnFetcher::getOneColumnFragment(), getQueryEngineCudaStreamForDevice(), GPU, Data_Namespace::GPU_LEVEL, table_functions::TableFunction::hasOutputSizeIndependentOfInputSize(), TableFunctionExecutionUnit::input_exprs, launchCpuCode(), launchGpuCode(), launchPreCodeOnCpu(), TableFunctionExecutionUnit::table_func, TableFunctionExecutionUnit::target_exprs, and UNREACHABLE.

Referenced by Executor::executeTableFunction().

126  {
127  auto timer = DEBUG_TIMER(__func__);
128  CHECK(compilation_context);
129  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
130  std::vector<std::unique_ptr<char[]>> literals_owner;
131 
132  const int device_id = 0; // TODO(adb): support multi-gpu table functions
133  std::unique_ptr<CudaAllocator> device_allocator;
134  if (device_type == ExecutorDeviceType::GPU) {
135  auto data_mgr = executor->getDataMgr();
136  device_allocator.reset(new CudaAllocator(
137  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
138  }
139  std::vector<const int8_t*> col_buf_ptrs;
140  std::vector<int64_t> col_sizes;
141  std::vector<const int8_t*> input_str_dict_proxy_ptrs;
142  std::optional<size_t> input_num_rows;
143 
144  int col_index = -1;
145  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
146  // arguments are not supported on GPU atm.
147  std::vector<std::vector<const int8_t*>> col_list_bufs;
148  std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
149  for (const auto& input_expr : exe_unit.input_exprs) {
150  auto ti = input_expr->get_type_info();
151  if (!ti.is_column_list()) {
152  CHECK_EQ(col_index, -1);
153  }
154  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
155  auto table_id = col_var->get_table_id();
156  auto table_info_it = std::find_if(
157  table_infos.begin(), table_infos.end(), [&table_id](const auto& table_info) {
158  return table_info.table_id == table_id;
159  });
160  CHECK(table_info_it != table_infos.end());
161  auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
162  executor,
163  *col_var,
164  table_info_it->info.fragments.front(),
165  device_type == ExecutorDeviceType::GPU ? Data_Namespace::GPU_LEVEL
166                                         : Data_Namespace::CPU_LEVEL,
167  device_id,
168  device_allocator.get(),
169  /*thread_idx=*/0,
170  chunks_owner,
171  column_fetcher.columnarized_table_cache_);
172  // We use the number of entries in the first column to be the number of rows to base
173  // the output off of (optionally depending on the sizing parameter)
174  if (!input_num_rows) {
175  input_num_rows = (buf_elem_count ? buf_elem_count : 1);
176  }
177 
178  int8_t* input_str_dict_proxy_ptr = nullptr;
179  if (ti.is_subtype_dict_encoded_string()) {
180  const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
181  ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
182  input_str_dict_proxy_ptr =
183  reinterpret_cast<int8_t*>(input_string_dictionary_proxy);
184  }
185  if (ti.is_column_list()) {
186  if (col_index == -1) {
187  col_list_bufs.push_back({});
188  input_col_list_str_dict_proxy_ptrs.push_back({});
189  col_list_bufs.back().reserve(ti.get_dimension());
190  input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
191  } else {
192  CHECK_EQ(col_sizes.back(), buf_elem_count);
193  }
194  col_index++;
195  col_list_bufs.back().push_back(col_buf);
196  input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
197  // append col_buf to column_list col_buf
198  if (col_index + 1 == ti.get_dimension()) {
199  col_index = -1;
200  }
201  // columns in the same column_list point to column_list data
202  col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
203  input_str_dict_proxy_ptrs.push_back(
204  (const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
205  } else {
206  col_buf_ptrs.push_back(col_buf);
207  input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
208  }
209  col_sizes.push_back(buf_elem_count);
210  } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
211  // TODO(adb): Unify literal handling with rest of system, either in Codegen or as a
212  // separate serialization component
213  col_sizes.push_back(0);
214  input_str_dict_proxy_ptrs.push_back(nullptr);
215  const auto const_val_datum = constant_val->get_constval();
216  const auto& ti = constant_val->get_type_info();
217  if (ti.is_fp()) {
218  switch (get_bit_width(ti)) {
219  case 32:
220  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
221  device_type,
222  literals_owner,
223  device_allocator.get()));
224  break;
225  case 64:
226  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
227  device_type,
228  literals_owner,
229  device_allocator.get()));
230  break;
231  default:
232  UNREACHABLE();
233  }
234  } else if (ti.is_integer() || ti.is_timestamp()) {
235  switch (get_bit_width(ti)) {
236  case 8:
237  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
238  device_type,
239  literals_owner,
240  device_allocator.get()));
241  break;
242  case 16:
243  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
244  device_type,
245  literals_owner,
246  device_allocator.get()));
247  break;
248  case 32:
249  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
250  device_type,
251  literals_owner,
252  device_allocator.get()));
253  break;
254  case 64:
255  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
256  device_type,
257  literals_owner,
258  device_allocator.get()));
259  break;
260  default:
261  UNREACHABLE();
262  }
263  } else if (ti.is_boolean()) {
264  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.boolval,
265  device_type,
266  literals_owner,
267  device_allocator.get()));
268  } else if (ti.is_bytes()) { // text encoding none string
269  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.stringval,
270  device_type,
271  literals_owner,
272  device_allocator.get()));
273  } else {
274  throw TableFunctionError("Literal value " + constant_val->toString() +
275  " is not yet supported.");
276  }
277  }
278  }
279  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
280  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
281  if (!exe_unit.table_func
282  .hasOutputSizeIndependentOfInputSize()) { // includes compile-time constants,
283  // user-specified constants,
284  // and runtime table function
285  // specified sizing, only
286  // user-specified row-multipliers
287  // currently take into account input
288  // row size
289  CHECK(input_num_rows);
290  }
291  std::vector<int8_t*> output_str_dict_proxy_ptrs;
292  for (const auto& output_expr : exe_unit.target_exprs) {
293  int8_t* output_str_dict_proxy_ptr = nullptr;
294  auto ti = output_expr->get_type_info();
295  if (ti.is_dict_encoded_string()) {
296  const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
297  ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
298  output_str_dict_proxy_ptr =
299  reinterpret_cast<int8_t*>(output_string_dictionary_proxy);
300  }
301  output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
302  }
303 
304  if (is_pre_launch_udtf) {
305  CHECK(exe_unit.table_func.containsPreFlightFn());
306  launchPreCodeOnCpu(
307  exe_unit,
308  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
309  col_buf_ptrs,
310  col_sizes,
311  *input_num_rows,
312  executor);
313  return nullptr;
314  } else {
315  switch (device_type) {
316  case ExecutorDeviceType::CPU:
317  return launchCpuCode(
318  exe_unit,
319  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
320  col_buf_ptrs,
321  col_sizes,
322  input_str_dict_proxy_ptrs,
323  *input_num_rows,
324  output_str_dict_proxy_ptrs,
325  executor);
326  case ExecutorDeviceType::GPU:
327  return launchGpuCode(
328  exe_unit,
329  std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
330  col_buf_ptrs,
331  col_sizes,
332  input_str_dict_proxy_ptrs,
333  *input_num_rows,
334  output_str_dict_proxy_ptrs,
335  /*device_id=*/0,
336  executor);
337  }
338  }
339  UNREACHABLE();
340  return nullptr;
341 }
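
The loop above flattens COLUMN_LIST arguments: the member columns of one list are collected into a shared pointer array, and each member's slot in col_buf_ptrs points at that shared array rather than at its own buffer. A standalone sketch of that grouping, using plain std::vector stand-ins for the Analyzer types (illustrative names, not the engine's):

#include <cstdint>
#include <vector>

struct FlattenedInputs {
  std::vector<std::vector<const int8_t*>> col_list_bufs;  // owns grouped ptrs
  std::vector<const int8_t*> col_buf_ptrs;                // what the UDTF sees
};

void append_column_list(FlattenedInputs& in,
                        const std::vector<const int8_t*>& member_col_buffers) {
  in.col_list_bufs.push_back(member_col_buffers);
  for (size_t i = 0; i < member_col_buffers.size(); ++i) {
    // every member column points to the start of the grouped pointer array
    in.col_buf_ptrs.push_back(
        reinterpret_cast<const int8_t*>(in.col_list_bufs.back().data()));
  }
}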

ResultSetPtr TableFunctionExecutionContext::launchCpuCode ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< CpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
std::vector< const int8_t * > &  input_str_dict_proxy_ptrs,
const size_t  elem_count,
std::vector< int8_t * > &  output_str_dict_proxy_ptrs,
Executor *  executor 
)
private

Definition at line 402 of file TableFunctionExecutionContext.cpp.

References align_to_int64(), CHECK, CHECK_EQ, DEBUG_TIMER, GenericError, anonymous_namespace{TableFunctionExecutionContext.cpp}::get_output_row_count(), row_set_mem_owner_, and to_string().

Referenced by execute().

410  {
411  auto timer = DEBUG_TIMER(__func__);
412  int64_t output_row_count = 0;
413 
414  // If TableFunctionManager must be a singleton but it has been
415  // initialized from another thread, TableFunctionManager constructor
416  // blocks via TableFunctionManager_singleton_mutex until the
417  // existing singleton is deconstructed.
418  auto mgr = std::make_unique<TableFunctionManager>(
419  exe_unit,
420  executor,
421  col_buf_ptrs,
422  row_set_mem_owner_,
423  /*is_singleton=*/!exe_unit.table_func.usesManager());
424 
425  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
426  // allocate output buffers because the size is known up front, from
427  // user specified parameters (and table size in the case of a user
428  // specified row multiplier)
429  output_row_count = get_output_row_count(exe_unit, elem_count);
430  } else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
431  output_row_count = exe_unit.output_buffer_size_param;
432  }
433 
434  // setup the inputs
435  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
436  const auto byte_stream_ptr = !col_buf_ptrs.empty()
437  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
438  : nullptr;
439  if (!col_buf_ptrs.empty()) {
440  CHECK(byte_stream_ptr);
441  }
442  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
443  if (!col_sizes.empty()) {
444  CHECK(col_sizes_ptr);
445  }
446  const auto input_str_dict_proxy_byte_stream_ptr =
447  !input_str_dict_proxy_ptrs.empty()
448  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
449  : nullptr;
450 
451  const auto output_str_dict_proxy_byte_stream_ptr =
452  !output_str_dict_proxy_ptrs.empty()
453  ? reinterpret_cast<int8_t**>(output_str_dict_proxy_ptrs.data())
454  : nullptr;
455 
456  // execute
457  const auto err = compilation_context->table_function_entry_point()(
458  reinterpret_cast<const int8_t*>(mgr.get()),
459  byte_stream_ptr, // input columns buffer
460  col_sizes_ptr, // input column sizes
461  input_str_dict_proxy_byte_stream_ptr, // input str dictionary proxies
462  nullptr,
463  output_str_dict_proxy_byte_stream_ptr,
464  &output_row_count);
465 
467  throw UserTableFunctionError("Error executing table function: " +
468  std::string(mgr->get_error_message()));
469  }
470 
471  else if (err) {
472  throw UserTableFunctionError("Error executing table function: " +
473  std::to_string(err));
474  }
475 
476  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
477  if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
478  throw TableFunctionError(
479  "Table function with constant sizing parameter must return " +
480  std::to_string(mgr->get_nrows()) + " (got " + std::to_string(output_row_count) +
481  ")");
482  }
483  } else {
484  if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
485  output_row_count = mgr->get_nrows();
486  }
487  }
488  // Update entry count, it may differ from allocated mem size
489  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
490  // set_output_row_size has not been called
491  if (output_row_count == 0) {
492  // allocate for empty output columns
493  mgr->allocate_output_buffers(0);
494  } else {
495  throw TableFunctionError("Table function must call set_output_row_size");
496  }
497  }
498 
499  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
500 
501  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
502  CHECK(group_by_buffers_ptr);
503  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
504 
505  auto num_out_columns = exe_unit.target_exprs.size();
506  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
507  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
508  // Todo (todd): Consolidate this column byte offset logic that occurs in at least 4
509  // places
510  for (size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
511  const size_t target_width =
512  exe_unit.target_exprs[col_idx]->get_type_info().get_size();
513  const size_t allocated_column_size = target_width * mgr->get_nrows();
514  const size_t actual_column_size = target_width * output_row_count;
515  if (src != dst) {
516  auto t = memmove(dst, src, actual_column_size);
517  CHECK_EQ(dst, t);
518  }
519  src = align_to_int64(src + allocated_column_size);
520  dst = align_to_int64(dst + actual_column_size);
521  }
522  return mgr->query_buffers->getResultSetOwned(0);
523 }
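
The tail of launchCpuCode() compacts the columnar output: each column was allocated for mgr->get_nrows() rows, but only output_row_count rows are valid, so columns are slid left with memmove while keeping 64-bit alignment between columns. A standalone sketch of that compaction, with a local stand-in for the engine's align_to_int64() helper:

#include <cstdint>
#include <cstring>
#include <vector>

static int8_t* align_up_to_int64(int8_t* addr) {
  auto p = reinterpret_cast<uintptr_t>(addr);
  return reinterpret_cast<int8_t*>((p + 7) & ~uintptr_t(7));
}

void compact_columnar_output(int8_t* buffer,
                             const std::vector<size_t>& col_widths,
                             size_t allocated_rows,
                             size_t actual_rows) {
  int8_t* src = buffer;
  int8_t* dst = buffer;
  for (size_t col_width : col_widths) {
    const size_t allocated_size = col_width * allocated_rows;
    const size_t actual_size = col_width * actual_rows;
    if (src != dst) {
      std::memmove(dst, src, actual_size);  // regions may overlap
    }
    src = align_up_to_int64(src + allocated_size);
    dst = align_up_to_int64(dst + actual_size);
  }
}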

ResultSetPtr TableFunctionExecutionContext::launchGpuCode ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< GpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
std::vector< const int8_t * > &  input_str_dict_proxy_ptrs,
const size_t  elem_count,
std::vector< int8_t * > &  output_str_dict_proxy_ptrs,
const int  device_id,
Executor *  executor 
)
private

Definition at line 539 of file TableFunctionExecutionContext.cpp.

References QueryMemoryDescriptor::addColSlotInfo(), CHECK, CHECK_EQ, checkCudaErrors(), anonymous_namespace{TableFunctionExecutionContext.cpp}::COL_BUFFERS, anonymous_namespace{TableFunctionExecutionContext.cpp}::COL_SIZES, DEBUG_TIMER, anonymous_namespace{TableFunctionExecutionContext.cpp}::ERROR_BUFFER, anonymous_namespace{TableFunctionExecutionContext.cpp}::get_output_row_count(), getQueryEngineCudaStreamForDevice(), GPU, table_functions::TableFunction::hasTableFunctionSpecifiedParameter(), anonymous_namespace{TableFunctionExecutionContext.cpp}::KERNEL_PARAM_COUNT, anonymous_namespace{TableFunctionExecutionContext.cpp}::MANAGER, anonymous_namespace{TableFunctionExecutionContext.cpp}::OUTPUT_BUFFERS, anonymous_namespace{TableFunctionExecutionContext.cpp}::OUTPUT_ROW_COUNT, query_mem_desc, row_set_mem_owner_, QueryMemoryDescriptor::setOutputColumnar(), TableFunctionExecutionUnit::table_func, TableFunction, TableFunctionExecutionUnit::target_exprs, to_string(), and UNREACHABLE.

Referenced by execute().

548  {
549 #ifdef HAVE_CUDA
550  auto timer = DEBUG_TIMER(__func__);
551  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter()) {
552  throw QueryMustRunOnCpu();
553  }
554 
555  auto num_out_columns = exe_unit.target_exprs.size();
556  auto data_mgr = executor->getDataMgr();
557  auto gpu_allocator = std::make_unique<CudaAllocator>(
558  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
559  CHECK(gpu_allocator);
560  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
561 
562  // TODO: implement table function manager for CUDA
563  // kernels. kernel_params[MANAGER] ought to contain a device pointer
564  // to a struct that a table function kernel with a
565  // TableFunctionManager argument can access from the device.
566  kernel_params[MANAGER] =
567  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int8_t*)));
568 
569  // setup the inputs
570  auto byte_stream_ptr = !(col_buf_ptrs.empty())
571  ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
572  : nullptr;
573  if (byte_stream_ptr) {
574  gpu_allocator->copyToDevice(byte_stream_ptr,
575  reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
576  col_buf_ptrs.size() * sizeof(int64_t));
577  }
578  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
579 
580  auto col_sizes_ptr = !(col_sizes.empty())
581  ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
582  : nullptr;
583  if (col_sizes_ptr) {
584  gpu_allocator->copyToDevice(col_sizes_ptr,
585  reinterpret_cast<int8_t*>(col_sizes.data()),
586  col_sizes.size() * sizeof(int64_t));
587  }
588  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);
589 
590  kernel_params[ERROR_BUFFER] =
591  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
592  // initialize output memory
593  QueryMemoryDescriptor query_mem_desc(executor,
594  elem_count,
595  QueryDescriptionType::TableFunction,
596  /*is_table_function=*/true);
597  query_mem_desc.setOutputColumnar(true);
598 
599  for (size_t i = 0; i < num_out_columns; i++) {
600  const size_t col_width = exe_unit.target_exprs[i]->get_type_info().get_size();
601  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
602  }
603  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
604  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
605  exe_unit,
606  query_mem_desc,
607  device_id,
608  ExecutorDeviceType::GPU,
609  (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
610  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
611  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
612  row_set_mem_owner_,
613  gpu_allocator.get(),
614  executor);
615 
616  // setup the output
617  int64_t output_row_count = allocated_output_row_count;
618 
619  kernel_params[OUTPUT_ROW_COUNT] =
620  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
621  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
622  reinterpret_cast<int8_t*>(&output_row_count),
623  sizeof(output_row_count));
624  /*
625   TODO: RBC generated runtime table functions do not support
626  concurrent execution on a CUDA device. Hence, we'll force 1 as
627  block/grid size in the case of runtime table functions. To support
628  this, in RBC, we'll need to expose threadIdx/blockIdx/blockDim to
629  runtime table functions and these must do something sensible with
630  this information..
631  */
632  const unsigned block_size_x =
633  (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
634  const unsigned block_size_y = 1;
635  const unsigned block_size_z = 1;
636  const unsigned grid_size_x =
637  (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
638  const unsigned grid_size_y = 1;
639  const unsigned grid_size_z = 1;
640 
641  auto gpu_output_buffers =
642  query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
643  device_id,
644  block_size_x,
645  grid_size_x,
646  true /* zero_initialize_buffers */);
647 
648  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);
649 
650  // execute
651  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());
652 
653  std::vector<void*> param_ptrs;
654  for (auto& param : kernel_params) {
655  param_ptrs.push_back(&param);
656  }
657 
658  // Get cu func
659 
660  CHECK(compilation_context);
661  const auto native_code = compilation_context->getNativeCode(device_id);
662  auto cu_func = static_cast<CUfunction>(native_code.first);
663  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
664  checkCudaErrors(cuLaunchKernel(cu_func,
665  grid_size_x,
666  grid_size_y,
667  grid_size_z,
668  block_size_x,
669  block_size_y,
670  block_size_z,
671  0, // shared mem bytes
672  qe_cuda_stream,
673  &param_ptrs[0],
674  nullptr));
675  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
676 
677  // read output row count from GPU
678  gpu_allocator->copyFromDevice(
679  reinterpret_cast<int8_t*>(&output_row_count),
680  reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
681  sizeof(int64_t));
682  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
683  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
684  throw TableFunctionError(
685  "Table function with constant sizing parameter must return " +
686  std::to_string(allocated_output_row_count) + " (got " +
687  std::to_string(output_row_count) + ")");
688  }
689  } else {
690  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
691  output_row_count = allocated_output_row_count;
692  }
693  }
694 
695  // Update entry count, it may differ from allocated mem size
696  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
697 
698  // Copy back to CPU storage
699  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
700  query_mem_desc,
701  output_row_count,
702  gpu_output_buffers,
703  device_id,
704  block_size_x,
705  grid_size_x);
706 
707  return query_buffers->getResultSetOwned(0);
708 #else
709  UNREACHABLE();
710  return nullptr;
711 #endif
712 }
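
The launch itself follows the usual CUDA driver-API convention: each logical kernel argument occupies a CUdeviceptr slot, and cuLaunchKernel() receives the host addresses of those slots. A minimal sketch, assuming the CUfunction and CUstream have already been obtained (for example from the compilation context and the query engine stream helper):

#include <cuda.h>
#include <vector>

CUresult launch_with_param_slots(CUfunction func,
                                 CUstream stream,
                                 std::vector<CUdeviceptr>& kernel_params,
                                 unsigned grid_size_x,
                                 unsigned block_size_x) {
  std::vector<void*> param_ptrs;
  param_ptrs.reserve(kernel_params.size());
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);  // host address of each slot, not its value
  }
  return cuLaunchKernel(func,
                        grid_size_x, /*gridDimY=*/1, /*gridDimZ=*/1,
                        block_size_x, /*blockDimY=*/1, /*blockDimZ=*/1,
                        /*sharedMemBytes=*/0,
                        stream,
                        param_ptrs.data(),
                        /*extra=*/nullptr);
}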

void TableFunctionExecutionContext::launchPreCodeOnCpu ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< CpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
const size_t  elem_count,
Executor *  executor 
)
private

Definition at line 345 of file TableFunctionExecutionContext.cpp.

References CHECK, DEBUG_TIMER, GenericError, row_set_mem_owner_, and to_string().

Referenced by execute().

351  {
352  auto timer = DEBUG_TIMER(__func__);
353  int64_t output_row_count = 0;
354 
355  // If TableFunctionManager must be a singleton but it has been
356  // initialized from another thread, TableFunctionManager constructor
357  // blocks via TableFunctionManager_singleton_mutex until the
358  // existing singleton is deconstructed.
359  auto mgr = std::make_unique<TableFunctionManager>(
360  exe_unit,
361  executor,
362  col_buf_ptrs,
363  row_set_mem_owner_,
364  /*is_singleton=*/!exe_unit.table_func.usesManager());
365 
366  // setup the inputs
367  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
368  const auto byte_stream_ptr = !col_buf_ptrs.empty()
369  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
370  : nullptr;
371  if (!col_buf_ptrs.empty()) {
372  CHECK(byte_stream_ptr);
373  }
374  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
375  if (!col_sizes.empty()) {
376  CHECK(col_sizes_ptr);
377  }
378 
379  // execute
380  const auto err = compilation_context->table_function_entry_point()(
381  reinterpret_cast<const int8_t*>(mgr.get()),
382  byte_stream_ptr, // input columns buffer
383  col_sizes_ptr, // input column sizes
384  nullptr, // input string dictionary proxy ptrs - not supported for pre-flights yet
385  nullptr,
386  nullptr, // output string dictionary proxy ptrs - not supported for pre-flights yet
387  &output_row_count);
388 
389  if (exe_unit.table_func.hasPreFlightOutputSizer()) {
390  exe_unit.output_buffer_size_param = output_row_count;
391  }
392 
394  throw UserTableFunctionError("Error executing table function pre flight check: " +
395  std::string(mgr->get_error_message()));
396  } else if (err) {
397  throw UserTableFunctionError("Error executing table function pre flight check: " +
398  std::to_string(err));
399  }
400 }
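
Both launch paths invoke compilation_context->table_function_entry_point() with the same argument layout; the pre-flight variant passes nullptr for the string-dictionary proxy and output-buffer slots and only reads back the row count. The entry-point shape, as it can be inferred from the two call sites above (an illustrative typedef, not the engine's actual declaration):

#include <cstdint>

using TableFunctionEntryPoint =
    int32_t (*)(const int8_t* mgr_ptr,                  // TableFunctionManager*
                const int8_t** input_col_buffers,       // one ptr per input arg
                const int64_t* input_col_sizes,         // element counts
                const int8_t** input_str_dict_proxies,  // nullptr in pre-flight
                int64_t** output_col_buffers,           // nullptr at both call sites
                int8_t** output_str_dict_proxies,       // nullptr in pre-flight
                int64_t* output_row_count);             // in/out row count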

TableFunctionExecutionContext& TableFunctionExecutionContext::operator= ( const TableFunctionExecutionContext &  )
delete

Member Data Documentation

std::shared_ptr<RowSetMemoryOwner> TableFunctionExecutionContext::row_set_mem_owner_
private

The documentation for this class was generated from the following files:
TableFunctionExecutionContext.h
TableFunctionExecutionContext.cpp