OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionExecutionContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Analyzer/Analyzer.h"
20 #include "Logger/Logger.h"
27 #include "Shared/funcannotations.h"
28 
29 namespace {
30 
31 template <typename T>
32 const int8_t* create_literal_buffer(const T literal,
33  const ExecutorDeviceType device_type,
34  std::vector<std::unique_ptr<char[]>>& literals_owner,
35  CudaAllocator* gpu_allocator) {
36  CHECK_LE(sizeof(T), sizeof(int64_t)); // pad to 8 bytes
37  switch (device_type) {
39  literals_owner.emplace_back(std::make_unique<char[]>(sizeof(int64_t)));
40  std::memcpy(literals_owner.back().get(), &literal, sizeof(T));
41  return reinterpret_cast<const int8_t*>(literals_owner.back().get());
42  }
44  CHECK(gpu_allocator);
45  const auto gpu_literal_buf_ptr = gpu_allocator->alloc(sizeof(int64_t));
46  gpu_allocator->copyToDevice(
47  gpu_literal_buf_ptr, reinterpret_cast<const int8_t*>(&literal), sizeof(T));
48  return gpu_literal_buf_ptr;
49  }
50  }
51  UNREACHABLE();
52  return nullptr;
53 }
54 
55 // Specialization for std::string. Currently we simply hand the UDTF a char* to the
56 // first char of a c-style null-terminated string we copy out of the std::string.
57 // May want to evaluate moving to sending in the ptr and size
58 template <>
59 const int8_t* create_literal_buffer(std::string* const literal,
60  const ExecutorDeviceType device_type,
61  std::vector<std::unique_ptr<char[]>>& literals_owner,
62  CudaAllocator* gpu_allocator) {
63  const int64_t string_size = literal->size();
64  const int64_t padded_string_size =
65  (string_size + 7) / 8 * 8; // round up to the next multiple of 8
66  switch (device_type) {
68  literals_owner.emplace_back(
69  std::make_unique<char[]>(sizeof(int64_t) + padded_string_size));
70  std::memcpy(literals_owner.back().get(), &string_size, sizeof(int64_t));
71  std::memcpy(
72  literals_owner.back().get() + sizeof(int64_t), literal->data(), string_size);
73  return reinterpret_cast<const int8_t*>(literals_owner.back().get());
74  }
76  CHECK(gpu_allocator);
77  const auto gpu_literal_buf_ptr =
78  gpu_allocator->alloc(sizeof(int64_t) + padded_string_size);
79  gpu_allocator->copyToDevice(gpu_literal_buf_ptr,
80  reinterpret_cast<const int8_t*>(&string_size),
81  sizeof(int64_t));
82  gpu_allocator->copyToDevice(gpu_literal_buf_ptr + sizeof(int64_t),
83  reinterpret_cast<const int8_t*>(literal->data()),
84  string_size);
85  return gpu_literal_buf_ptr;
86  }
87  }
88  UNREACHABLE();
89  return nullptr;
90 }
91 
93  size_t input_element_count) {
94  size_t allocated_output_row_count = 0;
95  switch (exe_unit.table_func.getOutputRowSizeType()) {
99  allocated_output_row_count = exe_unit.output_buffer_size_param;
100  break;
101  }
103  allocated_output_row_count =
104  exe_unit.output_buffer_size_param * input_element_count;
105  break;
106  }
108  allocated_output_row_count = input_element_count;
109  break;
110  }
111  default: {
112  UNREACHABLE();
113  }
114  }
115  return allocated_output_row_count;
116 }
117 
118 } // namespace
119 
121  const TableFunctionExecutionUnit& exe_unit,
122  const std::vector<InputTableInfo>& table_infos,
123  const std::shared_ptr<CompilationContext>& compilation_context,
124  const ColumnFetcher& column_fetcher,
125  const ExecutorDeviceType device_type,
126  Executor* executor,
127  bool is_pre_launch_udtf) {
128  auto timer = DEBUG_TIMER(__func__);
129  CHECK(compilation_context);
130  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
131  std::vector<std::unique_ptr<char[]>> literals_owner;
132 
133  const int device_id = 0; // TODO(adb): support multi-gpu table functions
134  std::unique_ptr<CudaAllocator> device_allocator;
135  if (device_type == ExecutorDeviceType::GPU) {
136  auto data_mgr = executor->getDataMgr();
137  device_allocator.reset(new CudaAllocator(
138  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
139  }
140  std::vector<const int8_t*> col_buf_ptrs;
141  std::vector<int64_t> col_sizes;
142  std::vector<const int8_t*> input_str_dict_proxy_ptrs;
143  std::optional<size_t> input_num_rows;
144 
145  int col_index = -1;
146  // TODO: col_list_bufs are allocated on CPU memory, so UDTFs with column_list
147  // arguments are not supported on GPU atm.
148  std::vector<std::vector<const int8_t*>> col_list_bufs;
149  std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
150  for (const auto& input_expr : exe_unit.input_exprs) {
151  auto ti = input_expr->get_type_info();
152  if (!ti.is_column_list()) {
153  CHECK_EQ(col_index, -1);
154  }
155  if (auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
156  const auto& table_key = col_var->getTableKey();
157  auto table_info_it = std::find_if(
158  table_infos.begin(), table_infos.end(), [&table_key](const auto& table_info) {
159  return table_info.table_key == table_key;
160  });
161  CHECK(table_info_it != table_infos.end());
162  auto [col_buf, buf_elem_count] = ColumnFetcher::getOneColumnFragment(
163  executor,
164  *col_var,
165  table_info_it->info.fragments.front(),
168  device_id,
169  device_allocator.get(),
170  /*thread_idx=*/0,
171  chunks_owner,
172  column_fetcher.columnarized_table_cache_);
173  // We use the number of entries in the first column to be the number of rows to base
174  // the output off of (optionally depending on the sizing parameter)
175  if (!input_num_rows) {
176  input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
177  }
178 
179  int8_t* input_str_dict_proxy_ptr = nullptr;
180  if (ti.is_subtype_dict_encoded_string()) {
181  const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
182  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
183  input_str_dict_proxy_ptr =
184  reinterpret_cast<int8_t*>(input_string_dictionary_proxy);
185  }
186  if (ti.is_column_list()) {
187  if (col_index == -1) {
188  col_list_bufs.push_back({});
189  input_col_list_str_dict_proxy_ptrs.push_back({});
190  col_list_bufs.back().reserve(ti.get_dimension());
191  input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
192  } else {
193  CHECK_EQ(col_sizes.back(), buf_elem_count);
194  }
195  col_index++;
196  col_list_bufs.back().push_back(col_buf);
197  input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
198  // append col_buf to column_list col_buf
199  if (col_index + 1 == ti.get_dimension()) {
200  col_index = -1;
201  }
202  // columns in the same column_list point to column_list data
203  col_buf_ptrs.push_back((const int8_t*)col_list_bufs.back().data());
204  input_str_dict_proxy_ptrs.push_back(
205  (const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
206  } else {
207  col_buf_ptrs.push_back(col_buf);
208  input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
209  }
210  col_sizes.push_back(buf_elem_count);
211  } else if (const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
212  // TODO(adb): Unify literal handling with rest of system, either in Codegen or as a
213  // separate serialization component
214  col_sizes.push_back(0);
215  input_str_dict_proxy_ptrs.push_back(nullptr);
216  const auto const_val_datum = constant_val->get_constval();
217  const auto& ti = constant_val->get_type_info();
218  if (ti.is_fp()) {
219  switch (get_bit_width(ti)) {
220  case 32:
221  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.floatval,
222  device_type,
223  literals_owner,
224  device_allocator.get()));
225  break;
226  case 64:
227  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.doubleval,
228  device_type,
229  literals_owner,
230  device_allocator.get()));
231  break;
232  default:
233  UNREACHABLE();
234  }
235  } else if (ti.is_integer() || ti.is_timestamp() || ti.is_timeinterval()) {
236  switch (get_bit_width(ti)) {
237  case 8:
238  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.tinyintval,
239  device_type,
240  literals_owner,
241  device_allocator.get()));
242  break;
243  case 16:
244  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.smallintval,
245  device_type,
246  literals_owner,
247  device_allocator.get()));
248  break;
249  case 32:
250  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.intval,
251  device_type,
252  literals_owner,
253  device_allocator.get()));
254  break;
255  case 64:
256  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.bigintval,
257  device_type,
258  literals_owner,
259  device_allocator.get()));
260  break;
261  default:
262  UNREACHABLE();
263  }
264  } else if (ti.is_boolean()) {
265  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.boolval,
266  device_type,
267  literals_owner,
268  device_allocator.get()));
269  } else if (ti.is_bytes()) { // text encoding none string
270  col_buf_ptrs.push_back(create_literal_buffer(const_val_datum.stringval,
271  device_type,
272  literals_owner,
273  device_allocator.get()));
274  } else {
275  throw TableFunctionError("Literal value " + constant_val->toString() +
276  " is not yet supported.");
277  }
278  } else {
279  throw TableFunctionError(
280  "Unsupported expression as input to table function: " + input_expr->toString() +
281  "\n Only literal constants and columns are supported!");
282  }
283  }
284  CHECK_EQ(col_buf_ptrs.size(), exe_unit.input_exprs.size());
285  CHECK_EQ(col_sizes.size(), exe_unit.input_exprs.size());
286  if (!exe_unit.table_func
287  .hasOutputSizeIndependentOfInputSize()) { // includes compile-time constants,
288  // user-specified constants,
289  // and runtime table funtion
290  // specified sizing, only
291  // user-specified row-multipliers
292  // currently take into account input
293  // row size
294  CHECK(input_num_rows);
295  }
296  std::vector<int8_t*> output_str_dict_proxy_ptrs;
297  for (const auto& output_expr : exe_unit.target_exprs) {
298  int8_t* output_str_dict_proxy_ptr = nullptr;
299  auto ti = output_expr->get_type_info();
300  if (ti.is_dict_encoded_string()) {
301  const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
302  ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
303  output_str_dict_proxy_ptr =
304  reinterpret_cast<int8_t*>(output_string_dictionary_proxy);
305  }
306  output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
307  }
308 
309  if (is_pre_launch_udtf) {
312  exe_unit,
313  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
314  col_buf_ptrs,
315  col_sizes,
316  *input_num_rows,
317  executor);
318  return nullptr;
319  } else {
320  switch (device_type) {
322  return launchCpuCode(
323  exe_unit,
324  std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
325  col_buf_ptrs,
326  col_sizes,
327  input_str_dict_proxy_ptrs,
328  *input_num_rows,
329  output_str_dict_proxy_ptrs,
330  executor);
332  return launchGpuCode(
333  exe_unit,
334  std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
335  col_buf_ptrs,
336  col_sizes,
337  input_str_dict_proxy_ptrs,
338  *input_num_rows,
339  output_str_dict_proxy_ptrs,
340  /*device_id=*/0,
341  executor);
342  }
343  }
344  UNREACHABLE();
345  return nullptr;
346 }
347 
349 
351  const TableFunctionExecutionUnit& exe_unit,
352  const std::shared_ptr<CpuCompilationContext>& compilation_context,
353  std::vector<const int8_t*>& col_buf_ptrs,
354  std::vector<int64_t>& col_sizes,
355  const size_t elem_count, // taken from first source only currently
356  Executor* executor) {
357  auto timer = DEBUG_TIMER(__func__);
358  int64_t output_row_count = 0;
359 
360  // If TableFunctionManager must be a singleton but it has been
361  // initialized from another thread, TableFunctionManager constructor
362  // blocks via TableFunctionManager_singleton_mutex until the
363  // existing singleton is deconstructed.
364  auto mgr = std::make_unique<TableFunctionManager>(
365  exe_unit,
366  executor,
367  col_buf_ptrs,
369  /*is_singleton=*/!exe_unit.table_func.usesManager());
370 
371  // setup the inputs
372  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
373  const auto byte_stream_ptr = !col_buf_ptrs.empty()
374  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
375  : nullptr;
376  if (!col_buf_ptrs.empty()) {
377  CHECK(byte_stream_ptr);
378  }
379  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
380  if (!col_sizes.empty()) {
381  CHECK(col_sizes_ptr);
382  }
383 
384  // execute
385  const auto err = compilation_context->table_function_entry_point()(
386  reinterpret_cast<const int8_t*>(mgr.get()),
387  byte_stream_ptr, // input columns buffer
388  col_sizes_ptr, // input column sizes
389  nullptr, // input string dictionary proxy ptrs - not supported for pre-flights yet
390  nullptr,
391  nullptr, // output string dictionary proxy ptrs - not supported for pre-flights yet
392  &output_row_count);
393 
394  if (exe_unit.table_func.hasPreFlightOutputSizer()) {
395  exe_unit.output_buffer_size_param = output_row_count;
396  }
397 
399  throw UserTableFunctionError("Error executing table function pre flight check: " +
400  std::string(mgr->get_error_message()));
401  } else if (err) {
402  throw UserTableFunctionError("Error executing table function pre flight check: " +
403  std::to_string(err));
404  }
405 }
406 
407 // clang-format off
408 /*
409  Managing the output buffers from table functions
410  ------------------------------------------------
411 
412  In general, the results of a query (a list of columns) are held by a
413  ResultSet instance. While ResultSet is a rather complicated
414  structure, its most important members are
415 
416  std::vector<TargetInfo> targets_ that holds the type of output
417  columns (recall: `struct TargetInfo {..., SQLTypeInfo sql_type,
418  ...};`)
419 
420  std::unique_ptr<ResultSetStorage> storage_ that stores the
421  underlying buffer for a result set (recall: `struct
422  ResultSetStorage {..., int8_t* buff_, ...};`)
423 
424  QueryMemoryDescriptor query_mem_desc_ that describes the format of
425  the storage for a result set.
426 
427  QueryMemoryDescriptor structure contains the following relevant
428  members:
429 
430  QueryDescriptionType query_desc_type_ is equal to one of
431  GroupByPerfectHash, GroupByBaselineHash, Projection,
432  TableFunction, NonGroupedAggregate, Estimator. In the following,
433  we assume query_desc_type_ == TableFunction.
434 
435  bool output_columnar_ is always true for table function result
436  sets.
437 
438  size_t entry_count_ is the number of entries in the storage
439  buffer. This typically corresponds to the number of output rows.
440 
441  ColSlotContext col_slot_context_ describes the internal structure
442  of the storage buffer using the following members:
443 
444  std::vector<SlotSize> slot_sizes_ where we have `struct SlotSize
445  { int8_t padded_size; int8_t logical_size; };`
446 
447  std::vector<std::vector<size_t>> col_to_slot_map_ describes the
448  mapping of a column to possibly multiple slots.
449 
450  std::unordered_map<SlotIndex, ArraySize> varlen_output_slot_map_
451 
452  In the case of table function result sets, the QueryMemoryDescriptor
453  instance is created in TableFunctionManager::allocate_output_buffers
454  method and we have query_desc_type_ == TableFunction.
455 
456  Depending on the target info of an output column, the internal
457  structure of the storage buffer has two variants:
458 
459  - traditional where the buffer size of a particular column is
460  described by entry_count_ and
461  col_slot_context_.slot_sizes_. This variant is used for output
462  columns of fixed-width scalar types such as integers, floats,
463  boolean, text encoded dicts, etc. For the corresponding column
464  with col_idx, we have
465 
466  col_to_slot_map_[col_idx] == {slot_idx}
467  slot_sizes_[slot_idx] == {column_width, column_width}
468 
469  where column_width is targets_[col_idx].sql_type.get_size().
470 
471  - flatbuffer where the buffer size of a particular column is
472  described by varlen_output_slot_map_. This variant is used for
473  output columns of variable length composite types such as arrays
474  of ints, floats, etc. For the corresponding column with col_idx,
475  we have
476 
477  col_to_slot_map_[col_idx] == {slot_idx}
478  slot_sizes_[slot_idx] == {0, 0}
479  varlen_output_slot_map_ contains an item col_idx:flatbuffer_size
480 
481  Only table functions produce result sets that may contain both
482  variants. The variants can be distinguished via
483  `getPaddedSlotWidthBytes(slot_idx) == 0` test.
484 
485  In the case of table function result sets, col_idx == slot_idx holds.
486 
487 */
488 // clang-format on
489 
491  const TableFunctionExecutionUnit& exe_unit,
492  const std::shared_ptr<CpuCompilationContext>& compilation_context,
493  std::vector<const int8_t*>& col_buf_ptrs,
494  std::vector<int64_t>& col_sizes,
495  std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
496  const size_t elem_count, // taken from first source only currently
497  std::vector<int8_t*>& output_str_dict_proxy_ptrs,
498  Executor* executor) {
499  auto timer = DEBUG_TIMER(__func__);
500  int64_t output_row_count = 0;
501 
502  // If TableFunctionManager must be a singleton but it has been
503  // initialized from another thread, TableFunctionManager constructor
504  // blocks via TableFunctionManager_singleton_mutex until the
505  // existing singleton is deconstructed.
506  auto mgr = std::make_unique<TableFunctionManager>(
507  exe_unit,
508  executor,
509  col_buf_ptrs,
511  /*is_singleton=*/!exe_unit.table_func.usesManager());
512 
513  if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
514  // allocate output buffers because the size is known up front, from
515  // user specified parameters (and table size in the case of a user
516  // specified row multiplier)
517  output_row_count = get_output_row_count(exe_unit, elem_count);
518  } else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
519  output_row_count = exe_unit.output_buffer_size_param;
520  }
521 
522  // setup the inputs
523  // We can have an empty col_buf_ptrs vector if there are no arguments to the function
524  const auto byte_stream_ptr = !col_buf_ptrs.empty()
525  ? reinterpret_cast<const int8_t**>(col_buf_ptrs.data())
526  : nullptr;
527  if (!col_buf_ptrs.empty()) {
528  CHECK(byte_stream_ptr);
529  }
530  const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() : nullptr;
531  if (!col_sizes.empty()) {
532  CHECK(col_sizes_ptr);
533  }
534  const auto input_str_dict_proxy_byte_stream_ptr =
535  !input_str_dict_proxy_ptrs.empty()
536  ? reinterpret_cast<const int8_t**>(input_str_dict_proxy_ptrs.data())
537  : nullptr;
538 
539  const auto output_str_dict_proxy_byte_stream_ptr =
540  !output_str_dict_proxy_ptrs.empty()
541  ? reinterpret_cast<int8_t**>(output_str_dict_proxy_ptrs.data())
542  : nullptr;
543 
544  // execute
545  const auto err = compilation_context->table_function_entry_point()(
546  reinterpret_cast<const int8_t*>(mgr.get()),
547  byte_stream_ptr, // input columns buffer
548  col_sizes_ptr, // input column sizes
549  input_str_dict_proxy_byte_stream_ptr, // input str dictionary proxies
550  nullptr,
551  output_str_dict_proxy_byte_stream_ptr,
552  &output_row_count);
553 
555  throw UserTableFunctionError("Error executing table function: " +
556  std::string(mgr->get_error_message()));
557  }
558 
559  else if (err) {
560  throw UserTableFunctionError("Error executing table function: " +
561  std::to_string(err));
562  }
563 
564  if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
565  if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
566  throw TableFunctionError(
567  "Table function with constant sizing parameter must return " +
568  std::to_string(mgr->get_nrows()) + " (got " + std::to_string(output_row_count) +
569  ")");
570  }
571  } else {
572  if (output_row_count < 0 || (size_t)output_row_count > mgr->get_nrows()) {
573  output_row_count = mgr->get_nrows();
574  }
575  }
576  // Update entry count, it may differ from allocated mem size
577  if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
578  // set_output_row_size has not been called
579  if (output_row_count == 0) {
580  // allocate for empty output columns
581  mgr->allocate_output_buffers(0);
582  } else {
583  throw TableFunctionError("Table function must call set_output_row_size");
584  }
585  }
586 
587  mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
588 
589  auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
590  CHECK(group_by_buffers_ptr);
591  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
592 
593  auto num_out_columns = exe_unit.target_exprs.size();
594  int8_t* src = reinterpret_cast<int8_t*>(output_buffers_ptr);
595  int8_t* dst = reinterpret_cast<int8_t*>(output_buffers_ptr);
596  // Todo (todd): Consolidate this column byte offset logic that occurs in at least 4
597  // places
598  for (size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
599  auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
600  if (ti.is_array()) {
601  // TODO: implement FlatBuffer normalization when the
602  // max_nof_values is larger than the nof specified values.
603  //
604  // TODO: implement flatbuffer resize when output_row_count < mgr->get_nrows()
605  CHECK_EQ(mgr->get_nrows(), output_row_count);
606  FlatBufferManager m{src};
607  const size_t allocated_column_size = m.flatbufferSize();
608  const size_t actual_column_size = allocated_column_size;
609  src = align_to_int64(src + allocated_column_size);
610  dst = align_to_int64(dst + actual_column_size);
611  if (ti.is_text_encoding_dict_array()) {
612  CHECK_EQ(m.getDTypeMetadataDictDbId(),
613  ti.getStringDictKey().db_id); // ensure that db_id is preserved
614  CHECK_EQ(m.getDTypeMetadataDictId(),
615  ti.getStringDictKey().dict_id); // ensure that dict_id is preserved
616  }
617  } else {
618  const size_t target_width = ti.get_size();
619  const size_t allocated_column_size = target_width * mgr->get_nrows();
620  const size_t actual_column_size = target_width * output_row_count;
621  if (src != dst) {
622  auto t = memmove(dst, src, actual_column_size);
623  CHECK_EQ(dst, t);
624  }
625  src = align_to_int64(src + allocated_column_size);
626  dst = align_to_int64(dst + actual_column_size);
627  }
628  }
629  return mgr->query_buffers->getResultSetOwned(0);
630 }
631 
namespace {
// Positional indices of the parameters passed to the generated GPU table
// function kernel; KERNEL_PARAM_COUNT is the total parameter count.
// NOTE(review): the enumerators were dropped by the documentation extraction;
// the names below are grounded in their uses in launchGpuCode (MANAGER,
// COL_BUFFERS, COL_SIZES, ERROR_BUFFER, OUTPUT_ROW_COUNT, OUTPUT_BUFFERS,
// KERNEL_PARAM_COUNT) — confirm the exact order against the kernel ABI.
enum {
  ERROR_BUFFER,
  COL_BUFFERS,
  COL_SIZES,
  INPUT_STR_DICT_PROXIES,
  OUTPUT_BUFFERS,
  OUTPUT_ROW_COUNT,
  OUTPUT_STR_DICT_PROXIES,
  MANAGER,
  KERNEL_PARAM_COUNT,
};
}  // namespace
645 
647  const TableFunctionExecutionUnit& exe_unit,
648  const std::shared_ptr<GpuCompilationContext>& compilation_context,
649  std::vector<const int8_t*>& col_buf_ptrs,
650  std::vector<int64_t>& col_sizes,
651  std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
652  const size_t elem_count,
653  std::vector<int8_t*>& output_str_dict_proxy_ptrs,
654  const int device_id,
655  Executor* executor) {
656 #ifdef HAVE_CUDA
657  auto timer = DEBUG_TIMER(__func__);
659  throw QueryMustRunOnCpu();
660  }
661 
662  auto num_out_columns = exe_unit.target_exprs.size();
663  auto data_mgr = executor->getDataMgr();
664  auto gpu_allocator = std::make_unique<CudaAllocator>(
665  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
666  CHECK(gpu_allocator);
667  std::vector<CUdeviceptr> kernel_params(KERNEL_PARAM_COUNT, 0);
668 
669  // TODO: implement table function manager for CUDA
670  // kernels. kernel_params[MANAGER] ought to contain a device pointer
671  // to a struct that a table function kernel with a
672  // TableFunctionManager argument can access from the device.
673  kernel_params[MANAGER] =
674  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int8_t*)));
675 
676  // setup the inputs
677  auto byte_stream_ptr = !(col_buf_ptrs.empty())
678  ? gpu_allocator->alloc(col_buf_ptrs.size() * sizeof(int64_t))
679  : nullptr;
680  if (byte_stream_ptr) {
681  gpu_allocator->copyToDevice(byte_stream_ptr,
682  reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
683  col_buf_ptrs.size() * sizeof(int64_t));
684  }
685  kernel_params[COL_BUFFERS] = reinterpret_cast<CUdeviceptr>(byte_stream_ptr);
686 
687  auto col_sizes_ptr = !(col_sizes.empty())
688  ? gpu_allocator->alloc(col_sizes.size() * sizeof(int64_t))
689  : nullptr;
690  if (col_sizes_ptr) {
691  gpu_allocator->copyToDevice(col_sizes_ptr,
692  reinterpret_cast<int8_t*>(col_sizes.data()),
693  col_sizes.size() * sizeof(int64_t));
694  }
695  kernel_params[COL_SIZES] = reinterpret_cast<CUdeviceptr>(col_sizes_ptr);
696 
697  kernel_params[ERROR_BUFFER] =
698  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int32_t)));
699  // initialize output memory
701  elem_count,
703  /*is_table_function=*/true);
704  query_mem_desc.setOutputColumnar(true);
705 
706  for (size_t i = 0; i < num_out_columns; i++) {
707  const size_t col_width = exe_unit.target_exprs[i]->get_type_info().get_size();
708  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
709  }
710  const auto allocated_output_row_count = get_output_row_count(exe_unit, elem_count);
711  auto query_buffers = std::make_unique<QueryMemoryInitializer>(
712  exe_unit,
714  device_id,
716  (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
717  std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
718  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
720  gpu_allocator.get(),
721  executor);
722 
723  // setup the output
724  int64_t output_row_count = allocated_output_row_count;
725 
726  kernel_params[OUTPUT_ROW_COUNT] =
727  reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(sizeof(int64_t*)));
728  gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
729  reinterpret_cast<int8_t*>(&output_row_count),
730  sizeof(output_row_count));
731  /*
732   TODO: RBC generated runtime table functions do not support
733  concurrent execution on a CUDA device. Hence, we'll force 1 as
734  block/grid size in the case of runtime table functions. To support
735  this, in RBC, we'll need to expose threadIdx/blockIdx/blockDim to
736  runtime table functions and these must do something sensible with
737  this information..
738  */
739  const unsigned block_size_x =
740  (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
741  const unsigned block_size_y = 1;
742  const unsigned block_size_z = 1;
743  const unsigned grid_size_x =
744  (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
745  const unsigned grid_size_y = 1;
746  const unsigned grid_size_z = 1;
747 
748  auto gpu_output_buffers =
749  query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
750  device_id,
751  block_size_x,
752  grid_size_x,
753  true /* zero_initialize_buffers */);
754 
755  kernel_params[OUTPUT_BUFFERS] = reinterpret_cast<CUdeviceptr>(gpu_output_buffers.ptrs);
756 
757  // execute
758  CHECK_EQ(static_cast<size_t>(KERNEL_PARAM_COUNT), kernel_params.size());
759 
760  std::vector<void*> param_ptrs;
761  for (auto& param : kernel_params) {
762  param_ptrs.push_back(&param);
763  }
764 
765  // Get cu func
766 
767  CHECK(compilation_context);
768  const auto native_code = compilation_context->getNativeCode(device_id);
769  auto cu_func = static_cast<CUfunction>(native_code.first);
770  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
771  VLOG(1) << "Launch GPU table function kernel compiled with the following block and "
772  "grid sizes: "
773  << block_size_x << " and " << grid_size_x;
774  checkCudaErrors(cuLaunchKernel(cu_func,
775  grid_size_x,
776  grid_size_y,
777  grid_size_z,
778  block_size_x,
779  block_size_y,
780  block_size_z,
781  0, // shared mem bytes
782  qe_cuda_stream,
783  &param_ptrs[0],
784  nullptr));
785  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
786 
787  // read output row count from GPU
788  gpu_allocator->copyFromDevice(
789  reinterpret_cast<int8_t*>(&output_row_count),
790  reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
791  sizeof(int64_t));
792  if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
793  if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
794  throw TableFunctionError(
795  "Table function with constant sizing parameter must return " +
796  std::to_string(allocated_output_row_count) + " (got " +
797  std::to_string(output_row_count) + ")");
798  }
799  } else {
800  if (output_row_count < 0 || (size_t)output_row_count > allocated_output_row_count) {
801  output_row_count = allocated_output_row_count;
802  }
803  }
804 
805  // Update entry count, it may differ from allocated mem size
806  query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
807 
808  // Copy back to CPU storage
809  query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
810  query_mem_desc,
811  output_row_count,
812  gpu_output_buffers,
813  device_id,
814  block_size_x,
815  grid_size_x);
816 
817  return query_buffers->getResultSetOwned(0);
818 #else
819  UNREACHABLE();
820  return nullptr;
821 #endif
822 }
Defines data structures for the semantic analysis phase of query processing.
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
void launchPreCodeOnCpu(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, const size_t elem_count, Executor *executor)
std::vector< Analyzer::Expr * > input_exprs
ExecutorDeviceType
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
Definition: sample.cpp:38
unsigned long long CUdeviceptr
Definition: nocuda.h:28
#define UNREACHABLE()
Definition: Logger.h:337
void setOutputColumnar(const bool val)
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
std::string to_string(char const *&&v)
size_t get_bit_width(const SQLTypeInfo &ti)
void * CUfunction
Definition: nocuda.h:25
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::mutex TableFunctionManager_singleton_mutex
void copyToDevice(void *device_dst, const void *host_src, const size_t num_bytes) const override
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
int8_t * alloc(const size_t num_bytes) override
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
#define CHECK_LE(x, y)
Definition: Logger.h:304
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk&#39;s pointer and element count on either CPU or GPU.
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
int64_t flatbufferSize() const
Definition: FlatBuffer.h:219
const int8_t * create_literal_buffer(const T literal, const ExecutorDeviceType device_type, std::vector< std::unique_ptr< char[]>> &literals_owner, CudaAllocator *gpu_allocator)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
OutputBufferSizeType getOutputRowSizeType() const
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
#define VLOG(n)
Definition: Logger.h:387