34 std::vector<std::unique_ptr<
char[]>>& literals_owner,
37 switch (device_type) {
39 literals_owner.emplace_back(std::make_unique<
char[]>(
sizeof(int64_t)));
40 std::memcpy(literals_owner.back().get(), &literal,
sizeof(
T));
41 return reinterpret_cast<const int8_t*
>(literals_owner.back().get());
45 const auto gpu_literal_buf_ptr = gpu_allocator->
alloc(
sizeof(int64_t));
47 gpu_literal_buf_ptr, reinterpret_cast<const int8_t*>(&literal),
sizeof(
T));
48 return gpu_literal_buf_ptr;
61 std::vector<std::unique_ptr<
char[]>>& literals_owner,
63 const int64_t string_size = literal->size();
64 const int64_t padded_string_size =
65 (string_size + 7) / 8 * 8;
66 switch (device_type) {
68 literals_owner.emplace_back(
69 std::make_unique<
char[]>(
sizeof(int64_t) + padded_string_size));
70 std::memcpy(literals_owner.back().get(), &string_size,
sizeof(int64_t));
72 literals_owner.back().get() +
sizeof(int64_t), literal->data(), string_size);
73 return reinterpret_cast<const int8_t*
>(literals_owner.back().get());
77 const auto gpu_literal_buf_ptr =
78 gpu_allocator->
alloc(
sizeof(int64_t) + padded_string_size);
80 reinterpret_cast<const int8_t*>(&string_size),
82 gpu_allocator->
copyToDevice(gpu_literal_buf_ptr +
sizeof(int64_t),
83 reinterpret_cast<const int8_t*>(literal->data()),
85 return gpu_literal_buf_ptr;
93 size_t input_element_count) {
94 size_t allocated_output_row_count = 0;
103 allocated_output_row_count =
108 allocated_output_row_count = input_element_count;
115 return allocated_output_row_count;
122 const std::vector<InputTableInfo>& table_infos,
123 const std::shared_ptr<CompilationContext>& compilation_context,
127 bool is_pre_launch_udtf) {
129 CHECK(compilation_context);
130 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
131 std::vector<std::unique_ptr<char[]>> literals_owner;
133 const int device_id = 0;
134 std::unique_ptr<CudaAllocator> device_allocator;
136 auto data_mgr = executor->getDataMgr();
140 std::vector<const int8_t*> col_buf_ptrs;
141 std::vector<int64_t> col_sizes;
142 std::vector<const int8_t*> input_str_dict_proxy_ptrs;
143 std::optional<size_t> input_num_rows;
148 std::vector<std::vector<const int8_t*>> col_list_bufs;
149 std::vector<std::vector<const int8_t*>> input_col_list_str_dict_proxy_ptrs;
150 for (
const auto& input_expr : exe_unit.
input_exprs) {
151 auto ti = input_expr->get_type_info();
152 if (!ti.is_column_list()) {
155 if (
auto col_var = dynamic_cast<Analyzer::ColumnVar*>(input_expr)) {
156 const auto& table_key = col_var->getTableKey();
157 auto table_info_it = std::find_if(
158 table_infos.begin(), table_infos.end(), [&table_key](
const auto& table_info) {
159 return table_info.table_key == table_key;
161 CHECK(table_info_it != table_infos.end());
165 table_info_it->info.fragments.front(),
169 device_allocator.get(),
175 if (!input_num_rows) {
176 input_num_rows = (buf_elem_count > 0 ? buf_elem_count : 1);
179 int8_t* input_str_dict_proxy_ptr =
nullptr;
180 if (ti.is_subtype_dict_encoded_string()) {
181 const auto input_string_dictionary_proxy = executor->getStringDictionaryProxy(
182 ti.getStringDictKey(), executor->getRowSetMemoryOwner(),
true);
183 input_str_dict_proxy_ptr =
184 reinterpret_cast<int8_t*
>(input_string_dictionary_proxy);
186 if (ti.is_column_list()) {
187 if (col_index == -1) {
188 col_list_bufs.push_back({});
189 input_col_list_str_dict_proxy_ptrs.push_back({});
190 col_list_bufs.back().reserve(ti.get_dimension());
191 input_col_list_str_dict_proxy_ptrs.back().reserve(ti.get_dimension());
193 CHECK_EQ(col_sizes.back(), buf_elem_count);
196 col_list_bufs.back().push_back(col_buf);
197 input_col_list_str_dict_proxy_ptrs.back().push_back(input_str_dict_proxy_ptr);
199 if (col_index + 1 == ti.get_dimension()) {
203 col_buf_ptrs.push_back((
const int8_t*)col_list_bufs.back().data());
204 input_str_dict_proxy_ptrs.push_back(
205 (
const int8_t*)input_col_list_str_dict_proxy_ptrs.back().data());
207 col_buf_ptrs.push_back(col_buf);
208 input_str_dict_proxy_ptrs.push_back(input_str_dict_proxy_ptr);
210 col_sizes.push_back(buf_elem_count);
211 }
else if (
const auto& constant_val = dynamic_cast<Analyzer::Constant*>(input_expr)) {
214 col_sizes.push_back(0);
215 input_str_dict_proxy_ptrs.push_back(
nullptr);
216 const auto const_val_datum = constant_val->get_constval();
217 const auto& ti = constant_val->get_type_info();
224 device_allocator.get()));
230 device_allocator.get()));
235 }
else if (ti.is_integer() || ti.is_timestamp() || ti.is_timeinterval()) {
241 device_allocator.get()));
247 device_allocator.get()));
253 device_allocator.get()));
259 device_allocator.get()));
264 }
else if (ti.is_boolean()) {
268 device_allocator.get()));
269 }
else if (ti.is_bytes()) {
273 device_allocator.get()));
276 " is not yet supported.");
280 "Unsupported expression as input to table function: " + input_expr->toString() +
281 "\n Only literal constants and columns are supported!");
294 CHECK(input_num_rows);
296 std::vector<int8_t*> output_str_dict_proxy_ptrs;
298 int8_t* output_str_dict_proxy_ptr =
nullptr;
299 auto ti = output_expr->get_type_info();
300 if (ti.is_dict_encoded_string()) {
301 const auto output_string_dictionary_proxy = executor->getStringDictionaryProxy(
302 ti.getStringDictKey(), executor->getRowSetMemoryOwner(),
true);
303 output_str_dict_proxy_ptr =
304 reinterpret_cast<int8_t*
>(output_string_dictionary_proxy);
306 output_str_dict_proxy_ptrs.emplace_back(output_str_dict_proxy_ptr);
309 if (is_pre_launch_udtf) {
313 std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
320 switch (device_type) {
324 std::dynamic_pointer_cast<CpuCompilationContext>(compilation_context),
327 input_str_dict_proxy_ptrs,
329 output_str_dict_proxy_ptrs,
334 std::dynamic_pointer_cast<GpuCompilationContext>(compilation_context),
337 input_str_dict_proxy_ptrs,
339 output_str_dict_proxy_ptrs,
352 const std::shared_ptr<CpuCompilationContext>& compilation_context,
353 std::vector<const int8_t*>& col_buf_ptrs,
354 std::vector<int64_t>& col_sizes,
355 const size_t elem_count,
356 Executor* executor) {
358 int64_t output_row_count = 0;
364 auto mgr = std::make_unique<TableFunctionManager>(
369 !exe_unit.table_func.usesManager());
373 const auto byte_stream_ptr = !col_buf_ptrs.empty()
374 ?
reinterpret_cast<const int8_t**
>(col_buf_ptrs.data())
376 if (!col_buf_ptrs.empty()) {
377 CHECK(byte_stream_ptr);
379 const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() :
nullptr;
380 if (!col_sizes.empty()) {
381 CHECK(col_sizes_ptr);
385 const auto err = compilation_context->table_function_entry_point()(
386 reinterpret_cast<const int8_t*
>(mgr.get()),
394 if (exe_unit.table_func.hasPreFlightOutputSizer()) {
395 exe_unit.output_buffer_size_param = output_row_count;
400 std::string(mgr->get_error_message()));
492 const std::shared_ptr<CpuCompilationContext>& compilation_context,
493 std::vector<const int8_t*>& col_buf_ptrs,
494 std::vector<int64_t>& col_sizes,
495 std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
496 const size_t elem_count,
497 std::vector<int8_t*>& output_str_dict_proxy_ptrs,
498 Executor* executor) {
500 int64_t output_row_count = 0;
506 auto mgr = std::make_unique<TableFunctionManager>(
511 !exe_unit.table_func.usesManager());
513 if (exe_unit.table_func.hasOutputSizeKnownPreLaunch()) {
518 }
else if (exe_unit.table_func.hasPreFlightOutputSizer()) {
519 output_row_count = exe_unit.output_buffer_size_param;
524 const auto byte_stream_ptr = !col_buf_ptrs.empty()
525 ?
reinterpret_cast<const int8_t**
>(col_buf_ptrs.data())
527 if (!col_buf_ptrs.empty()) {
528 CHECK(byte_stream_ptr);
530 const auto col_sizes_ptr = !col_sizes.empty() ? col_sizes.data() :
nullptr;
531 if (!col_sizes.empty()) {
532 CHECK(col_sizes_ptr);
534 const auto input_str_dict_proxy_byte_stream_ptr =
535 !input_str_dict_proxy_ptrs.empty()
536 ?
reinterpret_cast<const int8_t**
>(input_str_dict_proxy_ptrs.data())
539 const auto output_str_dict_proxy_byte_stream_ptr =
540 !output_str_dict_proxy_ptrs.empty()
541 ?
reinterpret_cast<int8_t**
>(output_str_dict_proxy_ptrs.data())
545 const auto err = compilation_context->table_function_entry_point()(
546 reinterpret_cast<const int8_t*
>(mgr.get()),
549 input_str_dict_proxy_byte_stream_ptr,
551 output_str_dict_proxy_byte_stream_ptr,
556 std::string(mgr->get_error_message()));
564 if (exe_unit.table_func.hasCompileTimeOutputSizeConstant()) {
565 if (static_cast<size_t>(output_row_count) != mgr->get_nrows()) {
567 "Table function with constant sizing parameter must return " +
572 if (output_row_count < 0 || (
size_t)output_row_count > mgr->get_nrows()) {
573 output_row_count = mgr->get_nrows();
577 if (exe_unit.table_func.hasTableFunctionSpecifiedParameter() && !mgr->query_buffers) {
579 if (output_row_count == 0) {
581 mgr->allocate_output_buffers(0);
587 mgr->query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
589 auto group_by_buffers_ptr = mgr->query_buffers->getGroupByBuffersPtr();
590 CHECK(group_by_buffers_ptr);
591 auto output_buffers_ptr =
reinterpret_cast<int64_t*
>(group_by_buffers_ptr[0]);
593 auto num_out_columns = exe_unit.target_exprs.size();
594 int8_t* src =
reinterpret_cast<int8_t*
>(output_buffers_ptr);
595 int8_t* dst =
reinterpret_cast<int8_t*
>(output_buffers_ptr);
598 for (
size_t col_idx = 0; col_idx < num_out_columns; col_idx++) {
599 auto ti = exe_unit.target_exprs[col_idx]->get_type_info();
605 CHECK_EQ(mgr->get_nrows(), output_row_count);
608 const size_t actual_column_size = allocated_column_size;
611 if (ti.is_text_encoding_dict_array()) {
612 CHECK_EQ(m.getDTypeMetadataDictDbId(),
613 ti.getStringDictKey().db_id);
614 CHECK_EQ(m.getDTypeMetadataDictId(),
615 ti.getStringDictKey().dict_id);
618 const size_t target_width = ti.get_size();
619 const size_t allocated_column_size = target_width * mgr->get_nrows();
620 const size_t actual_column_size = target_width * output_row_count;
622 auto t = memmove(dst, src, actual_column_size);
629 return mgr->query_buffers->getResultSetOwned(0);
648 const std::shared_ptr<GpuCompilationContext>& compilation_context,
649 std::vector<const int8_t*>& col_buf_ptrs,
650 std::vector<int64_t>& col_sizes,
651 std::vector<const int8_t*>& input_str_dict_proxy_ptrs,
652 const size_t elem_count,
653 std::vector<int8_t*>& output_str_dict_proxy_ptrs,
655 Executor* executor) {
663 auto data_mgr = executor->getDataMgr();
664 auto gpu_allocator = std::make_unique<CudaAllocator>(
666 CHECK(gpu_allocator);
674 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int8_t*)));
677 auto byte_stream_ptr = !(col_buf_ptrs.empty())
678 ? gpu_allocator->alloc(col_buf_ptrs.size() *
sizeof(int64_t))
680 if (byte_stream_ptr) {
681 gpu_allocator->copyToDevice(byte_stream_ptr,
682 reinterpret_cast<int8_t*>(col_buf_ptrs.data()),
683 col_buf_ptrs.size() *
sizeof(int64_t));
687 auto col_sizes_ptr = !(col_sizes.empty())
688 ? gpu_allocator->alloc(col_sizes.size() *
sizeof(int64_t))
691 gpu_allocator->copyToDevice(col_sizes_ptr,
692 reinterpret_cast<int8_t*>(col_sizes.data()),
693 col_sizes.size() *
sizeof(int64_t));
698 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int32_t)));
706 for (
size_t i = 0; i < num_out_columns; i++) {
707 const size_t col_width = exe_unit.
target_exprs[i]->get_type_info().get_size();
708 query_mem_desc.
addColSlotInfo({std::make_tuple(col_width, col_width)});
711 auto query_buffers = std::make_unique<QueryMemoryInitializer>(
716 (allocated_output_row_count == 0 ? 1 : allocated_output_row_count),
717 std::vector<std::vector<const int8_t*>>{col_buf_ptrs},
718 std::vector<std::vector<uint64_t>>{{0}},
724 int64_t output_row_count = allocated_output_row_count;
727 reinterpret_cast<CUdeviceptr>(gpu_allocator->alloc(
sizeof(int64_t*)));
728 gpu_allocator->copyToDevice(reinterpret_cast<int8_t*>(kernel_params[
OUTPUT_ROW_COUNT]),
729 reinterpret_cast<int8_t*>(&output_row_count),
730 sizeof(output_row_count));
739 const unsigned block_size_x =
740 (exe_unit.table_func.isRuntime() ? 1 : executor->blockSize());
741 const unsigned block_size_y = 1;
742 const unsigned block_size_z = 1;
743 const unsigned grid_size_x =
744 (exe_unit.table_func.isRuntime() ? 1 : executor->gridSize());
745 const unsigned grid_size_y = 1;
746 const unsigned grid_size_z = 1;
748 auto gpu_output_buffers =
749 query_buffers->setupTableFunctionGpuBuffers(query_mem_desc,
760 std::vector<void*> param_ptrs;
761 for (
auto& param : kernel_params) {
762 param_ptrs.push_back(&param);
767 CHECK(compilation_context);
768 const auto native_code = compilation_context->getNativeCode(device_id);
769 auto cu_func =
static_cast<CUfunction>(native_code.first);
771 VLOG(1) <<
"Launch GPU table function kernel compiled with the following block and "
773 << block_size_x <<
" and " << grid_size_x;
788 gpu_allocator->copyFromDevice(
789 reinterpret_cast<int8_t*>(&output_row_count),
790 reinterpret_cast<int8_t*>(kernel_params[OUTPUT_ROW_COUNT]),
792 if (exe_unit.table_func.hasNonUserSpecifiedOutputSize()) {
793 if (static_cast<size_t>(output_row_count) != allocated_output_row_count) {
795 "Table function with constant sizing parameter must return " +
800 if (output_row_count < 0 || (
size_t)output_row_count > allocated_output_row_count) {
801 output_row_count = allocated_output_row_count;
806 query_buffers->getResultSet(0)->updateStorageEntryCount(output_row_count);
809 query_buffers->copyFromTableFunctionGpuBuffers(data_mgr,
817 return query_buffers->getResultSetOwned(0);
Defines data structures for the semantic analysis phase of query processing.
size_t get_output_row_count(const TableFunctionExecutionUnit &exe_unit, size_t input_element_count)
void launchPreCodeOnCpu(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, const size_t elem_count, Executor *executor)
std::vector< Analyzer::Expr * > input_exprs
const table_functions::TableFunction table_func
void checkCudaErrors(CUresult err)
unsigned long long CUdeviceptr
void setOutputColumnar(const bool val)
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
size_t output_buffer_size_param
bool containsPreFlightFn() const
size_t get_bit_width(const SQLTypeInfo &ti)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::mutex TableFunctionManager_singleton_mutex
void copyToDevice(void *device_dst, const void *host_src, const size_t num_bytes) const override
ResultSetPtr launchCpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< CpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, Executor *executor)
int8_t * alloc(const size_t num_bytes) override
ResultSetPtr execute(const TableFunctionExecutionUnit &exe_unit, const std::vector< InputTableInfo > &table_infos, const std::shared_ptr< CompilationContext > &compilation_context, const ColumnFetcher &column_fetcher, const ExecutorDeviceType device_type, Executor *executor, bool is_pre_launch_udtf)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
CUstream getQueryEngineCudaStreamForDevice(int device_num)
int64_t flatbufferSize() const
bool hasTableFunctionSpecifiedParameter() const
const int8_t * create_literal_buffer(const T literal, const ExecutorDeviceType device_type, std::vector< std::unique_ptr< char[]>> &literals_owner, CudaAllocator *gpu_allocator)
#define DEBUG_TIMER(name)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
bool hasOutputSizeIndependentOfInputSize() const
OutputBufferSizeType getOutputRowSizeType() const
ResultSetPtr launchGpuCode(const TableFunctionExecutionUnit &exe_unit, const std::shared_ptr< GpuCompilationContext > &compilation_context, std::vector< const int8_t * > &col_buf_ptrs, std::vector< int64_t > &col_sizes, std::vector< const int8_t * > &input_str_dict_proxy_ptrs, const size_t elem_count, std::vector< int8_t * > &output_str_dict_proxy_ptrs, const int device_id, Executor *executor)
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)