OmniSciDB eb3a3d0a03
TableFunctionExecutionContext.cpp
/*
 * Copyright 2019 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/TableFunctions/TableFunctionExecutionContext.h"

#include "Analyzer/Analyzer.h"
#include "Logger/Logger.h"
#include "QueryEngine/ColumnFetcher.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/GpuMemUtils.h"
#include "QueryEngine/QueryMemoryInitializer.h"
#include "QueryEngine/TableFunctions/TableFunctionCompilationContext.h"
#include "Shared/funcannotations.h"

namespace {

template <typename T>
const int8_t* create_literal_buffer(const T literal,
                                    const ExecutorDeviceType device_type,
                                    std::vector<std::unique_ptr<char[]>>& literals_owner,
                                    CudaAllocator* gpu_allocator) {
  CHECK_LE(sizeof(T), sizeof(int64_t));  // pad to 8 bytes
  switch (device_type) {
    case ExecutorDeviceType::CPU: {
      literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
      std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
      return reinterpret_cast<const int8_t*>(literals_owner.back().get());
    }
    case ExecutorDeviceType::GPU: {
      CHECK(gpu_allocator);
      const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
      gpu_allocator->copyToDevice(
          gpu_literal_buf_ptr, reinterpret_cast<const int8_t*>(&literal), sizeof(T));
      return gpu_literal_buf_ptr;
    }
  }
  UNREACHABLE();
  return nullptr;
}
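// Usage sketch (illustrative, not part of the original file): padding a 4-byte
// float literal into the zero-initialized 8-byte host slot used for a CPU
// launch. The literal value is hypothetical.
//
//   std::vector<std::unique_ptr<char[]>> owner;
//   const int8_t* buf =
//       create_literal_buffer(1.5f, ExecutorDeviceType::CPU, owner, nullptr);
//   // buf points at 8 bytes whose first 4 hold 1.5f; owner keeps them alive.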

// Specialization for std::string. Currently we simply hand the UDTF a char* to
// the first char of a c-style string we copy out of the std::string, prefixed
// by its 8-byte size. We may want to evaluate passing the pointer and size in
// explicitly instead.
template <>
const int8_t* create_literal_buffer(std::string* const literal,
                                    const ExecutorDeviceType device_type,
                                    std::vector<std::unique_ptr<char[]>>& literals_owner,
                                    CudaAllocator* gpu_allocator) {
  const int64_t string_size = literal->size();
  const int64_t padded_string_size =
      (string_size + 7) / 8 * 8;  // round up to the next multiple of 8
  switch (device_type) {
    case ExecutorDeviceType::CPU: {
      literals_owner.emplace_back(
          std::make_unique<char[]>(sizeof(int64_t) + padded_string_size));
      std::memcpy(literals_owner.back().get(), &string_size, sizeof(int64_t));
      std::memcpy(
          literals_owner.back().get() + sizeof(int64_t), literal->data(), string_size);
      return reinterpret_cast<const int8_t*>(literals_owner.back().get());
    }
    case ExecutorDeviceType::GPU: {
      CHECK(gpu_allocator);
      const auto gpu_literal_buf_ptr =
          gpu_allocator->alloc(sizeof(int64_t) + padded_string_size);
      gpu_allocator->copyToDevice(gpu_literal_buf_ptr,
                                  reinterpret_cast<const int8_t*>(&string_size),
                                  sizeof(int64_t));
      gpu_allocator->copyToDevice(gpu_literal_buf_ptr + sizeof(int64_t),
                                  reinterpret_cast<const int8_t*>(literal->data()),
                                  string_size);
      return gpu_literal_buf_ptr;
    }
  }
  UNREACHABLE();
  return nullptr;
}
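// Worked example (values illustrative): for the literal "hello", string_size
// is 5 and padded_string_size is 8, so the CPU buffer spans 16 bytes:
//
//   bytes 0..7  : 05 00 00 00 00 00 00 00      (int64 size, little-endian)
//   bytes 8..15 : 'h' 'e' 'l' 'l' 'o' 00 00 00 (chars plus zero padding)
//
// make_unique<char[]> value-initializes, so the padding bytes are zero.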

size_t get_output_row_count(const TableFunctionExecutionUnit& exe_unit,
                            size_t input_element_count) {
  size_t allocated_output_row_count = 0;
  switch (exe_unit.table_func.getOutputRowSizeType()) {
    case table_functions::OutputBufferSizeType::kConstant:
    case table_functions::OutputBufferSizeType::kUserSpecifiedConstantParameter: {
      allocated_output_row_count = exe_unit.output_buffer_size_param;
      break;
    }
    case table_functions::OutputBufferSizeType::kUserSpecifiedRowMultiplier: {
      allocated_output_row_count =
          exe_unit.output_buffer_size_param * input_element_count;
      break;
    }
    case table_functions::OutputBufferSizeType::kTableFunctionSpecifiedParameter: {
      allocated_output_row_count = input_element_count;
      break;
    }
    default: {
      UNREACHABLE();
    }
  }
  return allocated_output_row_count;
}
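// Worked example: a table function declared with a row multiplier of 2
// (kUserSpecifiedRowMultiplier, output_buffer_size_param == 2) over a
// 1000-row input fragment gets 2 * 1000 = 2000 output rows allocated, while
// kConstant / kUserSpecifiedConstantParameter use the parameter value
// directly, independent of input size.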

}  // namespace

ResultSetPtr TableFunctionExecutionContext::execute(
    const TableFunctionExecutionUnit& exe_unit,
    const std::vector<InputTableInfo>& table_infos,
    const TableFunctionCompilationContext* compilation_context,
    const ColumnFetcher& column_fetcher,
    const ExecutorDeviceType device_type,
    Executor* executor) {
  CHECK(compilation_context);
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<std::unique_ptr<char[]>> literals_owner;

  const int device_id = 0;  // TODO(adb): support multi-gpu table functions
  std::unique_ptr<CudaAllocator> device_allocator;
  if (device_type == ExecutorDeviceType::GPU) {
    auto data_mgr = executor->getDataMgr();
    device_allocator.reset(new CudaAllocator(data_mgr, device_id));
  }
  std::vector<const int8_t*> col_buf_ptrs;
  std::vector<int64_t> col_sizes;
  std::optional<size_t> input_num_rows;

  int col_index = -1;
  // TODO: col_list_bufs are allocated in CPU memory, so UDTFs with column_list
  // arguments are not supported on GPU at the moment.
  std::vector<std::vector<const int8_t*>> col_list_bufs;
  for (const auto& input_expr : exe_unit.input_exprs) {
    auto ti = input_expr->get_type_info();
    if (!ti.is_column_list()) {
      CHECK_EQ(col_index, -1);
    }
    if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
      auto table_id = col_var->get_table_id();
      auto table_info_it = std::find_if(
          table_infos.begin(), table_infos.end(), [&table_id](const auto& table_info) {
            return table_info.table_id == table_id;
          });
      CHECK(table_info_it != table_infos.end());
      auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
          executor,
          *col_var,
          table_info_it->info.fragments.front(),
          device_type == ExecutorDeviceType::CPU ? Data_Namespace::CPU_LEVEL
                                                 : Data_Namespace::GPU_LEVEL,
          device_id,
          device_allocator.get(),
          /*thread_idx=*/0,
          chunks_owner,
          column_fetcher.columnarized_table_cache_);
      // Use the number of entries in the first column as the input row count on
      // which to base the output size (depending on the sizing parameter).
      if (!input_num_rows) {
        input_num_rows = (buf_elem_count ? buf_elem_count : 1);
      }
      if (ti.is_column_list()) {
        if (col_index == -1) {
          // first column of a column_list: start a fresh list of buffers
          col_list_bufs.push_back({});
          col_list_bufs.back().reserve(ti.get_dimension());
        } else {
          CHECK_EQ(col_sizes.back(), buf_elem_count);
        }
        col_index++;
        // append col_buf to the current column_list's buffers
        col_list_bufs.back().push_back(col_buf);
        if (col_index + 1 == ti.get_dimension()) {
          col_index = -1;
        }
        // columns in the same column_list point at the column_list's data
        col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
      } else {
        col_buf_ptrs.push_back(col_buf);
      }
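      // Illustrative layout (descriptive comment, not executed): for a
      // column_list of three columns, col_list_bufs.back() holds
      // {buf0, buf1, buf2} and col_buf_ptrs receives the same pointer to that
      // array three times, once per declared input column.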
      col_sizes.push_back(buf_elem_count);
    } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
      // TODO(adb): Unify literal handling with the rest of the system, either in
      // Codegen or as a separate serialization component
      col_sizes.push_back(0);
      const auto const_val_datum = constant_val->get_constval();
      const auto& ti = constant_val->get_type_info();
      if (ti.is_fp()) {
        switch (get_bit_width(ti)) {
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_integer()) {
        switch (get_bit_width(ti)) {
          case 8:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 16:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_boolean()) {
        col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.boolval,
                                                     device_type,
                                                     literals_owner,
                                                     device_allocator.get()));
      } else if (ti.is_bytes()) {  // text encoding none string
        col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.stringval,
                                                     device_type,
                                                     literals_owner,
                                                     device_allocator.get()));
      } else {
        throw std::runtime_error("Literal value " + constant_val->toString() +
                                 " is not yet supported.");
      }
    }
  }
  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
  if (!exe_unit.table_func.hasOutputSizeIndependentOfInputSize()) {
    // Output sizes independent of the input size include compile-time
    // constants, user-specified constants, and runtime table-function
    // specified sizing; only user-specified row multipliers currently take
    // the input row count into account, so it must be known here.
    CHECK(input_num_rows);
  }
  switch (device_type) {
    case ExecutorDeviceType::CPU:
      return launchCpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           col_sizes,
                           *input_num_rows,
                           executor);
    case ExecutorDeviceType::GPU:
      return launchGpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           col_sizes,
                           *input_num_rows,
                           /*device_id=*/0,
                           executor);
  }
  UNREACHABLE();
  return nullptr;
}

ResultSetPtr TableFunctionExecutionContext::launchCpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    const size_t elem_count,  // currently taken from the first input column only
    Executor* executor) {
  int64_t output_row_count = 0;

  // mgr will allocate output buffers when the table function resizes its
  // output columns
  auto mgr = std::make_unique<QueryOutputBufferMemoryManager>(
      exe_unit, executor, col_buf_ptrs, row_set_mem_owner_);

  // except_mgr handles memory for error/exception messages
  auto except_mgr = std::make_unique<TableFunctionExceptionManager>();

  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
    // Allocate output buffers up front: the size is known from user-specified
    // parameters (and the input table size in the case of a user-specified
    // row multiplier).
    output_row_count = get_output_row_count(exe_unit, elem_count);
  }

  // set up the inputs
  // col_buf_ptrs may be empty if the function takes no arguments
  const auto byte_stream_ptr = !col_buf_ptrs.empty()
                                   ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
                                   : nullptr;
  if (!col_buf_ptrs.empty()) {
    CHECK(byte_stream_ptr);
  }
  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
  if (!col_sizes.empty()) {
    CHECK(col_sizes_ptr);
  }

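  // For orientation (a sketch, not a definition from this file): judging by
  // the call below, the compiled entry point behaves roughly like
  //
  //   int32_t udtf_entry(const int8_t** input_cols,   // one buffer per input expr
  //                      const int64_t* input_sizes,  // element counts; 0 for literals
  //                      int64_t** output_cols,       // unused here (nullptr is passed)
  //                      int64_t* output_row_count);
  //
  // The exact signature is owned by TableFunctionCompilationContext; the
  // parameter names above are illustrative.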
  // execute
  auto timer = DEBUG_TIMER(__func__);
  const auto err =
      compilation_context->getFuncPtr()(byte_stream_ptr,  // input columns buffer
                                        col_sizes_ptr,    // input column sizes
                                        nullptr,
                                        &output_row_count);

  if (err && except_mgr->get_error_message()) {
    // the table function registered an explicit error message
    throw std::runtime_error("Error executing table function: " +
                             std::string(except_mgr->get_error_message()));
  } else if (err) {
    throw std::runtime_error("Error executing table function: " + std::to_string(err));
  }

  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
    if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
      throw std::runtime_error(
          "Table function with constant sizing parameter must return " +
          std::to_string(mgr->get_nrows()) + " (got " +
          std::to_string(output_row_count) + ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
      output_row_count = mgr->get_nrows();
    }
  }
  // Update the entry count; it may differ from the allocated buffer size
  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
    // set_output_row_size has not been called
    if (output_row_count == 0) {
      // allocate for empty output columns
      mgr->allocate_output_buffers(0);
    } else {
      throw std::runtime_error("Table function must call set_output_row_size");
    }
  }

  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  const size_t column_size = output_row_count * sizeof(int64_t);
  const size_t allocated_column_size = mgr->get_nrows() * sizeof(int64_t);
  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
  CHECK(group_by_buffers_ptr);
  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);

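  // Compact the output columns in place: each column was allocated get_nrows()
  // slots, but only output_row_count rows are valid, so column i moves from
  // byte offset i * allocated_column_size down to i * column_size. Worked
  // example with hypothetical sizes (3 columns, 100 allocated rows, 10 result
  // rows): column 1 moves from offset 800 to 80 and column 2 from 1600 to 160,
  // while column 0 stays put (src == dst on the first iteration).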
  auto num_out_columns = exe_unit.target_exprs.size();
  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
  for (size_t i = 0; i < num_out_columns; i++) {
    if (src != dst) {
      auto t = memmove(dst, src, column_size);
      CHECK_EQ(dst, t);
    }
    src += allocated_column_size;
    dst += column_size;
  }

  return mgr->query_buffers->getResultSetOwned(0);
}

namespace {
enum {
  ERROR_BUFFER,
  COL_BUFFERS,
  COL_SIZES,
  OUTPUT_BUFFERS,
  OUTPUT_ROW_COUNT,
  KERNEL_PARAM_COUNT,
};
}  // namespace
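// These enumerators index kernel_params below; param_ptrs is passed to
// cuLaunchKernel positionally, so the layout must match the parameter order of
// the generated table function kernel.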

ResultSetPtr TableFunctionExecutionContext::launchGpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    const size_t elem_count,
    const int device_id,
    Executor* executor) {
#ifdef HAVE_CUDA
  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter()) {
    // runtime output sizing via set_output_row_size is only supported on CPU
    throw QueryMustRunOnCpu();
  }

  auto num_out_columns = exe_unit.target_exprs.size();
  auto data_mgr = executor->getDataMgr();
  auto gpu_allocator = std::make_unique<CudaAllocator>(data_mgr, device_id);
  CHECK(gpu_allocator);
  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);

  // set up the inputs
  auto byte_stream_ptr = !(col_buf_ptrs.empty())
                             ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
                             : nullptr;
  if (byte_stream_ptr) {
    gpu_allocator->copyToDevice(byte_stream_ptr,
                                reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
                                col_buf_ptrs.size() * sizeof(int64_t));
  }
  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);

  auto col_sizes_ptr = !(col_sizes.empty())
                           ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
                           : nullptr;
  if (col_sizes_ptr) {
    gpu_allocator->copyToDevice(col_sizes_ptr,
                                reinterpret_cast<int8_t*>(col_sizes.data()),
                                col_sizes.size() * sizeof(int64_t));
  }
  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);

  kernel_params[ERROR_BUFFER] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
  // initialize output memory
  QueryMemoryDescriptor query_mem_desc(
      executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < num_out_columns; i++) {
    // All outputs are padded to 8 bytes
    query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
  }
  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      device_id,
      ExecutorDeviceType::GPU,
      (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      gpu_allocator.get(),
      executor);

  // set up the output
  int64_t output_row_count = allocated_output_row_count;

  kernel_params[OUTPUT_ROW_COUNT] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
                              reinterpret_cast<int8_t*>(&output_row_count),
                              sizeof(output_row_count));

  // const unsigned block_size_x = executor->blockSize();
  const unsigned block_size_x = 1;
  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  // const unsigned grid_size_x = executor->gridSize();
  const unsigned grid_size_x = 1;
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;

  auto gpu_output_buffers = query_buffers->setupTableFunctionGpuBuffers(
      query_mem_desc, device_id, block_size_x, grid_size_x);

  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);

  // execute
  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());

  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }
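  // cuLaunchKernel expects an array of void*, each pointing at the storage of
  // one kernel argument, hence the extra level of indirection over
  // kernel_params.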

  // get the CUDA function for this device
  const auto gpu_context = compilation_context->getGpuCode();
  CHECK(gpu_context);
  const auto native_code = gpu_context->getNativeCode(device_id);
  auto cu_func = static_cast<CUfunction>(native_code.first);
  checkCudaErrors(cuLaunchKernel(cu_func,
                                 grid_size_x,
                                 grid_size_y,
                                 grid_size_z,
                                 block_size_x,
                                 block_size_y,
                                 block_size_z,
                                 0,  // shared mem bytes
                                 nullptr,
                                 &param_ptrs[0],
                                 nullptr));
  // TODO(adb): read errors

  // read the output row count back from the GPU
  gpu_allocator->copyFromDevice(
      reinterpret_cast<int8_t*>(&output_row_count),
      reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
      sizeof(int64_t));
  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
    if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
      throw std::runtime_error(
          "Table function with constant sizing parameter must return " +
          std::to_string(allocated_output_row_count) + " (got " +
          std::to_string(output_row_count) + ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
      output_row_count = allocated_output_row_count;
    }
  }

  // Update the entry count; it may differ from the allocated buffer size
  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  // copy the results back to CPU storage
  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
                                                 query_mem_desc,
                                                 output_row_count,
                                                 gpu_output_buffers,
                                                 device_id,
                                                 block_size_x,
                                                 grid_size_x);

  return query_buffers->getResultSetOwned(0);
#else
  UNREACHABLE();
  return nullptr;
#endif
}