TableFunctionExecutionContext.cpp (OmniSciDB, commit c0231cc57d)
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/TableFunctions/TableFunctionExecutionContext.h"

#include "Analyzer/Analyzer.h"
#include "Logger/Logger.h"
// ... (several additional QueryEngine includes elided in the original listing) ...
#include "Shared/funcannotations.h"

namespace {

template <typename T>
const int8_t* create_literal_buffer(const T literal,
                                    const ExecutorDeviceType device_type,
                                    std::vector<std::unique_ptr<char[]>>& literals_owner,
                                    CudaAllocator* gpu_allocator) {
  CHECK_LE(sizeof(T), sizeof(int64_t));  // pad to 8 bytes
  switch (device_type) {
    case ExecutorDeviceType::CPU: {
      literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
      std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
      return reinterpret_cast<const int8_t*>(literals_owner.back().get());
    }
    case ExecutorDeviceType::GPU: {
      CHECK(gpu_allocator);
      const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
      gpu_allocator->copyToDevice(
          gpu_literal_buf_ptr, reinterpret_cast<const int8_t*>(&literal), sizeof(T));
      return gpu_literal_buf_ptr;
    }
  }
  UNREACHABLE();
  return nullptr;
}
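
// Usage sketch (hypothetical caller, for illustration only): a 4-byte int
// literal is copied into the low bytes of an 8-byte host buffer on CPU, or
// into an 8-byte device buffer via the CudaAllocator on GPU:
//
//   std::vector<std::unique_ptr<char[]>> owner;
//   const int8_t* buf =
//       create_literal_buffer(int32_t{42}, ExecutorDeviceType::CPU, owner,
//                             /*gpu_allocator=*/nullptr);
//   int32_t v;
//   std::memcpy(&v, buf, sizeof(v));  // v == 42; bytes 4..7 are padding
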
54 
55 // Specialization for std::string. Currently we simply hand the UDTF a char* to the
56 // first char of a c-style null-terminated string we copy out of the std::string.
57 // May want to evaluate moving to sending in the ptr and size
58 template <>
59 const int8_t* create_literal_buffer(std::string* const literal,
60  const ExecutorDeviceType device_type,
61  std::vector<std::unique_ptr<char[]>>& literals_owner,
62  CudaAllocator* gpu_allocator) {
63  const int64_t string_size = literal->size();
64  const int64_t padded_string_size =
65  (string_size + 7) / 8 * 8; // round up to the next multiple of 8
66  switch (device_type) {
68  literals_owner.emplace_back(
69  std::make_unique<char[]>(sizeof(int64_t) + padded_string_size));
70  std::memcpy(literals_owner.back().get(), &string_size, sizeof(int64_t));
71  std::memcpy(
72  literals_owner.back().get() + sizeof(int64_t), literal->data(), string_size);
73  return reinterpret_cast<const int8_t*>(literals_owner.back().get());
74  }
76  CHECK(gpu_allocator);
77  const auto gpu_literal_buf_ptr =
78  gpu_allocator->alloc(sizeof(int64_t) + padded_string_size);
79  gpu_allocator->copyToDevice(gpu_literal_buf_ptr,
80  reinterpret_cast<const int8_t*>(&string_size),
81  sizeof(int64_t));
82  gpu_allocator->copyToDevice(gpu_literal_buf_ptr + sizeof(int64_t),
83  reinterpret_cast<const int8_t*>(literal->data()),
84  string_size);
85  return gpu_literal_buf_ptr;
86  }
87  }
88  UNREACHABLE();
89  return nullptr;
90 }
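
// Resulting buffer layout (a sketch; the sizes follow directly from the code
// above): an 8-byte length prefix, then the raw characters padded to the next
// multiple of 8. For the 11-char string "hello world":
//
//   bytes  0..7  : int64_t string_size = 11
//   bytes  8..18 : 'h' 'e' 'l' 'l' 'o' ' ' 'w' 'o' 'r' 'l' 'd'
//   bytes 19..23 : padding up to padded_string_size = 16
//
// i.e. a total allocation of sizeof(int64_t) + 16 = 24 bytes.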

size_t get_output_row_count(const TableFunctionExecutionUnit& exe_unit,
                            size_t input_element_count) {
  size_t allocated_output_row_count = 0;
  switch (exe_unit.table_func.getOutputRowSizeType()) {
    case table_functions::OutputBufferSizeType::kConstant:
    case table_functions::OutputBufferSizeType::kPreFlightParameter:
    case table_functions::OutputBufferSizeType::kUserSpecifiedConstantParameter: {
      allocated_output_row_count = exe_unit.output_buffer_size_param;
      break;
    }
    case table_functions::OutputBufferSizeType::kUserSpecifiedRowMultiplier: {
      allocated_output_row_count =
          exe_unit.output_buffer_size_param * input_element_count;
      break;
    }
    case table_functions::OutputBufferSizeType::kTableFunctionSpecifiedParameter: {
      allocated_output_row_count = input_element_count;
      break;
    }
    default: {
      UNREACHABLE();
    }
  }
  return allocated_output_row_count;
}
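
// Worked example (illustrative numbers): for a table function declared with a
// user-specified row multiplier, output_buffer_size_param holds the
// multiplier, so with output_buffer_size_param == 2 and an input fragment of
// 1000 rows, get_output_row_count allocates 2 * 1000 = 2000 output rows.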

}  // namespace

ResultSetPtr TableFunctionExecutionContext::execute(
    const TableFunctionExecutionUnit& exe_unit,
    const std::vector<InputTableInfo>& table_infos,
    const std::shared_ptr<CompilationContext>& compilation_context,
    const ColumnFetcher& column_fetcher,
    const ExecutorDeviceType device_type,
    Executor* executor,
    bool is_pre_launch_udtf) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(compilation_context);
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<std::unique_ptr<char[]>> literals_owner;

  const int device_id = 0;  // TODO(adb): support multi-GPU table functions
  std::unique_ptr<CudaAllocator> device_allocator;
  if (device_type == ExecutorDeviceType::GPU) {
    auto data_mgr = executor->getDataMgr();
    device_allocator.reset(new CudaAllocator(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
  }
  std::vector<const int8_t*> col_buf_ptrs;
  std::vector<int64_t> col_sizes;
  std::vector<const int8_t*> input_str_dict_proxy_ptrs;
  std::optional<size_t> input_num_rows;

  int col_index = -1;
  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
  // arguments are not supported on GPU at the moment.
  std::vector<std::vector<const int8_t*>> col_list_bufs;
  std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
  for (const auto& input_expr : exe_unit.input_exprs) {
    auto ti = input_expr->get_type_info();
    if (!ti.is_column_list()) {
      CHECK_EQ(col_index, -1);
    }
    if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
      auto table_id = col_var->get_table_id();
      auto table_info_it = std::find_if(
          table_infos.begin(), table_infos.end(), [&table_id](const auto& table_info) {
            return table_info.table_id == table_id;
          });
      CHECK(table_info_it != table_infos.end());
      auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
          executor,
          *col_var,
          table_info_it->info.fragments.front(),
          device_type == ExecutorDeviceType::GPU ? Data_Namespace::GPU_LEVEL
                                                 : Data_Namespace::CPU_LEVEL,
          device_id,
          device_allocator.get(),
          /*thread_idx=*/0,
          chunks_owner,
          column_fetcher.columnarized_table_cache_);
      // The number of entries in the first column determines the number of rows
      // the output is based on (optionally scaled by the sizing parameter).
      if (!input_num_rows) {
        input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
      }

      int8_t* input_str_dict_proxy_ptr = nullptr;
      if (ti.is_subtype_dict_encoded_string()) {
        const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
            ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
        input_str_dict_proxy_ptr =
            reinterpret_cast<int8_t*>(input_string_dictionary_proxy);
      }
      if (ti.is_column_list()) {
        if (col_index == -1) {
          col_list_bufs.push_back({});
          input_col_list_str_dict_proxy_ptrs.push_back({});
          col_list_bufs.back().reserve(ti.get_dimension());
          input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
        } else {
          CHECK_EQ(col_sizes.back(), buf_elem_count);
        }
        col_index++;
        col_list_bufs.back().push_back(col_buf);
        input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
        // append col_buf to the column_list col_buf
        if (col_index + 1 == ti.get_dimension()) {
          col_index = -1;
        }
        // columns in the same column_list point to the column_list data
        col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
        input_str_dict_proxy_ptrs.push_back(
            (const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
      } else {
        col_buf_ptrs.push_back(col_buf);
        input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
      }
      col_sizes.push_back(buf_elem_count);
    } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
      // TODO(adb): Unify literal handling with the rest of the system, either in
      // Codegen or as a separate serialization component
      col_sizes.push_back(0);
      input_str_dict_proxy_ptrs.push_back(nullptr);
      const auto const_val_datum = constant_val->get_constval();
      const auto& ti = constant_val->get_type_info();
      if (ti.is_fp()) {
        switch (get_bit_width(ti)) {
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_integer() || ti.is_timestamp()) {
        switch (get_bit_width(ti)) {
          case 8:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 16:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 32:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          case 64:
            col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
                                                         device_type,
                                                         literals_owner,
                                                         device_allocator.get()));
            break;
          default:
            UNREACHABLE();
        }
      } else if (ti.is_boolean()) {
        col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.boolval,
                                                     device_type,
                                                     literals_owner,
                                                     device_allocator.get()));
      } else if (ti.is_bytes()) {  // text-encoding-none string
        col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.stringval,
                                                     device_type,
                                                     literals_owner,
                                                     device_allocator.get()));
      } else {
        throw TableFunctionError("Literal value " + constant_val->toString() +
                                 " is not yet supported.");
      }
    }
  }
  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
  if (!exe_unit.table_func.hasOutputSizeIndependentOfInputSize()) {
    // This covers compile-time constants, user-specified constants, and runtime
    // table-function-specified sizing; only user-specified row multipliers
    // currently take the input row count into account.
    CHECK(input_num_rows);
  }
  std::vector<int8_t*> output_str_dict_proxy_ptrs;
  for (const auto& output_expr : exe_unit.target_exprs) {
    int8_t* output_str_dict_proxy_ptr = nullptr;
    auto ti = output_expr->get_type_info();
    if (ti.is_dict_encoded_string()) {
      const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
          ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
      output_str_dict_proxy_ptr =
          reinterpret_cast<int8_t*>(output_string_dictionary_proxy);
    }
    output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
  }

  if (is_pre_launch_udtf) {
    CHECK(exe_unit.table_func.containsPreFlightFn());
    launchPreCodeOnCpu(
        exe_unit,
        std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
        col_buf_ptrs,
        col_sizes,
        *input_num_rows,
        executor);
    return nullptr;
  } else {
    switch (device_type) {
      case ExecutorDeviceType::CPU:
        return launchCpuCode(
            exe_unit,
            std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
            col_buf_ptrs,
            col_sizes,
            input_str_dict_proxy_ptrs,
            *input_num_rows,
            output_str_dict_proxy_ptrs,
            executor);
      case ExecutorDeviceType::GPU:
        return launchGpuCode(
            exe_unit,
            std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
            col_buf_ptrs,
            col_sizes,
            input_str_dict_proxy_ptrs,
            *input_num_rows,
            output_str_dict_proxy_ptrs,
            /*device_id=*/0,
            executor);
    }
  }
  UNREACHABLE();
  return nullptr;
}
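
// A sketch (under assumptions, not the verbatim generated signature) of the
// entry point that the compiled table function exposes and that
// launchPreCodeOnCpu/launchCpuCode below invoke through
// table_function_entry_point(); the argument order and types mirror the call
// sites, and the name of the fifth slot is an assumption (it is passed as
// nullptr in both CPU paths, with outputs obtained via the manager instead):
//
//   using TableFunctionEntryPoint =
//       int32_t (*)(const int8_t* mgr_ptr,             // TableFunctionManager*
//                   const int8_t** input_col_buffers,  // one per input column
//                   const int64_t* input_col_sizes,    // element count per column
//                   const int8_t** input_str_dict_proxies,
//                   int64_t** output_buffers,          // assumption, see above
//                   int8_t** output_str_dict_proxies,
//                   int64_t* output_row_count);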

std::mutex TableFunctionManager_singleton_mutex;

void TableFunctionExecutionContext::launchPreCodeOnCpu(
    const TableFunctionExecutionUnit& exe_unit,
    const std::shared_ptr<CpuCompilationContext>& compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    const size_t elem_count,  // currently taken from the first source only
    Executor* executor) {
  auto timer = DEBUG_TIMER(__func__);
  int64_t output_row_count = 0;

  // If TableFunctionManager must be a singleton but it has been
  // initialized from another thread, the TableFunctionManager constructor
  // blocks via TableFunctionManager_singleton_mutex until the
  // existing singleton is destructed.
  auto mgr = std::make_unique<TableFunctionManager>(
      exe_unit,
      executor,
      col_buf_ptrs,
      col_sizes,
      /*is_singleton=*/!exe_unit.table_func.usesManager());

  // set up the inputs
  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
  const auto byte_stream_ptr = !col_buf_ptrs.empty()
                                   ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
                                   : nullptr;
  if (!col_buf_ptrs.empty()) {
    CHECK(byte_stream_ptr);
  }
  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
  if (!col_sizes.empty()) {
    CHECK(col_sizes_ptr);
  }

  // execute
  const auto err = compilation_context->table_function_entry_point()(
      reinterpret_cast<const int8_t*>(mgr.get()),
      byte_stream_ptr,  // input columns buffer
      col_sizes_ptr,    // input column sizes
      nullptr,  // input string dictionary proxy ptrs - not supported for pre-flights yet
      nullptr,
      nullptr,  // output string dictionary proxy ptrs - not supported for pre-flights yet
      &output_row_count);

  if (exe_unit.table_func.hasPreFlightOutputSizer()) {
    exe_unit.output_buffer_size_param = output_row_count;
  }

  if (err == TableFunctionErrorCode::GenericError) {
    throw UserTableFunctionError("Error executing table function pre flight check: " +
                                 std::string(mgr->get_error_message()));
  } else if (err) {
    throw UserTableFunctionError("Error executing table function pre flight check: " +
                                 std::to_string(err));
  }
}

ResultSetPtr TableFunctionExecutionContext::launchCpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const std::shared_ptr<CpuCompilationContext>& compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
    const size_t elem_count,  // currently taken from the first source only
    std::vector<int8_t*>& output_str_dict_proxy_ptrs,
    Executor* executor) {
  auto timer = DEBUG_TIMER(__func__);
  int64_t output_row_count = 0;

  // If TableFunctionManager must be a singleton but it has been
  // initialized from another thread, the TableFunctionManager constructor
  // blocks via TableFunctionManager_singleton_mutex until the
  // existing singleton is destructed.
  auto mgr = std::make_unique<TableFunctionManager>(
      exe_unit,
      executor,
      col_buf_ptrs,
      col_sizes,
      /*is_singleton=*/!exe_unit.table_func.usesManager());

  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
    // allocate output buffers because the size is known up front, from
    // user-specified parameters (and table size in the case of a user-specified
    // row multiplier)
    output_row_count = get_output_row_count(exe_unit, elem_count);
  } else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
    output_row_count = exe_unit.output_buffer_size_param;
  }

  // set up the inputs
  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
  const auto byte_stream_ptr = !col_buf_ptrs.empty()
                                   ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
                                   : nullptr;
  if (!col_buf_ptrs.empty()) {
    CHECK(byte_stream_ptr);
  }
  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
  if (!col_sizes.empty()) {
    CHECK(col_sizes_ptr);
  }
  const auto input_str_dict_proxy_byte_stream_ptr =
      !input_str_dict_proxy_ptrs.empty()
          ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
          : nullptr;

  const auto output_str_dict_proxy_byte_stream_ptr =
      !output_str_dict_proxy_ptrs.empty()
          ? reinterpret_cast<int8_t**>(output_str_dict_proxy_ptrs.data())
          : nullptr;

  // execute
  const auto err = compilation_context->table_function_entry_point()(
      reinterpret_cast<const int8_t*>(mgr.get()),
      byte_stream_ptr,  // input columns buffer
      col_sizes_ptr,    // input column sizes
      input_str_dict_proxy_byte_stream_ptr,  // input string dictionary proxies
      nullptr,
      output_str_dict_proxy_byte_stream_ptr,
      &output_row_count);

  if (err == TableFunctionErrorCode::GenericError) {
    throw UserTableFunctionError("Error executing table function: " +
                                 std::string(mgr->get_error_message()));
  } else if (err) {
    throw UserTableFunctionError("Error executing table function: " +
                                 std::to_string(err));
  }

  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
    if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
      throw TableFunctionError(
          "Table function with constant sizing parameter must return " +
          std::to_string(mgr->get_nrows()) + " (got " + std::to_string(output_row_count) +
          ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
      output_row_count = mgr->get_nrows();
    }
  }
  // Update the entry count; it may differ from the allocated buffer size
  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
    // set_output_row_size has not been called
    if (output_row_count == 0) {
      // allocate for empty output columns
      mgr->allocate_output_buffers(0);
    } else {
      throw TableFunctionError("Table function must call set_output_row_size");
    }
  }

  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
  CHECK(group_by_buffers_ptr);
  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);

  auto num_out_columns = exe_unit.target_exprs.size();
  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
  // TODO(todd): Consolidate this column byte offset logic that occurs in at least
  // four places
  for (size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
    auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
    if (ti.is_array()) {
      // TODO: implement FlatBuffer normalization when max_nof_values is larger
      // than the number of specified values.
      //
      // TODO: implement FlatBuffer resize when output_row_count < mgr->get_nrows()
      CHECK_EQ(mgr->get_nrows(), output_row_count);
      FlatBufferManager m{src};
      const size_t allocated_column_size = m.flatbufferSize();
      const size_t actual_column_size = allocated_column_size;
      src = align_to_int64(src + allocated_column_size);
      dst = align_to_int64(dst + actual_column_size);
      if (ti.is_text_encoding_dict_array()) {
        CHECK_EQ(m.getDTypeMetadataDictId(),
                 ti.get_comp_param());  // ensure that dict_id is preserved
      }
    } else {
      const size_t target_width = ti.get_size();
      const size_t allocated_column_size = target_width * mgr->get_nrows();
      const size_t actual_column_size = target_width * output_row_count;
      if (src != dst) {
        auto t = memmove(dst, src, actual_column_size);
        CHECK_EQ(dst, t);
      }
      src = align_to_int64(src + allocated_column_size);
      dst = align_to_int64(dst + actual_column_size);
    }
  }
  return mgr->query_buffers->getResultSetOwned(0);
}
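
// Worked example of the compaction loop above (illustrative numbers): with
// two int32 output columns, mgr->get_nrows() == 1000 allocated rows, and
// output_row_count == 600 actual rows, the second column is moved down so
// the columns become contiguous at their actual sizes:
//
//   column 0: allocated = 4 * 1000 = 4000 bytes, actual = 4 * 600 = 2400 bytes
//   column 1: starts at src = align_to_int64(0 + 4000) = 4000,
//             is moved to dst = align_to_int64(0 + 2400) = 2400
//
// (both offsets already being multiples of 8, align_to_int64 is a no-op here).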

namespace {
enum {
  MANAGER,
  ERROR_BUFFER,
  COL_BUFFERS,
  COL_SIZES,
  INPUT_STR_DICT_PROXIES,
  OUTPUT_BUFFERS,
  OUTPUT_ROW_COUNT,
  OUTPUT_STR_DICT_PROXIES,
  KERNEL_PARAM_COUNT,
};
}  // namespace
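
// The enum above indexes the kernel_params vector below; its slot order is
// thus assumed to match the parameter order of the generated GPU kernel. The
// two *_STR_DICT_PROXIES slots stay zero in this path, since string
// dictionary proxies are not supported on GPU.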

ResultSetPtr TableFunctionExecutionContext::launchGpuCode(
    const TableFunctionExecutionUnit& exe_unit,
    const std::shared_ptr<GpuCompilationContext>& compilation_context,
    std::vector<const int8_t*>& col_buf_ptrs,
    std::vector<int64_t>& col_sizes,
    std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
    const size_t elem_count,
    std::vector<int8_t*>& output_str_dict_proxy_ptrs,
    const int device_id,
    Executor* executor) {
#ifdef HAVE_CUDA
  auto timer = DEBUG_TIMER(__func__);
  if (exe_unit.table_func.usesManager()) {
    throw QueryMustRunOnCpu();
  }

  auto num_out_columns = exe_unit.target_exprs.size();
  auto data_mgr = executor->getDataMgr();
  auto gpu_allocator = std::make_unique<CudaAllocator>(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  CHECK(gpu_allocator);
  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);

  // TODO: implement a table function manager for CUDA kernels.
  // kernel_params[MANAGER] ought to contain a device pointer to a struct
  // that a table function kernel with a TableFunctionManager argument can
  // access from the device.
  kernel_params[MANAGER] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int8_t*)));

  // set up the inputs
  auto byte_stream_ptr = !(col_buf_ptrs.empty())
                             ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
                             : nullptr;
  if (byte_stream_ptr) {
    gpu_allocator->copyToDevice(byte_stream_ptr,
                                reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
                                col_buf_ptrs.size() * sizeof(int64_t));
  }
  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);

  auto col_sizes_ptr = !(col_sizes.empty())
                           ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
                           : nullptr;
  if (col_sizes_ptr) {
    gpu_allocator->copyToDevice(col_sizes_ptr,
                                reinterpret_cast<int8_t*>(col_sizes.data()),
                                col_sizes.size() * sizeof(int64_t));
  }
  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);

  kernel_params[ERROR_BUFFER] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
  // initialize output memory
  QueryMemoryDescriptor query_mem_desc(executor,
                                       elem_count,
                                       QueryDescriptionType::TableFunction,
                                       /*is_table_function=*/true);
  query_mem_desc.setOutputColumnar(true);

  for (size_t i = 0; i < num_out_columns; i++) {
    const size_t col_width = exe_unit.target_exprs[i]->get_type_info().get_size();
    query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
  }
  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
      exe_unit,
      query_mem_desc,
      device_id,
      ExecutorDeviceType::GPU,
      (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
      std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
      std::vector<std::vector<uint64_t>>{{0}},  // frag offsets
      row_set_mem_owner_,
      gpu_allocator.get(),
      executor);

  // set up the output
  int64_t output_row_count = allocated_output_row_count;

  kernel_params[OUTPUT_ROW_COUNT] =
      reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
                              reinterpret_cast<int8_t*>(&output_row_count),
                              sizeof(output_row_count));
  /*
    TODO: RBC-generated runtime table functions do not support concurrent
    execution on a CUDA device, so we force a block/grid size of 1 for
    runtime table functions. To support concurrency, RBC will need to expose
    threadIdx/blockIdx/blockDim to runtime table functions, and these must do
    something sensible with that information.
  */
  const unsigned block_size_x =
      (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  const unsigned grid_size_x =
      (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;

  auto gpu_output_buffers =
      query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
                                                  device_id,
                                                  block_size_x,
                                                  grid_size_x,
                                                  /*zero_initialize_buffers=*/true);

  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);

  // execute
  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());

  std::vector<void*> param_ptrs;
  for (auto& param : kernel_params) {
    param_ptrs.push_back(&param);
  }

  // get the CUDA function for this device and launch it
  CHECK(compilation_context);
  const auto native_code = compilation_context->getNativeCode(device_id);
  auto cu_func = static_cast<CUfunction>(native_code.first);
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  checkCudaErrors(cuLaunchKernel(cu_func,
                                 grid_size_x,
                                 grid_size_y,
                                 grid_size_z,
                                 block_size_x,
                                 block_size_y,
                                 block_size_z,
                                 0,  // shared mem bytes
                                 qe_cuda_stream,
                                 &param_ptrs[0],
                                 nullptr));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));

  // read the output row count back from the GPU
  gpu_allocator->copyFromDevice(
      reinterpret_cast<int8_t*>(&output_row_count),
      reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
      sizeof(int64_t));
  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
    if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
      throw TableFunctionError(
          "Table function with constant sizing parameter must return " +
          std::to_string(allocated_output_row_count) + " (got " +
          std::to_string(output_row_count) + ")");
    }
  } else {
    if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
      output_row_count = allocated_output_row_count;
    }
  }

  // Update the entry count; it may differ from the allocated buffer size
  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);

  // Copy back to CPU storage
  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
                                                 query_mem_desc,
                                                 output_row_count,
                                                 gpu_output_buffers,
                                                 device_id,
                                                 block_size_x,
                                                 grid_size_x);

  return query_buffers->getResultSetOwned(0);
#else
  UNREACHABLE();
  return nullptr;
#endif
}
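
// End-to-end usage sketch (hypothetical; the construction of the surrounding
// objects is an assumption, only the execute() signature is taken from this
// file): the executor's table function path drives this context roughly as
//
//   TableFunctionExecutionContext ctx(row_set_mem_owner);
//   auto result_set = ctx.execute(exe_unit,
//                                 table_infos,
//                                 compilation_context,  // CpuCompilationContext
//                                 column_fetcher,
//                                 ExecutorDeviceType::CPU,
//                                 executor,
//                                 /*is_pre_launch_udtf=*/false);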