OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TableFunctionExecutionContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Analyzer/Analyzer.h"
20 #include "Logger/Logger.h"
24 
25 namespace {
26 
27 template <typename T>
28 const int8_t* create_literal_buffer(T literal,
29  const ExecutorDeviceType device_type,
30  std::vector<std::unique_ptr<char[]>>& literals_owner,
31  CudaAllocator* gpu_allocator) {
32  CHECK_LE(sizeof(T), sizeof(int64_t)); // pad to 8 bytes
33  switch (device_type) {
35  literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
36  std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
37  return reinterpret_cast<const int8_t*>(literals_owner.back().get());
38  }
40  CHECK(gpu_allocator);
41  const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
42  gpu_allocator->copyToDevice(
43  gpu_literal_buf_ptr, reinterpret_cast<int8_t*>(&literal), sizeof(T));
44  return gpu_literal_buf_ptr;
45  }
46  }
47  UNREACHABLE();
48  return nullptr;
49 }
50 
52  size_t input_element_count) {
53  size_t allocated_output_row_count = 0;
54  switch (exe_unit.table_func.getOutputRowSizeType()) {
57  allocated_output_row_count = exe_unit.output_buffer_size_param;
58  break;
59  }
61  allocated_output_row_count =
62  exe_unit.output_buffer_size_param * input_element_count;
63  break;
64  }
65  default: {
66  UNREACHABLE();
67  }
68  }
69  return allocated_output_row_count;
70 }
71 
72 } // namespace
73 
75  const TableFunctionExecutionUnit& exe_unit,
76  const std::vector<InputTableInfo>& table_infos,
77  const TableFunctionCompilationContext* compilation_context,
78  const ColumnFetcher& column_fetcher,
79  const ExecutorDeviceType device_type,
80  Executor* executor) {
81  CHECK(compilation_context);
82  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
83  std::vector<std::unique_ptr<char[]>> literals_owner;
84 
85  const int device_id = 0; // TODO(adb): support multi-gpu table functions
86  std::unique_ptr<CudaAllocator> device_allocator;
87  if (device_type == ExecutorDeviceType::GPU) {
88  auto& data_mgr = executor->catalog_->getDataMgr();
89  device_allocator.reset(new CudaAllocator(&data_mgr, device_id));
90  }
91  std::vector<const int8_t*> col_buf_ptrs;
92  std::vector<int64_t> col_sizes;
93  std::optional<size_t> output_column_size;
94  for (const auto& input_expr : exe_unit.input_exprs) {
95  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
96  auto table_id = col_var->get_table_id();
97  auto table_info_it = std::find_if(
98  table_infos.begin(), table_infos.end(), [&table_id](const auto& table_info) {
99  return table_info.table_id == table_id;
100  });
101  CHECK(table_info_it != table_infos.end());
102  auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
103  executor,
104  *col_var,
105  table_info_it->info.fragments.front(),
108  device_id,
109  device_allocator.get(),
110  chunks_owner,
111  column_fetcher.columnarized_table_cache_);
112 
113  // We use the size of the first column to be the size of the output column
114  if (!output_column_size) {
115  output_column_size = buf_elem_count;
116  }
117 
118  col_sizes.push_back(buf_elem_count);
119  col_buf_ptrs.push_back(col_buf);
120  } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
121  // TODO(adb): Unify literal handling with rest of system, either in Codegen or as a
122  // separate serialization component
123  col_sizes.push_back(0);
124  const auto const_val_datum = constant_val->get_constval();
125  const auto& ti = constant_val->get_type_info();
126  if (ti.is_fp()) {
127  switch (get_bit_width(ti)) {
128  case 32:
129  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
130  device_type,
131  literals_owner,
132  device_allocator.get()));
133  break;
134  case 64:
135  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
136  device_type,
137  literals_owner,
138  device_allocator.get()));
139  break;
140  default:
141  UNREACHABLE();
142  }
143  } else if (ti.is_integer()) {
144  switch (get_bit_width(ti)) {
145  case 8:
146  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
147  device_type,
148  literals_owner,
149  device_allocator.get()));
150  break;
151  case 16:
152  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
153  device_type,
154  literals_owner,
155  device_allocator.get()));
156  break;
157  case 32:
158  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
159  device_type,
160  literals_owner,
161  device_allocator.get()));
162  break;
163  case 64:
164  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
165  device_type,
166  literals_owner,
167  device_allocator.get()));
168  break;
169  default:
170  UNREACHABLE();
171  }
172  } else {
173  throw std::runtime_error("Literal value " + constant_val->toString() +
174  " is not yet supported.");
175  }
176  }
177  }
178  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
179  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
180  CHECK(output_column_size);
181 
182  switch (device_type) {
184  return launchCpuCode(exe_unit,
185  compilation_context,
186  col_buf_ptrs,
187  col_sizes,
188  *output_column_size,
189  executor);
191  return launchGpuCode(exe_unit,
192  compilation_context,
193  col_buf_ptrs,
194  col_sizes,
195  *output_column_size,
196  /*device_id=*/0,
197  executor);
198  }
199  UNREACHABLE();
200  return nullptr;
201 }
202 
204  const TableFunctionExecutionUnit& exe_unit,
205  const TableFunctionCompilationContext* compilation_context,
206  std::vector<const int8_t*>& col_buf_ptrs,
207  std::vector<int64_t>& col_sizes,
208  const size_t elem_count,
209  Executor* executor) {
210  // setup the inputs
211  const auto byte_stream_ptr = reinterpret_cast<const int8_t**>(col_buf_ptrs.data());
212  CHECK(byte_stream_ptr);
213 
214  // initialize output memory
215  auto num_out_columns = exe_unit.target_exprs.size();
217  executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
218  query_mem_desc.setOutputColumnar(true);
219 
220  for (size_t i = 0; i < num_out_columns; i++) {
221  // All outputs padded to 8 bytes
222  query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
223  }
224 
225  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
226  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
227  exe_unit,
229  /*device_id=*/0,
231  allocated_output_row_count,
232  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
233  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
235  nullptr,
236  executor);
237 
238  // setup the output
239  int64_t output_row_count = allocated_output_row_count;
240  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
241  CHECK(group_by_buffers_ptr);
242 
243  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
244  std::vector<int64_t*> output_col_buf_ptrs;
245  for (size_t i = 0; i < num_out_columns; i++) {
246  output_col_buf_ptrs.emplace_back(output_buffers_ptr + i * allocated_output_row_count);
247  }
248 
249  // execute
250  const auto err = compilation_context->getFuncPtr()(
251  byte_stream_ptr, col_sizes.data(), output_col_buf_ptrs.data(), &output_row_count);
252  if (err) {
253  throw std::runtime_error("Error executing table function: " + std::to_string(err));
254  }
255  if (exe_unit.table_func.hasNonUserSpecifiedOutputSizeConstant()) {
256  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
257  throw std::runtime_error(
258  "Table function with constant sizing parameter must return " +
259  std::to_string(allocated_output_row_count) + " (got " +
260  std::to_string(output_row_count) + ")");
261  }
262  } else {
263  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
264  output_row_count = allocated_output_row_count;
265  }
266  }
267  // Update entry count, it may differ from allocated mem size
268  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
269 
270  const size_t column_size = output_row_count * sizeof(int64_t);
271  const size_t allocated_column_size = allocated_output_row_count * sizeof(int64_t);
272 
273  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
274  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
275  for (size_t i = 0; i < num_out_columns; i++) {
276  if (src != dst) {
277  auto t = memmove(dst, src, column_size);
278  CHECK_EQ(dst, t);
279  }
280  src += allocated_column_size;
281  dst += column_size;
282  }
283 
284  return query_buffers->getResultSetOwned(0);
285 }
286 
namespace {
// Positional indices of the GPU kernel launch parameters in kernel_params.
// NOTE(review): the enumerator lines were dropped by the doc extraction and
// are reconstructed from how launchGpuCode indexes kernel_params — the order
// must match the generated kernel's parameter order; confirm upstream.
enum {
  ERROR_BUFFER,
  COL_BUFFERS,
  COL_SIZES,
  OUTPUT_BUFFERS,
  OUTPUT_ROW_COUNT,
  KERNEL_PARAM_COUNT,
};
}  // namespace
297 
299  const TableFunctionExecutionUnit& exe_unit,
300  const TableFunctionCompilationContext* compilation_context,
301  std::vector<const int8_t*>& col_buf_ptrs,
302  std::vector<int64_t>& col_sizes,
303  const size_t elem_count,
304  const int device_id,
305  Executor* executor) {
306 #ifdef HAVE_CUDA
307  auto num_out_columns = exe_unit.target_exprs.size();
308  auto& data_mgr = executor->catalog_->getDataMgr();
309  auto gpu_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
310  CHECK(gpu_allocator);
311  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
312  // setup the inputs
313  auto byte_stream_ptr = gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t));
314  gpu_allocator->copyToDevice(byte_stream_ptr,
315  reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
316  col_buf_ptrs.size() * sizeof(int64_t));
317  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
318 
319  auto col_sizes_ptr = gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t));
320  gpu_allocator->copyToDevice(col_sizes_ptr,
321  reinterpret_cast<int8_t*>(col_sizes.data()),
322  col_sizes.size() * sizeof(int64_t));
323  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);
324 
325  kernel_params[ERROR_BUFFER] =
326  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
327  // initialize output memory
329  executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
330  query_mem_desc.setOutputColumnar(true);
331 
332  for (size_t i = 0; i < num_out_columns; i++) {
333  // All outputs padded to 8 bytes
334  query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
335  }
336  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
337  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
338  exe_unit,
340  device_id,
342  allocated_output_row_count,
343  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
344  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
346  gpu_allocator.get(),
347  executor);
348 
349  // setup the output
350  int64_t output_row_count = allocated_output_row_count;
351 
352  kernel_params[OUTPUT_ROW_COUNT] =
353  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
354  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
355  reinterpret_cast<int8_t*>(&output_row_count),
356  sizeof(output_row_count));
357 
358  // const unsigned block_size_x = executor->blockSize();
359  const unsigned block_size_x = 1;
360  const unsigned block_size_y = 1;
361  const unsigned block_size_z = 1;
362  // const unsigned grid_size_x = executor->gridSize();
363  const unsigned grid_size_x = 1;
364  const unsigned grid_size_y = 1;
365  const unsigned grid_size_z = 1;
366 
367  auto gpu_output_buffers = query_buffers->setupTableFunctionGpuBuffers(
368  query_mem_desc, device_id, block_size_x, grid_size_x);
369 
370  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.first);
371 
372  // execute
373  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());
374 
375  std::vector<void*> param_ptrs;
376  for (auto& param : kernel_params) {
377  param_ptrs.push_back(&param);
378  }
379 
380  // Get cu func
381  const auto gpu_context = compilation_context->getGpuCode();
382  CHECK(gpu_context);
383  const auto native_code = gpu_context->getNativeCode(device_id);
384  auto cu_func = static_cast<CUfunction>(native_code.first);
385  checkCudaErrors(cuLaunchKernel(cu_func,
386  grid_size_x,
387  grid_size_y,
388  grid_size_z,
389  block_size_x,
390  block_size_y,
391  block_size_z,
392  0, // shared mem bytes
393  nullptr,
394  &param_ptrs[0],
395  nullptr));
396  // TODO(adb): read errors
397 
398  // read output row count from GPU
399  gpu_allocator->copyFromDevice(
400  reinterpret_cast<int8_t*>(&output_row_count),
401  reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
402  sizeof(int64_t));
403  if (exe_unit.table_func.hasNonUserSpecifiedOutputSizeConstant()) {
404  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
405  throw std::runtime_error(
406  "Table function with constant sizing parameter must return " +
407  std::to_string(allocated_output_row_count) + " (got " +
408  std::to_string(output_row_count) + ")");
409  }
410  } else {
411  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
412  output_row_count = allocated_output_row_count;
413  }
414  }
415 
416  // Update entry count, it may differ from allocated mem size
417  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
418 
419  // Copy back to CPU storage
420  query_buffers->copyFromTableFunctionGpuBuffers(&data_mgr,
421  query_mem_desc,
422  output_row_count,
423  gpu_output_buffers,
424  device_id,
425  block_size_x,
426  grid_size_x);
427 
428  return query_buffers->getResultSetOwned(0);
429 #else
430  UNREACHABLE();
431  return nullptr;
432 #endif
433 }
Defines data structures for the semantic analysis phase of query processing.
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const TableFunctionCompilationContext *compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, const size_t elem_count, const int device_id, Executor *executor)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
GpuCompilationContext * getGpuCode() const
std::vector< Analyzer::Expr * > input_exprs
ExecutorDeviceType
void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const override
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
Definition: sample.cpp:38
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define UNREACHABLE()
Definition: Logger.h:241
void setOutputColumnar(const bool val)
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
std::string to_string(char const *&&v)
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const TableFunctionCompilationContext *compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor)
size_t get_bit_width(const SQLTypeInfo &ti)
void * CUfunction
Definition: nocuda.h:24
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
TableFunctionCompilationContext::FuncPtr getFuncPtr() const
int8_t * alloc(const size_t num_bytes) override
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
std::vector< Analyzer::Expr * > target_exprs
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const TableFunctionCompilationContext *compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, const size_t elem_count, Executor *executor)
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
OutputBufferSizeType getOutputRowSizeType() const
const int8_t * create_literal_buffer(T literal, const ExecutorDeviceType device_type, std::vector< std::unique_ptr< char[]>> &literals_owner, CudaAllocator *gpu_allocator)