OmniSciDB  1dac507f6e
TableFunctionExecutionContext.cpp
/*
 * Copyright 2019 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/TableFunctions/TableFunctionExecutionContext.h"

#include "Analyzer/Analyzer.h"
#include "QueryEngine/ColumnFetcher.h"
#include "QueryEngine/GpuMemUtils.h"
#include "QueryEngine/TableFunctions/TableFunctionCompilationContext.h"
#include "Shared/Logger.h"

namespace {

template <typename T>
const int8_t* create_literal_buffer(T literal,
                                    const ExecutorDeviceType device_type,
                                    std::vector<std::unique_ptr<char[]>>& literals_owner,
                                    CudaAllocator* gpu_allocator) {
  CHECK_LE(sizeof(T), sizeof(int64_t));  // pad to 8 bytes
  switch (device_type) {
    case ExecutorDeviceType::CPU: {
      literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
      std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
      return reinterpret_cast<const int8_t*>(literals_owner.back().get());
    }
    case ExecutorDeviceType::GPU: {
      CHECK(gpu_allocator);
      const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
      gpu_allocator->copyToDevice(
          gpu_literal_buf_ptr, reinterpret_cast<int8_t*>(&literal), sizeof(T));
      return gpu_literal_buf_ptr;
    }
  }
  UNREACHABLE();
  return nullptr;
}
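// Note on the helper above: both branches hand back a buffer sized to
// sizeof(int64_t) regardless of sizeof(T); e.g. a 4-byte float literal
// occupies the first four bytes of an 8-byte slot, so the generated kernel can
// treat every input, column or literal, as a pointer to 8-byte-padded data.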

size_t get_output_row_count(const TableFunctionExecutionUnit& exe_unit,
                            size_t input_element_count) {
  size_t allocated_output_row_count = 0;
  if (*exe_unit.output_buffer_multiplier) {
    allocated_output_row_count =
        *exe_unit.output_buffer_multiplier * input_element_count;
  } else {
    throw std::runtime_error(
        "Only row multiplier output buffer configuration is supported for table "
        "functions.");
  }
  return allocated_output_row_count;
}
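// Example: a table function declared with a row multiplier of 2 over an input
// fragment of 1,000 rows gets 2,000 output rows allocated up front; the
// function itself later reports how many of those rows it actually produced.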

}  // namespace

ResultSetPtr TableFunctionExecutionContext::execute(
    const TableFunctionExecutionUnit& exe_unit,
    const InputTableInfo& table_info,
    const TableFunctionCompilationContext* compilation_context,
    const ColumnFetcher& column_fetcher,
    const ExecutorDeviceType device_type,
    Executor* executor) {
  CHECK(compilation_context);

  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<std::unique_ptr<char[]>> literals_owner;

  const int device_id = 0;  // TODO(adb): support multi-gpu table functions
  std::unique_ptr<CudaAllocator> device_allocator;
  if (device_type == ExecutorDeviceType::GPU) {
    auto& data_mgr = executor->catalog_->getDataMgr();
    device_allocator.reset(new CudaAllocator(&data_mgr, device_id));
  }

  std::vector<const int8_t*> col_buf_ptrs;
  ssize_t element_count = -1;
  for (const auto& input_expr : exe_unit.input_exprs) {
    if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
      auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
          executor,
          *col_var,
          table_info.info.fragments.front(),
          device_type == ExecutorDeviceType::CPU ? Data_Namespace::CPU_LEVEL
                                                 : Data_Namespace::GPU_LEVEL,
          device_id,
          chunks_owner,
          column_fetcher.columnarized_table_cache_);
      if (element_count < 0) {
        element_count = static_cast<ssize_t>(buf_elem_count);
      } else {
        CHECK_EQ(static_cast<ssize_t>(buf_elem_count), element_count);
      }
      col_buf_ptrs.push_back(col_buf);
    } else if (const auto& constant_val =
                   dynamic_cast<Analyzer::Constant*>(input_expr)) {
      // TODO(adb): Unify literal handling with rest of system, either in Codegen or as a
      // separate serialization component
      const auto const_val_datum = constant_val->get_constval();
      const auto& ti = constant_val->get_type_info();
      if (ti.is_fp()) {
        switch (get_bit_width(ti)) {
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_integer()) {
        switch (get_bit_width(ti)) {
          case 8:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 16:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else {
        throw std::runtime_error("Literal value " + constant_val->toString() +
                                 " is not yet supported.");
      }
    }
  }
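  // Invariant: col_buf_ptrs now holds exactly one buffer pointer per input
  // expression, either a fragment column buffer or an 8-byte literal buffer,
  // and every column input shares the same element_count.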
  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());

  CHECK_GE(element_count, ssize_t(0));
  switch (device_type) {
    case ExecutorDeviceType::CPU:
      return launchCpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           static_cast<size_t>(element_count),
                           executor);
    case ExecutorDeviceType::GPU:
      return launchGpuCode(exe_unit,
                           compilation_context,
                           col_buf_ptrs,
                           static_cast<size_t>(element_count),
                           /*device_id=*/0,
                           executor);
  }
  UNREACHABLE();
  return nullptr;
}
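// For orientation, a hypothetical invocation that would exercise this path
// (the function name is illustrative, not defined in this file):
//   SELECT out0 FROM TABLE(row_copier(CURSOR(SELECT d FROM t), 1));
// i.e. one column input, one integer literal input, and a row-multiplier
// output buffer configuration.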

ResultSetPtr TableFunctionExecutionContext::launchCpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    const size_t elem_count,
    Executor* executor) {
  // setup the inputs
  const auto byte_stream_ptr = reinterpret_cast<const int8_t**>(col_buf_ptrs.data());
  CHECK(byte_stream_ptr);

  // initialize output memory
  QueryMemoryDescriptor query_mem_desc(
      executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < exe_unit.target_exprs.size(); i++) {
    // All outputs padded to 8 bytes
    query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
  }

  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      /*device_id=*/0,
      ExecutorDeviceType::CPU,
      allocated_output_row_count,
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      nullptr,
      executor);

  // setup the output
  int64_t output_row_count = -1;
  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
  CHECK(group_by_buffers_ptr);

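  // Judging from the call below, the compiled entry point is assumed to have
  // roughly this shape, returning a nonzero error code on failure:
  //   int32_t table_function(const int8_t** input_cols,
  //                          const int64_t* input_row_count,
  //                          int64_t** output_buffers,
  //                          int64_t* output_row_count);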
  // execute
  const auto kernel_element_count = static_cast<int64_t>(elem_count);
  const auto err =
      compilation_context->getFuncPtr()(byte_stream_ptr,
                                        &kernel_element_count,
                                        query_buffers->getGroupByBuffersPtr(),
                                        &output_row_count);
  if (err) {
    throw std::runtime_error("Error executing table function: " + std::to_string(err));
  }
  if (output_row_count < 0) {
    throw std::runtime_error("Table function did not properly set output row count.");
  }

  // Update the entry count; it may differ from the allocated buffer size.
  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  return query_buffers->getResultSetOwned(0);
}

namespace {
enum {
  ERROR_BUFFER,
  COL_BUFFERS,
  INPUT_ROW_COUNT,
  OUTPUT_ROW_COUNT,
  OUTPUT_BUFFERS,
  KERNEL_PARAM_COUNT,
};
}  // namespace
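// The enumerators above double as positional indices into the CUDA kernel's
// parameter list; KERNEL_PARAM_COUNT gives the size of that list.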

ResultSetPtr TableFunctionExecutionContext::launchGpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const TableFunctionCompilationContext* compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    const size_t elem_count,
    const int device_id,
    Executor* executor) {
#ifdef HAVE_CUDA
  auto& data_mgr = executor->catalog_->getDataMgr();
  auto gpu_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
  CHECK(gpu_allocator);

  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
  // setup the inputs
  auto byte_stream_ptr = gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t));
  gpu_allocator->copyToDevice(byte_stream_ptr,
                              reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
                              col_buf_ptrs.size() * sizeof(int64_t));
  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
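  // Note: on the GPU path col_buf_ptrs already holds device pointers (columns
  // were fetched at GPU memory level and literals were staged through the
  // allocator), so the copy above uploads an array of device pointers for the
  // kernel to index.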

  kernel_params[INPUT_ROW_COUNT] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(elem_count)));
  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[INPUT_ROW_COUNT]),
                              reinterpret_cast<const int8_t*>(&elem_count),
                              sizeof(elem_count));

  kernel_params[ERROR_BUFFER] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));

  // initialize output memory
  QueryMemoryDescriptor query_mem_desc(
      executor, elem_count, QueryDescriptionType::Projection, /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < exe_unit.target_exprs.size(); i++) {
    // All outputs padded to 8 bytes
    query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
  }
  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      device_id,
      ExecutorDeviceType::GPU,
      allocated_output_row_count,
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      gpu_allocator.get(),
      executor);

  // setup the output
  int64_t output_row_count = -1;
  kernel_params[OUTPUT_ROW_COUNT] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
                              reinterpret_cast<int8_t*>(&output_row_count),
                              sizeof(output_row_count));

  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
  CHECK(group_by_buffers_ptr);

  const unsigned block_size_x = executor->blockSize();
  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  const unsigned grid_size_x = executor->gridSize();
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;

  auto gpu_output_buffers = query_buffers->setupTableFunctionGpuBuffers(
      query_mem_desc, device_id, block_size_x, grid_size_x);
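  // gpu_output_buffers.first is the device-side output buffer handed to the
  // kernel below; the whole pair is later passed to copyGroupByBuffersFromGpu
  // to bring results back into host-side ResultSet storage.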
  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.first);

  // execute
  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());

  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }

  // Get cu func
  const auto gpu_code_ptr = compilation_context->getGpuCode();
  CHECK(gpu_code_ptr);
  CHECK_LT(static_cast<size_t>(device_id), gpu_code_ptr->native_functions.size());
  const auto native_function_pointer = gpu_code_ptr->native_functions[device_id].first;
  auto cu_func = static_cast<CUfunction>(native_function_pointer);
  checkCudaErrors(cuLaunchKernel(cu_func,
                                 grid_size_x,
                                 grid_size_y,
                                 grid_size_z,
                                 block_size_x,
                                 block_size_y,
                                 block_size_z,
                                 0,  // shared mem bytes
                                 nullptr,
                                 param_ptrs.data(),
                                 nullptr));
  // TODO(adb): read errors
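  // A sketch of what reading the error buffer back might look like, assuming
  // the kernel writes a nonzero int32_t into ERROR_BUFFER on failure:
  //   int32_t kernel_err = 0;
  //   gpu_allocator->copyFromDevice(
  //       reinterpret_cast<int8_t*>(&kernel_err),
  //       reinterpret_cast<int8_t*>(kernel_params[ERROR_BUFFER]),
  //       sizeof(kernel_err));
  //   if (kernel_err) {
  //     throw std::runtime_error("Table function kernel error: " +
  //                              std::to_string(kernel_err));
  //   }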

  // read output row count from GPU
  int64_t new_output_row_count = -1;
  gpu_allocator->copyFromDevice(
      reinterpret_cast<int8_t*>(&new_output_row_count),
      reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
      sizeof(int64_t));
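  // Unlike the CPU path, a missing row count does not throw here; it falls
  // back to the full allocated row count below.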
  if (new_output_row_count < 0) {
    new_output_row_count = allocated_output_row_count;
  }

  // Update the entry count; it may differ from the allocated buffer size.
  query_buffers->getResultSet(0)->updateStorageEntryCount(new_output_row_count);

  // Copy back to CPU storage
  query_buffers->copyGroupByBuffersFromGpu(&data_mgr,
                                           query_mem_desc,
                                           new_output_row_count,
                                           gpu_output_buffers,
                                           nullptr,
                                           block_size_x,
                                           grid_size_x,
                                           device_id,
                                           false);

  return query_buffers->getResultSetOwned(0);
#else
  UNREACHABLE();
  return nullptr;
#endif
}