OmniSciDB a667adc9c8
TableFunctionExecutionContext.cpp
/*
 * Copyright 2019 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/TableFunctions/TableFunctionExecutionContext.h"

#include "Analyzer/Analyzer.h"
#include "Logger/Logger.h"
#include "QueryEngine/ColumnFetcher.h"
#include "QueryEngine/GpuMemUtils.h"
#include "QueryEngine/TableFunctions/TableFunctionCompilationContext.h"

namespace {

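// Stages a scalar literal argument for a table function call. The literal is
// padded to 8 bytes to match the kernel ABI: on CPU the padded copy is kept
// alive in literals_owner for the duration of the call; on GPU it is copied
// into freshly allocated device memory.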
template <typename T>
const int8_t* create_literal_buffer(T literal,
                                    const ExecutorDeviceType device_type,
                                    std::vector<std::unique_ptr<char[]>>& literals_owner,
                                    CudaAllocator* gpu_allocator) {
  CHECK_LE(sizeof(T), sizeof(int64_t));  // pad to 8 bytes
  switch (device_type) {
    case ExecutorDeviceType::CPU: {
      literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
      std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
      return reinterpret_cast<const int8_t*>(literals_owner.back().get());
    }
    case ExecutorDeviceType::GPU: {
      CHECK(gpu_allocator);
      const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
      gpu_allocator->copyToDevice(
          gpu_literal_buf_ptr, reinterpret_cast<int8_t*>(&literal), sizeof(T));
      return gpu_literal_buf_ptr;
    }
  }
  UNREACHABLE();
  return nullptr;
}

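// Computes how many output rows to allocate before the function runs: for
// constant and user-specified-constant sizing, the sizing parameter is the
// row count itself; for the row-multiplier case, the parameter scales the
// input element count.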
size_t get_output_row_count(const TableFunctionExecutionUnit& exe_unit,
                            size_t input_element_count) {
  size_t allocated_output_row_count = 0;
  switch (exe_unit.table_func.getOutputRowSizeType()) {
    case table_functions::OutputBufferSizeType::kConstant:
    case table_functions::OutputBufferSizeType::kUserSpecifiedConstantParameter: {
      allocated_output_row_count = exe_unit.output_buffer_size_param;
      break;
    }
    case table_functions::OutputBufferSizeType::kUserSpecifiedRowMultiplier: {
      allocated_output_row_count =
          exe_unit.output_buffer_size_param * input_element_count;
      break;
    }
    default: {
      UNREACHABLE();
    }
  }
  return allocated_output_row_count;
}

}  // namespace

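// Entry point for table function execution: materializes each input argument
// (column, column list, or literal) into a buffer pointer plus element count,
// then dispatches to the CPU or GPU launch path below.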
ResultSetPtr TableFunctionExecutionContext::execute(
    const TableFunctionExecutionUnit& exe_unit,
    const std::vector<InputTableInfo>& table_infos,
    const TableFunctionCompilationContext* compilation_context,
    const ColumnFetcher& column_fetcher,
    const ExecutorDeviceType device_type,
    Executor* executor) {
  CHECK(compilation_context);
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<std::unique_ptr<char[]>> literals_owner;

  const int device_id = 0;  // TODO(adb): support multi-gpu table functions
  std::unique_ptr<CudaAllocator> device_allocator;
  if (device_type == ExecutorDeviceType::GPU) {
    auto& data_mgr = executor->catalog_->getDataMgr();
    device_allocator.reset(new CudaAllocator(&data_mgr, device_id));
  }
  std::vector<const int8_t*> col_buf_ptrs;
  std::vector<int64_t> col_sizes;
  std::optional<size_t> output_column_size;

  int col_index = -1;
  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
  // arguments are not supported on GPU atm.
  std::vector<std::vector<const int8_t*>> col_list_bufs;
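  // A COLUMN_LIST argument arrives as get_dimension() consecutive column
  // expressions; their buffer pointers are collected into one col_list_bufs
  // entry, and each column of the list pushes a pointer to that shared array,
  // so the kernel sees the whole list through a single buffer.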
  for (const auto& input_expr : exe_unit.input_exprs) {
    auto ti = input_expr->get_type_info();
    if (!ti.is_column_list()) {
      CHECK_EQ(col_index, -1);
    }
    if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
      auto table_id = col_var->get_table_id();
      auto table_info_it = std::find_if(
          table_infos.begin(), table_infos.end(), [&table_id](const auto& table_info) {
            return table_info.table_id == table_id;
          });
      CHECK(table_info_it != table_infos.end());
      auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
          executor,
          *col_var,
          table_info_it->info.fragments.front(),
          device_type == ExecutorDeviceType::CPU ? Data_Namespace::CPU_LEVEL
                                                 : Data_Namespace::GPU_LEVEL,
          device_id,
          device_allocator.get(),
          /*thread_idx=*/0,
          chunks_owner,
          column_fetcher.columnarized_table_cache_);
      // We use the size of the first column to be the size of the output column
      if (!output_column_size) {
        output_column_size = (buf_elem_count ? buf_elem_count : 1);
      }
      if (ti.is_column_list()) {
        if (col_index == -1) {
          col_list_bufs.push_back({});
          col_list_bufs.back().reserve(ti.get_dimension());
        } else {
          CHECK_EQ(col_sizes.back(), buf_elem_count);
        }
        col_index++;
        col_list_bufs.back().push_back(col_buf);
        // append col_buf to column_list col_buf
        if (col_index + 1 == ti.get_dimension()) {
          col_index = -1;
        }
        // columns in the same column_list point to column_list data
        col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
      } else {
        col_buf_ptrs.push_back(col_buf);
      }
      col_sizes.push_back(buf_elem_count);
    } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
      // TODO(adb): Unify literal handling with rest of system, either in Codegen or as a
      // separate serialization component
      col_sizes.push_back(0);
      const auto const_val_datum = constant_val->get_constval();
      const auto& ti = constant_val->get_type_info();
      if (ti.is_fp()) {
        switch (get_bit_width(ti)) {
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_integer()) {
        switch (get_bit_width(ti)) {
          case 8:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 16:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else {
        throw std::runtime_error("Literal value " + constant_val->toString() +
                                 " is not yet supported.");
      }
    }
  }
  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
  CHECK(output_column_size);
  switch (device_type) {
    case ExecutorDeviceType::CPU:
      return launchCpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           col_sizes,
                           *output_column_size,
                           executor);
    case ExecutorDeviceType::GPU:
      return launchGpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           col_sizes,
                           *output_column_size,
                           /*device_id=*/0,
                           executor);
  }
  UNREACHABLE();
  return nullptr;
}

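// CPU execution path: the compiled table function is called in-process.
// Judging from the invocation below, the generated entry point receives the
// input column buffers, their element counts, the output column buffers, and
// an in/out row count, and returns a nonzero error code on failure.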
ResultSetPtr TableFunctionExecutionContext::launchCpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    const size_t elem_count,
    Executor* executor) {
  // setup the inputs
  const auto byte_stream_ptr = reinterpret_cast<const int8_t**>(col_buf_ptrs.data());
  CHECK(byte_stream_ptr);

  // initialize output memory
  auto num_out_columns = exe_unit.target_exprs.size();
  QueryMemoryDescriptor query_mem_desc(
      executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < num_out_columns; i++) {
    // All outputs padded to 8 bytes
    query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
  }

  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      /*device_id=*/0,
      ExecutorDeviceType::CPU,
      allocated_output_row_count,
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      nullptr,
      executor);

  // setup the output
  int64_t output_row_count = allocated_output_row_count;
  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
  CHECK(group_by_buffers_ptr);

  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
  std::vector<int64_t*> output_col_buf_ptrs;
  for (size_t i = 0; i < num_out_columns; i++) {
    output_col_buf_ptrs.emplace_back(output_buffers_ptr + i * allocated_output_row_count);
  }

  // execute
  auto timer = DEBUG_TIMER(__func__);
  const auto err = compilation_context->getFuncPtr()(
      byte_stream_ptr, col_sizes.data(), output_col_buf_ptrs.data(), &output_row_count);
  if (err) {
    throw std::runtime_error("Error executing table function: " + std::to_string(err));
  }
  if (exe_unit.table_func.hasNonUserSpecifiedOutputSizeConstant()) {
    if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
      throw std::runtime_error(
          "Table function with constant sizing parameter must return " +
          std::to_string(allocated_output_row_count) + " (got " +
          std::to_string(output_row_count) + ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
      output_row_count = allocated_output_row_count;
    }
  }
  // Update entry count, it may differ from allocated mem size
  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  const size_t column_size = output_row_count * sizeof(int64_t);
  const size_t allocated_column_size = allocated_output_row_count * sizeof(int64_t);

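  // Output columns were allocated with a stride of allocated_output_row_count
  // rows, but only output_row_count rows are valid. Compact the columns
  // in-place so they are contiguous at the actual row count; the first column
  // never moves (src == dst on the first iteration).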
  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
  for (size_t i = 0; i < num_out_columns; i++) {
    if (src != dst) {
      auto t = memmove(dst, src, column_size);
      CHECK_EQ(dst, t);
    }
    src += allocated_column_size;
    dst += column_size;
  }

  return query_buffers->getResultSetOwned(0);
}

namespace {
enum {
  ERROR_BUFFER,
  COL_BUFFERS,
  COL_SIZES,
  OUTPUT_BUFFERS,
  OUTPUT_ROW_COUNT,
  KERNEL_PARAM_COUNT,
};
}  // namespace

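// GPU execution path: inputs are staged into device memory and handed to the
// generated kernel through kernel_params, indexed by the enum above (error
// slot, input column buffers, input sizes, output buffers, output row count).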
ResultSetPtr TableFunctionExecutionContext::launchGpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    const size_t elem_count,
    const int device_id,
    Executor* executor) {
#ifdef HAVE_CUDA
  auto num_out_columns = exe_unit.target_exprs.size();
  auto& data_mgr = executor->catalog_->getDataMgr();
  auto gpu_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
  CHECK(gpu_allocator);
  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
  // setup the inputs
  auto byte_stream_ptr = gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t));
  gpu_allocator->copyToDevice(byte_stream_ptr,
                              reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
                              col_buf_ptrs.size() * sizeof(int64_t));
  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);

  auto col_sizes_ptr = gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t));
  gpu_allocator->copyToDevice(col_sizes_ptr,
                              reinterpret_cast<int8_t*>(col_sizes.data()),
                              col_sizes.size() * sizeof(int64_t));
  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);

  kernel_params[ERROR_BUFFER] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
  // initialize output memory
  QueryMemoryDescriptor query_mem_desc(
      executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < num_out_columns; i++) {
    // All outputs padded to 8 bytes
    query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
  }
  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      device_id,
      ExecutorDeviceType::GPU,
      allocated_output_row_count,
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      gpu_allocator.get(),
      executor);

  // setup the output
  int64_t output_row_count = allocated_output_row_count;

  kernel_params[OUTPUT_ROW_COUNT] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
                              reinterpret_cast<int8_t*>(&output_row_count),
                              sizeof(output_row_count));

  // const unsigned block_size_x = executor->blockSize();
  const unsigned block_size_x = 1;
  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  // const unsigned grid_size_x = executor->gridSize();
  const unsigned grid_size_x = 1;
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;
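  // With the 1x1x1 grid and block configured above, the table function kernel
  // currently runs as a single GPU thread; the commented-out
  // executor->blockSize() / gridSize() calls suggest wider launches are planned.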

  auto gpu_output_buffers = query_buffers->setupTableFunctionGpuBuffers(
      query_mem_desc, device_id, block_size_x, grid_size_x);

  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.first);

  // execute
  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());

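  // cuLaunchKernel expects an array of pointers to the argument values, so
  // collect the address of each CUdeviceptr slot.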
  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }

  // Get cu func
  const auto gpu_context = compilation_context->getGpuCode();
  CHECK(gpu_context);
  const auto native_code = gpu_context->getNativeCode(device_id);
  auto cu_func = static_cast<CUfunction>(native_code.first);
  checkCudaErrors(cuLaunchKernel(cu_func,
                                 grid_size_x,
                                 grid_size_y,
                                 grid_size_z,
                                 block_size_x,
                                 block_size_y,
                                 block_size_z,
                                 0,  // shared mem bytes
                                 nullptr,
                                 &param_ptrs[0],
                                 nullptr));
  // TODO(adb): read errors
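  // (The ERROR_BUFFER slot allocated above is not read back or checked here,
  // unlike the CPU path, which throws on a nonzero return code.)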

  // read output row count from GPU
  gpu_allocator->copyFromDevice(
      reinterpret_cast<int8_t*>(&output_row_count),
      reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
      sizeof(int64_t));
  if (exe_unit.table_func.hasNonUserSpecifiedOutputSizeConstant()) {
    if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
      throw std::runtime_error(
          "Table function with constant sizing parameter must return " +
          std::to_string(allocated_output_row_count) + " (got " +
          std::to_string(output_row_count) + ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
      output_row_count = allocated_output_row_count;
    }
  }

  // Update entry count, it may differ from allocated mem size
  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  // Copy back to CPU storage
  query_buffers->copyFromTableFunctionGpuBuffers(&data_mgr,
                                                 query_mem_desc,
                                                 output_row_count,
                                                 gpu_output_buffers,
                                                 device_id,
                                                 block_size_x,
                                                 grid_size_x);

  return query_buffers->getResultSetOwned(0);
#else
  UNREACHABLE();
  return nullptr;
#endif
}