33 return !res || res->definitelyHasNoRows();
37 return (std::count_if(ra_exe_unit.
join_quals.begin(),
39 [](
const auto& join_condition) {
47 const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
52 (chunk_ti.is_array() ||
53 (chunk_ti.is_string() && chunk_ti.get_compression() ==
kENCODING_NONE))) {
54 for (
const auto target_expr : ra_exe_unit.
target_exprs) {
62 if (lazy_fetch_info.empty()) {
66 for (
size_t i = 0; i < ra_exe_unit.
target_exprs.size(); i++) {
68 const auto& col_lazy_fetch = lazy_fetch_info[i];
72 if (col_lazy_fetch.is_lazily_fetched) {
83 const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
85 for (
const auto& chunk : chunks) {
100 for (
size_t i = 1; i <=
query_infos_.front().info.fragments.size(); ++i) {
103 query_infos_.front().info.fragments[i - 1].getNumTuples();
110 std::vector<size_t> outer_table_fragment_ids) {
114 outer_table_fragment_ids);
118 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>&
124 const size_t thread_idx,
128 std::optional<logger::QidScopeGuard> qid_scope_guard;
133 runImpl(executor, thread_idx, shared_context);
136 }
catch (
const std::bad_alloc& e) {
159 const size_t thread_idx,
171 const auto& outer_tab_frag_ids =
frag_list[0].fragment_ids;
176 auto catalog = executor->getCatalog();
179 auto data_mgr = executor->getDataMgr();
182 auto chunk_iterators_ptr = std::make_shared<std::list<ChunkIter>>();
183 std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks;
184 std::unique_ptr<std::lock_guard<std::mutex>> gpu_lock;
185 std::unique_ptr<CudaAllocator> device_allocator;
188 new std::lock_guard<std::mutex>(executor->gpu_exec_mutex_[
chosen_device_id]));
189 device_allocator = std::make_unique<CudaAllocator>(
192 std::shared_ptr<FetchResult> fetch_result(
new FetchResult);
194 std::map<int, const TableFragments*> all_tables_fragments;
203 all_tables_fragments,
206 *chunk_iterators_ptr,
208 device_allocator.get(),
215 all_tables_fragments,
218 *chunk_iterators_ptr,
220 device_allocator.get(),
223 if (fetch_result->num_rows.empty()) {
230 LOG(
INFO) <<
"Dynamic Watchdog budget: CPU: "
246 throw std::runtime_error(
"Joins not supported through external execution");
253 executor->row_set_mem_owner_,
256 group_by_and_aggregate.initQueryMemoryDescriptor(
false, 0, 8,
nullptr,
false);
260 executor->plan_state_.get(),
269 std::unique_ptr<QueryExecutionContext> query_exe_context_owned;
272 int64_t total_num_input_rows{-1};
275 total_num_input_rows = 0;
276 std::for_each(fetch_result->num_rows.begin(),
277 fetch_result->num_rows.end(),
278 [&total_num_input_rows](
const std::vector<int64_t>& frag_row_count) {
280 frag_row_count.end(),
281 total_num_input_rows);
283 VLOG(2) <<
"total_num_input_rows=" << total_num_input_rows;
287 if (total_num_input_rows == 0) {
296 uint32_t start_rowid{0};
299 const auto& all_frag_row_offsets = shared_context.
getFragOffsets();
301 all_frag_row_offsets[
frag_list.begin()->fragment_ids.front()];
306 bool can_run_subkernels = shared_context.getThreadPool() !=
nullptr;
323 can_run_subkernels &&
329 if (can_run_subkernels) {
330 size_t total_rows = fetch_result->num_rows[0][0];
333 for (
size_t sub_start = start_rowid; sub_start < total_rows; sub_start += sub_size) {
334 sub_size = (sub_start + sub_size > total_rows) ? total_rows - sub_start : sub_size;
335 auto subtask = std::make_shared<KernelSubtask>(*
this,
339 total_num_input_rows,
343 shared_context.getThreadPool()->run(
344 [subtask, executor] { subtask->run(executor); });
358 query_exe_context_owned =
365 total_num_input_rows,
366 fetch_result->col_buffers,
367 fetch_result->frag_offsets,
368 executor->getRowSetMemoryOwner(),
378 CHECK(query_exe_context);
388 fetch_result->col_buffers,
390 fetch_result->num_rows,
391 fetch_result->frag_offsets,
400 VLOG(1) <<
"outer_table_id=" << outer_table_id
408 fetch_result->col_buffers,
411 fetch_result->num_rows,
412 fetch_result->frag_offsets,
423 std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_to_hold;
424 for (
const auto& chunk : chunks) {
429 chunks_to_hold.push_back(chunk);
435 VLOG(1) <<
"null device_results.";
450 }
catch (
const std::bad_alloc& e) {
459 kernel_.query_mem_desc.getQueryDescriptionType(),
472 void KernelSubtask::runImpl(Executor* executor) {
473 auto& query_exe_context_owned = shared_context_.getTlsExecutionContext().local();
474 const bool do_render =
475 kernel_.render_info_ && kernel_.render_info_->isPotentialInSituRender();
477 kernel_.query_comp_desc.getCompilationResult();
478 const int outer_table_id = kernel_.ra_exe_unit_.union_all
479 ? kernel_.frag_list[0].table_id
480 : kernel_.ra_exe_unit_.input_descs[0].getTableId();
482 if (!query_exe_context_owned) {
486 std::vector<std::vector<const int8_t*>> col_buffers(
487 fetch_result_->col_buffers.size(),
488 std::vector<const int8_t*>(fetch_result_->col_buffers[0].size()));
489 std::vector<std::vector<uint64_t>> frag_offsets(
490 fetch_result_->frag_offsets.size(),
491 std::vector<uint64_t>(fetch_result_->frag_offsets[0].size()));
492 query_exe_context_owned = kernel_.query_mem_desc.getQueryExecutionContext(
493 kernel_.ra_exe_unit_,
495 kernel_.chosen_device_type,
496 kernel_.kernel_dispatch_mode,
497 kernel_.chosen_device_id,
499 total_num_input_rows_,
502 executor->getRowSetMemoryOwner(),
504 kernel_.query_mem_desc.sortOnGpu(),
507 do_render ? kernel_.render_info_ :
nullptr);
513 const auto& outer_tab_frag_ids = kernel_.frag_list[0].fragment_ids;
514 auto catalog = executor->getCatalog();
517 CHECK(query_exe_context);
520 if (kernel_.ra_exe_unit_.groupby_exprs.empty()) {
521 err = executor->executePlanWithoutGroupBy(kernel_.ra_exe_unit_,
523 kernel_.query_comp_desc.hoistLiterals(),
525 kernel_.ra_exe_unit_.target_exprs,
526 kernel_.chosen_device_type,
527 fetch_result_->col_buffers,
529 fetch_result_->num_rows,
530 fetch_result_->frag_offsets,
531 &catalog->getDataMgr(),
532 kernel_.chosen_device_id,
534 kernel_.ra_exe_unit_.input_descs.size(),
535 kernel_.eo.allow_runtime_query_interrupt,
536 do_render ? kernel_.render_info_ :
nullptr,
537 start_rowid_ + num_rows_to_process_);
539 err = executor->executePlanWithGroupBy(kernel_.ra_exe_unit_,
541 kernel_.query_comp_desc.hoistLiterals(),
543 kernel_.chosen_device_type,
544 fetch_result_->col_buffers,
547 fetch_result_->num_rows,
548 fetch_result_->frag_offsets,
549 &catalog->getDataMgr(),
550 kernel_.chosen_device_id,
552 kernel_.ra_exe_unit_.scan_limit,
554 kernel_.ra_exe_unit_.input_descs.size(),
555 kernel_.eo.allow_runtime_query_interrupt,
556 do_render ? kernel_.render_info_ :
nullptr,
557 start_rowid_ + num_rows_to_process_);
bool need_to_hold_chunk(const Chunk_NS::Chunk *chunk, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ExecutorDeviceType device_type)
std::vector< Analyzer::Expr * > target_exprs
std::atomic_flag dynamic_watchdog_set
const ExecutionOptions & eo
size_t g_cpu_sub_task_size
const std::vector< uint64_t > & getFragOffsets()
static const int max_gpu_count
bool with_dynamic_watchdog
const std::optional< bool > union_all
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
std::vector< uint64_t > all_frag_row_offsets_
const int64_t rowid_lookup_key
std::mutex all_frag_row_offsets_mutex_
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
std::vector< InputDescriptor > input_descs
const ExecutorDeviceType chosen_device_type
bool hoistLiterals() const
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
RenderInfo * render_info_
static const int32_t ERR_STRING_CONST_IN_RESULTSET
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
const ColumnDescriptor * getColumnDesc() const
bool needs_skip_result(const ResultSetPtr &res)
ExecutorType executor_type
std::unique_ptr< QueryExecutionContext > getQueryExecutionContext(const RelAlgExecutionUnit &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const int outer_table_id, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner >, const bool output_columnar, const bool sort_on_gpu, const size_t thread_idx, RenderInfo *) const
#define INJECT_TIMER(DESC)
static const int32_t ERR_OUT_OF_RENDER_MEM
const JoinQualsPerNestingLevel join_quals
const QueryMemoryDescriptor & query_mem_desc
DEVICE auto accumulate(ARGS &&...args)
const QueryCompilationDescriptor & query_comp_desc
const std::shared_ptr< Analyzer::Estimator > estimator
static const int32_t ERR_OUT_OF_GPU_MEM
QueryDescriptionType getQueryDescriptionType() const
RUNTIME_EXPORT uint64_t dynamic_watchdog_init(unsigned ms_budget)
void runImpl(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > all_fragment_results_
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
const FragmentsList frag_list
CUstream getQueryEngineCudaStreamForDevice(int device_num)
bool query_has_inner_join(const RelAlgExecutionUnit &ra_exe_unit)
const std::vector< InputTableInfo > & getQueryInfos() const
ResultSetPtr device_results_
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
#define DEBUG_TIMER(name)
const char * what() const noexceptfinal
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::shared_ptr< const query_state::QueryState > query_state
unsigned dynamic_watchdog_time_limit
const std::vector< InputTableInfo > & query_infos_
ExecutionUnitSql serialize_to_sql(const RelAlgExecutionUnit *ra_exe_unit, const Catalog_Namespace::Catalog *catalog)
bool allow_runtime_query_interrupt
auto getCompilationResult() const
static const int32_t ERR_OUT_OF_CPU_MEM
static void computeAllTablesFragments(std::map< int, const TableFragments * > &all_tables_fragments, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos)
const ColumnFetcher & column_fetcher