OmniSciDB  72c90bc290
TableFunctionExecutionContext Class Reference

#include <TableFunctionExecutionContext.h>

Public Member Functions

 TableFunctionExecutionContext (std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
 TableFunctionExecutionContext (const TableFunctionExecutionContext &)=delete
 
TableFunctionExecutionContext & operator= (const TableFunctionExecutionContext &)=delete
 
ResultSetPtr execute (const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
 

Private Member Functions

void launchPreCodeOnCpu (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, Executor *executor)
 
ResultSetPtr launchCpuCode (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
 
ResultSetPtr launchGpuCode (const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
 

Private Attributes

std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 

Detailed Description

Definition at line 27 of file TableFunctionExecutionContext.h.
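
A minimal usage sketch follows, assuming the execution unit, input table infos, compilation context, column fetcher and executor have already been prepared by the caller, as Executor::executeTableFunction() (the only caller of execute()) does. The wrapper name run_table_function is purely illustrative and not part of the class.

#include "TableFunctionExecutionContext.h"  // other Executor/ColumnFetcher headers assumed

ResultSetPtr run_table_function(const TableFunctionExecutionUnit& exe_unit,
                                const std::vector<InputTableInfo>& table_infos,
                                const std::shared_ptr<CompilationContext>& compilation_context,
                                const ColumnFetcher& column_fetcher,
                                const ExecutorDeviceType device_type,
                                Executor* executor) {
  // The context only needs the executor's RowSetMemoryOwner.
  TableFunctionExecutionContext exe_context(executor->getRowSetMemoryOwner());

  // Pre-flight pass (only meaningful for UDTFs with a pre-flight function):
  // runs launchPreCodeOnCpu() internally and always returns nullptr.
  // exe_context.execute(..., /*is_pre_launch_udtf=*/true);

  // Main launch: dispatches to launchCpuCode() or launchGpuCode() and
  // returns the table function's ResultSet.
  return exe_context.execute(exe_unit,
                             table_infos,
                             compilation_context,
                             column_fetcher,
                             device_type,
                             executor,
                             /*is_pre_launch_udtf=*/false);
}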

Constructor & Destructor Documentation

TableFunctionExecutionContext::TableFunctionExecutionContext ( std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner )
inline

Definition at line 29 of file TableFunctionExecutionContext.h.

29  TableFunctionExecutionContext(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
30  : row_set_mem_owner_(row_set_mem_owner) {}
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
TableFunctionExecutionContext::TableFunctionExecutionContext ( const TableFunctionExecutionContext &  )
delete

Member Function Documentation

ResultSetPtr TableFunctionExecutionContext::execute ( const TableFunctionExecutionUnit &  exe_unit,
const std::vector< InputTableInfo > &  table_infos,
const std::shared_ptr< CompilationContext > &  compilation_context,
const ColumnFetcher &  column_fetcher,
const ExecutorDeviceType  device_type,
Executor *  executor,
bool  is_pre_launch_udtf 
)

Definition at line 104 of file TableFunctionExecutionContext.cpp.

References anonymous_namespace{TableFunctionExecutionContext.cpp}::append_literal_buffer(), CHECK, CHECK_EQ, ColumnFetcher::columnarized_table_cache_, table_functions::TableFunction::containsPreFlightFn(), CPU, Data_Namespace::CPU_LEVEL, DEBUG_TIMER, get_bit_width(), ColumnFetcher::getOneColumnFragment(), getQueryEngineCudaStreamForDevice(), GPU, Data_Namespace::GPU_LEVEL, table_functions::TableFunction::hasOutputSizeIndependentOfInputSize(), TableFunctionExecutionUnit::input_exprs, is_null(), launchCpuCode(), launchGpuCode(), launchPreCodeOnCpu(), TableFunctionExecutionUnit::table_func, TableFunctionExecutionUnit::target_exprs, and UNREACHABLE.

Referenced by Executor::executeTableFunction().

111  {
112  auto timer = DEBUG_TIMER(__func__);
113  CHECK(compilation_context);
114  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
115  std::vector<std::unique_ptr<char[]>> literals_owner;
116 
117  const int device_id = 0; // TODO(adb): support multi-gpu table functions
118  std::unique_ptr<CudaAllocator> device_allocator;
119  if (device_type == ExecutorDeviceType::GPU) {
120  auto data_mgr = executor->getDataMgr();
121  device_allocator.reset(new CudaAllocator(
122  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
123  }
124  std::vector<const int8_t*> col_buf_ptrs;
125  std::vector<int64_t> col_sizes;
126  std::vector<const int8_t*> input_str_dict_proxy_ptrs;
127  std::optional<size_t> input_num_rows;
128 
129  int col_index = -1;
130  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
131  // arguments are not supported on GPU atm.
132  std::vector<std::vector<const int8_t*>> col_list_bufs;
133  std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
134  for (const auto& input_expr : exe_unit.input_exprs) {
135  auto ti = input_expr->get_type_info();
136  if (!ti.is_column_list()) {
137  CHECK_EQ(col_index, -1);
138  }
139  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
140  CHECK(ti.is_column_list() || ti.is_column()) << "ti=" << ti;
141  const auto& table_key = col_var->getTableKey();
142  auto table_info_it = std::find_if(
143  table_infos.begin(), table_infos.end(), [&table_key](const auto& table_info) {
144  return table_info.table_key == table_key;
145  });
146  CHECK(table_info_it != table_infos.end());
147  auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
148  executor,
149  *col_var,
150  table_info_it->info.fragments.front(),
151  device_type == ExecutorDeviceType::CPU ? Data_Namespace::CPU_LEVEL
152  : Data_Namespace::GPU_LEVEL,
153  device_id,
154  device_allocator.get(),
155  /*thread_idx=*/0,
156  chunks_owner,
157  column_fetcher.columnarized_table_cache_);
158  // We use the number of entries in the first column to be the number of rows to base
159  // the output off of (optionally depending on the sizing parameter)
160  if (!input_num_rows) {
161  input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
162  }
163 
164  int8_t* input_str_dict_proxy_ptr = nullptr;
165  if (ti.is_subtype_dict_encoded_string()) {
166  const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
167  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
168  input_str_dict_proxy_ptr =
169  reinterpret_cast<int8_t*>(input_string_dictionary_proxy);
170  }
171  if (ti.is_column_list()) {
172  if (col_index == -1) {
173  col_list_bufs.emplace_back();
174  input_col_list_str_dict_proxy_ptrs.emplace_back();
175  col_list_bufs.back().reserve(ti.get_dimension());
176  input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
177  } else {
178  CHECK_EQ(col_sizes.back(), buf_elem_count);
179  }
180  col_index++;
181  col_list_bufs.back().push_back(col_buf);
182  input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
183  // append col_buf to column_list col_buf
184  if (col_index + 1 == ti.get_dimension()) {
185  col_index = -1;
186  }
187  // columns in the same column_list point to column_list data
188  col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
189  input_str_dict_proxy_ptrs.push_back(
190  (const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
191  } else {
192  col_buf_ptrs.push_back(col_buf);
193  input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
194  }
195  col_sizes.push_back(buf_elem_count);
196  } else {
197  // literals
198  col_sizes.push_back(0);
199  input_str_dict_proxy_ptrs.push_back(nullptr);
200  size_t literal_buffer_size = 0;
201  int8_t* cpu_literal_buf_ptr = nullptr;
202 
203  if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
204  // TODO(adb): Unify literal handling with rest of system, either in Codegen or as
205  // a separate serialization component
206  const auto const_val_datum = constant_val->get_constval();
207  const auto& ti = constant_val->get_type_info();
208  if (ti.is_text_encoding_none()) {
209  // clang-format off
210  /*
211  Literal string is encoded in a contiguous buffer with the
212  following memory layout:
213 
214  | <string size> | <string data> |
215  |<-- 8 bytes -->|<-- <string size> -->|
216  */
217  // clang-format on
218  literal_buffer_size =
219  sizeof(int64_t) + ((const_val_datum.stringval->size() + 7) / 8) * 8;
220  } else {
221  literal_buffer_size = ((get_bit_width(ti) / 8 + 7) / 8) * 8;
222  }
223  // literal_buffer_size is rounded up to the next multiple of 8
224  literals_owner.emplace_back(std::make_unique<char[]>(literal_buffer_size));
225  cpu_literal_buf_ptr = reinterpret_cast<int8_t*>(literals_owner.back().get());
226  append_literal_buffer(const_val_datum, ti, cpu_literal_buf_ptr, 0);
227  } else if (const auto& array_expr =
228  dynamic_cast<Analyzer::ArrayExpr*>(input_expr)) {
229  const auto& ti = input_expr->get_type_info().get_elem_type();
230  // clang-format off
231  /*
232  Literal array expression is encoded in a contiguous buffer
233  with the following memory layout:
234 
235  | <array size> | <array is_null> | <array data> |
236  |<-- 8 bytes ->|<-- 8 bytes ---->|<-- <array size> * <array element size> -->|
237  */
238  // clang-format on
239  int64_t size = array_expr->getElementCount();
240  int64_t is_null = (array_expr->isNull() ? 0xffffffffffffffff : 0);
241  const auto elem_size = get_bit_width(ti) / 8;
242  // literal_buffer_size is rounded up to the next multiple of 8
243  literal_buffer_size = 2 * sizeof(int64_t) + (((size + 7) / 8) * 8) * elem_size;
244  literals_owner.emplace_back(std::make_unique<char[]>(literal_buffer_size));
245  cpu_literal_buf_ptr = reinterpret_cast<int8_t*>(literals_owner.back().get());
246  std::memcpy(cpu_literal_buf_ptr, &size, sizeof(int64_t));
247  std::memcpy(cpu_literal_buf_ptr + sizeof(int64_t), &is_null, sizeof(int64_t));
248  for (int64_t i = 0; i < size; i++) {
249  if (const auto& constant_val =
250  dynamic_cast<const Analyzer::Constant*>(array_expr->getElement(i))) {
251  const auto const_val_datum = constant_val->get_constval();
252  append_literal_buffer(const_val_datum,
253  ti,
254  cpu_literal_buf_ptr,
255  sizeof(int64_t) * 2 + i * elem_size);
256  } else {
257  UNREACHABLE();
258  }
259  }
260  } else {
261  throw TableFunctionError("Unsupported expression as input to table function: " +
262  input_expr->toString() +
263  "\n Only literal constants and columns are supported!");
264  }
265  if (device_type == ExecutorDeviceType::GPU) {
266  auto* gpu_allocator = device_allocator.get();
267  const auto gpu_literal_buf_ptr = gpu_allocator->alloc(literal_buffer_size);
268  gpu_allocator->copyToDevice(
269  gpu_literal_buf_ptr, cpu_literal_buf_ptr, literal_buffer_size);
270  col_buf_ptrs.push_back(gpu_literal_buf_ptr);
271  } else {
272  CHECK_EQ(device_type, ExecutorDeviceType::CPU);
273  col_buf_ptrs.push_back(cpu_literal_buf_ptr);
274  }
275  }
276  }
277  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
278  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
279  if (!exe_unit.table_func
280  .hasOutputSizeIndependentOfInputSize()) { // includes compile-time constants,
281  // user-specified constants,
282  // and runtime table function
283  // specified sizing, only
284  // user-specified row-multipliers
285  // currently take into account input
286  // row size
287  CHECK(input_num_rows);
288  }
289  std::vector<int8_t*> output_str_dict_proxy_ptrs;
290  for (const auto& output_expr : exe_unit.target_exprs) {
291  int8_t* output_str_dict_proxy_ptr = nullptr;
292  auto ti = output_expr->get_type_info();
293  if (ti.is_dict_encoded_string()) {
294  const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
295  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
296  output_str_dict_proxy_ptr =
297  reinterpret_cast<int8_t*>(output_string_dictionary_proxy);
298  }
299  output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
300  }
301 
302  if (is_pre_launch_udtf) {
303  CHECK(exe_unit.table_func.containsPreFlightFn());
304  launchPreCodeOnCpu(
305  exe_unit,
306  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
307  col_buf_ptrs,
308  col_sizes,
309  input_str_dict_proxy_ptrs,
310  *input_num_rows,
311  executor);
312  return nullptr;
313  } else {
314  switch (device_type) {
315  case ExecutorDeviceType::CPU:
316  return launchCpuCode(
317  exe_unit,
318  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
319  col_buf_ptrs,
320  col_sizes,
321  input_str_dict_proxy_ptrs,
322  *input_num_rows,
323  output_str_dict_proxy_ptrs,
324  executor);
325  case ExecutorDeviceType::GPU:
326  return launchGpuCode(
327  exe_unit,
328  std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
329  col_buf_ptrs,
330  col_sizes,
331  input_str_dict_proxy_ptrs,
332  *input_num_rows,
333  output_str_dict_proxy_ptrs,
334  /*device_id=*/0,
335  executor);
336  }
337  }
338  UNREACHABLE();
339  return nullptr;
340 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< Analyzer::Expr * > input_exprs
const table_functions::TableFunction table_func
#define UNREACHABLE()
Definition: Logger.h:338
ColumnCacheMap columnarized_table_cache_
size_t get_bit_width(const SQLTypeInfo &ti)
CONSTEXPR DEVICE bool is_null(const T &value)
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
void launchPreCodeOnCpu(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, Executor *executor)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::vector< Analyzer::Expr * > target_exprs
void append_literal_buffer(const Datum &d, const SQLTypeInfo &ti, int8_t *literal_buffer, int64_t offset)
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)

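The listing above packs literal arguments into small, 8-byte-aligned CPU buffers before optionally copying them to the device. The standalone sketch below reproduces the TEXT ENCODING NONE layout it documents (an 8-byte length word followed by the string bytes, padded to a multiple of 8); pack_string_literal is a hypothetical helper written only to illustrate that layout, not a function of this class. The array-literal layout differs only in carrying an additional 8-byte is_null word between the size and the element data.

#include <cstdint>
#include <cstring>
#include <memory>
#include <string>

// Illustrative only: reproduces the literal-string buffer layout used in
// execute(): | <string size> | <string data> |, with the data region padded
// up to the next multiple of 8 bytes.
std::unique_ptr<char[]> pack_string_literal(const std::string& s,
                                            size_t& literal_buffer_size) {
  literal_buffer_size = sizeof(int64_t) + ((s.size() + 7) / 8) * 8;
  auto buf = std::make_unique<char[]>(literal_buffer_size);  // zero-filled padding
  const int64_t size = static_cast<int64_t>(s.size());
  std::memcpy(buf.get(), &size, sizeof(int64_t));
  std::memcpy(buf.get() + sizeof(int64_t), s.data(), s.size());
  return buf;
}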

ResultSetPtr TableFunctionExecutionContext::launchCpuCode ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< CpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
std::vector< const int8_t * > &  input_str_dict_proxy_ptrs,
const size_t  elem_count,
std::vector< int8_t * > &  output_str_dict_proxy_ptrs,
Executor *  executor 
)
private

Definition at line 494 of file TableFunctionExecutionContext.cpp.

References align_to_int64(), CHECK, CHECK_EQ, DEBUG_TIMER, GenericError, anonymous_namespace{TableFunctionExecutionContext.cpp}::get_output_row_count(), FlatBufferManager::getBufferSize(), NotAnError, row_set_mem_owner_, and to_string().

Referenced by execute().

502  {
503  auto timer = DEBUG_TIMER(__func__);
504  int64_t output_row_count = 0;
505 
506  // If TableFunctionManager must be a singleton but it has been
507  // initialized from another thread, TableFunctionManager constructor
508  // blocks via TableFunctionManager_singleton_mutex until the
509  // existing singleton is deconstructed.
510  auto mgr = std::make_unique<TableFunctionManager>(
511  exe_unit,
512  executor,
513  col_buf_ptrs,
514  row_set_mem_owner_,
515  /*is_singleton=*/!exe_unit.table_func.usesManager());
516 
517  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
518  // allocate output buffers because the size is known up front, from
519  // user specified parameters (and table size in the case of a user
520  // specified row multiplier)
521  output_row_count = get_output_row_count(exe_unit, elem_count);
522  } else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
523  output_row_count = exe_unit.output_buffer_size_param;
524  }
525 
526  // setup the inputs
527  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
528  const auto byte_stream_ptr = !col_buf_ptrs.empty()
529  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
530  : nullptr;
531  if (!col_buf_ptrs.empty()) {
532  CHECK(byte_stream_ptr);
533  }
534  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
535  if (!col_sizes.empty()) {
536  CHECK(col_sizes_ptr);
537  }
538  const auto input_str_dict_proxy_byte_stream_ptr =
539  !input_str_dict_proxy_ptrs.empty()
540  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
541  : nullptr;
542 
543  const auto output_str_dict_proxy_byte_stream_ptr =
544  !output_str_dict_proxy_ptrs.empty()
545  ? reinterpret_cast<int8_t**>(output_str_dict_proxy_ptrs.data())
546  : nullptr;
547 
548  // execute
549  int32_t err;
550  try {
551  err = compilation_context->table_function_entry_point()(
552  reinterpret_cast<const int8_t*>(mgr.get()),
553  byte_stream_ptr, // input columns buffer
554  col_sizes_ptr, // input column sizes
555  input_str_dict_proxy_byte_stream_ptr, // input str dictionary proxies
556  nullptr,
557  output_str_dict_proxy_byte_stream_ptr,
558  &output_row_count);
559  } catch (std::exception const& e) {
560  throw UserTableFunctionError("Error executing table function: " +
561  std::string(e.what()));
562  }
563 
564  if (err == TableFunctionErrorCode::NotAnError) {
565  // table_function_entry_point does not initialize output_row_count
566  // when a UDTF returns NotAnError, so we'll set it here.
567  output_row_count = mgr->get_nrows();
568  } else if (err == TableFunctionErrorCode::GenericError) {
569  throw UserTableFunctionError("Error executing table function: " +
570  std::string(mgr->get_error_message()));
571  }
572 
573  else if (err) {
574  throw UserTableFunctionError("Error executing table function: " +
575  std::to_string(err));
576  }
577 
578  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
579  if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
580  throw TableFunctionError(
581  "Table function with constant sizing parameter must return " +
582  std::to_string(mgr->get_nrows()) + " (got " + std::to_string(output_row_count) +
583  ")");
584  }
585  } else {
586  if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
587  output_row_count = mgr->get_nrows();
588  }
589  }
590  // Update entry count, it may differ from allocated mem size
591  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
592  // set_output_row_size has not been called
593  if (output_row_count == 0) {
594  // allocate for empty output columns
595  mgr->allocate_output_buffers(0);
596  } else {
597  throw TableFunctionError("Table function must call set_output_row_size");
598  }
599  }
600 
601  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
602 
603  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
604  CHECK(group_by_buffers_ptr);
605  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
606 
607  auto num_out_columns = exe_unit.target_exprs.size();
608  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
609  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
610  // Todo (todd): Consolidate this column byte offset logic that occurs in at least 4
611  // places
612 
613  for (size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
614  auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
615  if (ti.usesFlatBuffer()) {
616  // TODO: implement FlatBuffer normalization when the
617  // max_nof_values is larger than the nof specified values.
618  //
619  // TODO: implement flatbuffer resize when output_row_count < mgr->get_nrows()
620  CHECK_EQ(mgr->get_nrows(), output_row_count);
621  FlatBufferManager m{src};
622  const size_t allocated_column_size = m.getBufferSize();
623  const size_t actual_column_size = allocated_column_size;
624  src = align_to_int64(src + allocated_column_size);
625  dst = align_to_int64(dst + actual_column_size);
626  if (ti.is_text_encoding_dict_array()) {
627  const auto* ti_lite =
628  reinterpret_cast<const SQLTypeInfoLite*>(m.get_user_data_buffer());
629  CHECK(ti_lite);
630  CHECK_EQ(*ti_lite, ti.toLite()); // ensure dict/db_id are preserved
631  }
632  } else {
633  const size_t target_width = ti.get_size();
634  const size_t allocated_column_size = target_width * mgr->get_nrows();
635  const size_t actual_column_size = target_width * output_row_count;
636  if (src != dst) {
637  auto t = memmove(dst, src, actual_column_size);
638  CHECK_EQ(dst, t);
639  }
640  src = align_to_int64(src + allocated_column_size);
641  dst = align_to_int64(dst + actual_column_size);
642  }
643  }
644  return mgr->query_buffers->getResultSetOwned(0);
645 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
std::string to_string(char const *&&v)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
static int64_t getBufferSize(const void *buffer)
Definition: FlatBuffer.h:553

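The final loop in launchCpuCode() compacts each fixed-width output column in place: buffers were allocated for mgr->get_nrows() rows but only output_row_count rows are kept, so each column is moved up against the previous one and both cursors are re-aligned to an int64_t boundary. A simplified sketch of that compaction (ignoring the FlatBuffer branch; compact_columns and align8 are illustrative names, not part of this class) could look like this:

#include <cstdint>
#include <cstring>
#include <vector>

// align8 mirrors align_to_int64(): rounds a pointer up to an 8-byte boundary.
inline int8_t* align8(int8_t* p) {
  return reinterpret_cast<int8_t*>(
      (reinterpret_cast<uintptr_t>(p) + 7) & ~uintptr_t{7});
}

void compact_columns(int8_t* buffer,
                     const std::vector<size_t>& column_widths,  // per-column byte widths
                     size_t allocated_rows,                     // mgr->get_nrows()
                     size_t actual_rows) {                      // output_row_count
  int8_t* src = buffer;
  int8_t* dst = buffer;
  for (size_t width : column_widths) {
    const size_t allocated_column_size = width * allocated_rows;
    const size_t actual_column_size = width * actual_rows;
    if (src != dst) {
      std::memmove(dst, src, actual_column_size);  // regions may overlap
    }
    src = align8(src + allocated_column_size);
    dst = align8(dst + actual_column_size);
  }
}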

ResultSetPtr TableFunctionExecutionContext::launchGpuCode ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< GpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
std::vector< const int8_t * > &  input_str_dict_proxy_ptrs,
const size_t  elem_count,
std::vector< int8_t * > &  output_str_dict_proxy_ptrs,
const int  device_id,
Executor *  executor 
)
private

Definition at line 661 of file TableFunctionExecutionContext.cpp.

References QueryMemoryDescriptor::addColSlotInfo(), CHECK, CHECK_EQ, checkCudaErrors(), anonymous_namespace{TableFunctionExecutionContext.cpp}::COL_BUFFERS, anonymous_namespace{TableFunctionExecutionContext.cpp}::COL_SIZES, DEBUG_TIMER, anonymous_namespace{TableFunctionExecutionContext.cpp}::ERROR_BUFFER, anonymous_namespace{TableFunctionExecutionContext.cpp}::get_output_row_count(), getQueryEngineCudaStreamForDevice(), GPU, table_functions::TableFunction::hasTableFunctionSpecifiedParameter(), anonymous_namespace{TableFunctionExecutionContext.cpp}::KERNEL_PARAM_COUNT, anonymous_namespace{TableFunctionExecutionContext.cpp}::MANAGER, anonymous_namespace{TableFunctionExecutionContext.cpp}::OUTPUT_BUFFERS, anonymous_namespace{TableFunctionExecutionContext.cpp}::OUTPUT_ROW_COUNT, query_mem_desc, row_set_mem_owner_, TableFunctionExecutionUnit::table_func, TableFunction, TableFunctionExecutionUnit::target_exprs, to_string(), UNREACHABLE, and VLOG.

Referenced by execute().

670  {
671 #ifdef HAVE_CUDA
672  auto timer = DEBUG_TIMER(__func__);
673  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter()) {
674  throw QueryMustRunOnCpu();
675  }
676 
677  auto num_out_columns = exe_unit.target_exprs.size();
678  auto data_mgr = executor->getDataMgr();
679  auto gpu_allocator = std::make_unique<CudaAllocator>(
680  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
681  CHECK(gpu_allocator);
682  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
683 
684  // TODO: implement table function manager for CUDA
685  // kernels. kernel_params[MANAGER] ought to contain a device pointer
686  // to a struct that a table function kernel with a
687  // TableFunctionManager argument can access from the device.
688  kernel_params[MANAGER] =
689  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int8_t*)));
690 
691  // setup the inputs
692  auto byte_stream_ptr = !(col_buf_ptrs.empty())
693  ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
694  : nullptr;
695  if (byte_stream_ptr) {
696  gpu_allocator->copyToDevice(byte_stream_ptr,
697  reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
698  col_buf_ptrs.size() * sizeof(int64_t));
699  }
700  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
701 
702  auto col_sizes_ptr = !(col_sizes.empty())
703  ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
704  : nullptr;
705  if (col_sizes_ptr) {
706  gpu_allocator->copyToDevice(col_sizes_ptr,
707  reinterpret_cast<int8_t*>(col_sizes.data()),
708  col_sizes.size() * sizeof(int64_t));
709  }
710  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);
711 
712  kernel_params[ERROR_BUFFER] =
713  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
714  // initialize output memory
715  QueryMemoryDescriptor query_mem_desc(
716  executor, elem_count, QueryDescriptionType::TableFunction);
717 
718  for (size_t i = 0; i < num_out_columns; i++) {
719  const size_t col_width = exe_unit.target_exprs[i]->get_type_info().get_size();
720  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
721  }
722  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
723  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
724  exe_unit,
725  query_mem_desc,
726  device_id,
727  ExecutorDeviceType::GPU,
728  (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
729  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
730  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
731  row_set_mem_owner_,
732  gpu_allocator.get(),
733  executor);
734 
735  // setup the output
736  int64_t output_row_count = allocated_output_row_count;
737 
738  kernel_params[OUTPUT_ROW_COUNT] =
739  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
740  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
741  reinterpret_cast<int8_t*>(&output_row_count),
742  sizeof(output_row_count));
743  /*
744   TODO: RBC generated runtime table functions do not support
745  concurrent execution on a CUDA device. Hence, we'll force 1 as
746  block/grid size in the case of runtime table functions. To support
747  this, in RBC, we'll need to expose threadIdx/blockIdx/blockDim to
748  runtime table functions and these must do something sensible with
749  this information.
750  */
751  const unsigned block_size_x =
752  (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
753  const unsigned block_size_y = 1;
754  const unsigned block_size_z = 1;
755  const unsigned grid_size_x =
756  (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
757  const unsigned grid_size_y = 1;
758  const unsigned grid_size_z = 1;
759 
760  auto gpu_output_buffers =
761  query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
762  device_id,
763  block_size_x,
764  grid_size_x,
765  true /* zero_initialize_buffers */);
766 
767  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);
768 
769  // execute
770  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());
771 
772  std::vector<void*> param_ptrs;
773  for (auto& param : kernel_params) {
774  param_ptrs.push_back(&param);
775  }
776 
777  // Get cu func
778 
779  CHECK(compilation_context);
780  const auto native_code = compilation_context->getNativeCode(device_id);
781  auto cu_func = static_cast<CUfunction>(native_code.first);
782  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
783  VLOG(1) << "Launch GPU table function kernel compiled with the following block and "
784  "grid sizes: "
785  << block_size_x << " and " << grid_size_x;
786  checkCudaErrors(cuLaunchKernel(cu_func,
787  grid_size_x,
788  grid_size_y,
789  grid_size_z,
790  block_size_x,
791  block_size_y,
792  block_size_z,
793  0, // shared mem bytes
794  qe_cuda_stream,
795  &param_ptrs[0],
796  nullptr));
797  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
798 
799  // read output row count from GPU
800  gpu_allocator->copyFromDevice(
801  reinterpret_cast<int8_t*>(&output_row_count),
802  reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
803  sizeof(int64_t));
804  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
805  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
806  throw TableFunctionError(
807  "Table function with constant sizing parameter must return " +
808  std::to_string(allocated_output_row_count) + " (got " +
809  std::to_string(output_row_count) + ")");
810  }
811  } else {
812  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
813  output_row_count = allocated_output_row_count;
814  }
815  }
816 
817  // Update entry count, it may differ from allocated mem size
818  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
819 
820  // Copy back to CPU storage
821  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
822  query_mem_desc,
823  output_row_count,
824  gpu_output_buffers,
825  device_id,
826  block_size_x,
827  grid_size_x);
828 
829  return query_buffers->getResultSetOwned(0);
830 #else
831  UNREACHABLE();
832  return nullptr;
833 #endif
834 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
Definition: sample.cpp:38
unsigned long long CUdeviceptr
Definition: nocuda.h:28
#define UNREACHABLE()
Definition: Logger.h:338
std::string to_string(char const *&&v)
void * CUfunction
Definition: nocuda.h:25
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::vector< Analyzer::Expr * > target_exprs
#define VLOG(n)
Definition: Logger.h:388

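launchGpuCode() uses the CUDA driver API: every kernel argument is a CUdeviceptr, and cuLaunchKernel() receives an array holding the address of each of those arguments. The sketch below isolates that parameter-passing pattern; launch_example and my_check are hypothetical stand-ins for the surrounding code and for checkCudaErrors(), respectively.

#include <cuda.h>
#include <vector>

inline void my_check(CUresult err) { /* handle/log err != CUDA_SUCCESS */ }

void launch_example(CUfunction cu_func,
                    CUstream stream,
                    std::vector<CUdeviceptr>& kernel_params,
                    unsigned grid_size_x,
                    unsigned block_size_x) {
  // cuLaunchKernel takes a void** where each entry points at one kernel argument.
  std::vector<void*> param_ptrs;
  param_ptrs.reserve(kernel_params.size());
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);  // address of each CUdeviceptr argument
  }
  my_check(cuLaunchKernel(cu_func,
                          grid_size_x, /*gridDimY=*/1, /*gridDimZ=*/1,
                          block_size_x, /*blockDimY=*/1, /*blockDimZ=*/1,
                          /*sharedMemBytes=*/0,
                          stream,
                          param_ptrs.data(),
                          /*extra=*/nullptr));
  my_check(cuStreamSynchronize(stream));
}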

void TableFunctionExecutionContext::launchPreCodeOnCpu ( const TableFunctionExecutionUnit &  exe_unit,
const std::shared_ptr< CpuCompilationContext > &  compilation_context,
std::vector< const int8_t * > &  col_buf_ptrs,
std::vector< int64_t > &  col_sizes,
std::vector< const int8_t * > &  input_str_dict_proxy_ptrs,
const size_t  elem_count,
Executor *  executor 
)
private

Definition at line 344 of file TableFunctionExecutionContext.cpp.

References CHECK, DEBUG_TIMER, GenericError, NotAnError, row_set_mem_owner_, and to_string().

Referenced by execute().

351  {
352  auto timer = DEBUG_TIMER(__func__);
353  int64_t output_row_count = 0;
354 
355  // If TableFunctionManager must be a singleton but it has been
356  // initialized from another thread, TableFunctionManager constructor
357  // blocks via TableFunctionManager_singleton_mutex until the
358  // existing singleton is deconstructed.
359  auto mgr = std::make_unique<TableFunctionManager>(
360  exe_unit,
361  executor,
362  col_buf_ptrs,
363  row_set_mem_owner_,
364  /*is_singleton=*/!exe_unit.table_func.usesManager());
365 
366  // setup the inputs
367  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
368  const auto byte_stream_ptr = !col_buf_ptrs.empty()
369  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
370  : nullptr;
371  if (!col_buf_ptrs.empty()) {
372  CHECK(byte_stream_ptr);
373  }
374  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
375  if (!col_sizes.empty()) {
376  CHECK(col_sizes_ptr);
377  }
378  const auto input_str_dict_proxy_byte_stream_ptr =
379  !input_str_dict_proxy_ptrs.empty()
380  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
381  : nullptr;
382 
383  // execute
384  const auto err = compilation_context->table_function_entry_point()(
385  reinterpret_cast<const int8_t*>(mgr.get()),
386  byte_stream_ptr, // input columns buffer
387  col_sizes_ptr, // input column sizes
388  input_str_dict_proxy_byte_stream_ptr, // input string dictionary proxy ptrs
389  nullptr,
390  nullptr, // output string dictionary proxy ptrs - not supported for pre-flights yet
391  &output_row_count);
392  if (err == TableFunctionErrorCode::NotAnError) {
393  // table_function_entry_point does not initialize output_row_count
394  // when a UDTF returns NotAnError, so we'll set it here.
395  output_row_count = mgr->get_nrows();
396  }
397  if (exe_unit.table_func.hasPreFlightOutputSizer()) {
398  exe_unit.output_buffer_size_param = output_row_count;
399  }
400 
401  if (err == TableFunctionErrorCode::NotAnError) {
402  } else if (err == TableFunctionErrorCode::GenericError) {
403  throw UserTableFunctionError("Error executing table function pre flight check: " +
404  std::string(mgr->get_error_message()));
405  } else if (err) {
406  throw UserTableFunctionError("Error executing table function pre flight check: " +
407  std::to_string(err));
408  }
409 }
std::string to_string(char const *&&v)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412

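Both CPU launch paths invoke compilation_context->table_function_entry_point() with the same argument list. The typedef below is a hedged reconstruction of that calling convention, inferred only from the two listings; the typedef name, parameter names, and the type of the argument that is passed as nullptr are illustrative, not the actual declarations used by CpuCompilationContext.

#include <cstdint>

// Inferred from launchCpuCode() / launchPreCodeOnCpu(); names are illustrative.
using TableFunctionEntryPoint = int32_t (*)(
    const int8_t* mgr,                       // TableFunctionManager*, passed as int8_t*
    const int8_t** input_col_buffers,        // one buffer pointer per input column
    const int64_t* input_col_sizes,          // element counts (0 for literals)
    const int8_t** input_str_dict_proxies,   // input StringDictionaryProxy pointers
    const void* unused,                      // passed as nullptr in both listings
    int8_t** output_str_dict_proxies,        // output StringDictionaryProxy pointers
    int64_t* output_row_count);              // in/out row count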

TableFunctionExecutionContext& TableFunctionExecutionContext::operator= ( const TableFunctionExecutionContext &  )
delete

Member Data Documentation

std::shared_ptr<RowSetMemoryOwner> TableFunctionExecutionContext::row_set_mem_owner_
private

The documentation for this class was generated from the following files:

TableFunctionExecutionContext.h
TableFunctionExecutionContext.cpp