OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecutionKernel Class Reference

#include <ExecutionKernel.h>

+ Collaboration diagram for ExecutionKernel:

Public Member Functions

 ExecutionKernel (const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_list, const ExecutorDispatchMode kernel_dispatch_mode, RenderInfo *render_info, const int64_t rowid_lookup_key)
 
void run (Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
 

Public Attributes

const RelAlgExecutionUnit & ra_exe_unit_
 

Private Member Functions

void runImpl (Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
 

Private Attributes

const ExecutorDeviceType chosen_device_type
 
int chosen_device_id
 
const ExecutionOptions & eo
 
const ColumnFetcher & column_fetcher
 
const QueryCompilationDescriptor & query_comp_desc
 
const QueryMemoryDescriptor & query_mem_desc
 
const FragmentsList frag_list
 
const ExecutorDispatchMode kernel_dispatch_mode
 
RenderInfo * render_info_
 
const int64_t rowid_lookup_key
 
ResultSetPtr device_results_
 

Friends

class KernelSubtask
 

Detailed Description

Definition at line 72 of file ExecutionKernel.h.

Constructor & Destructor Documentation

ExecutionKernel::ExecutionKernel ( const RelAlgExecutionUnit & ra_exe_unit,
const ExecutorDeviceType  chosen_device_type,
int  chosen_device_id,
const ExecutionOptions & eo,
const ColumnFetcher & column_fetcher,
const QueryCompilationDescriptor & query_comp_desc,
const QueryMemoryDescriptor & query_mem_desc,
const FragmentsList & frag_list,
const ExecutorDispatchMode  kernel_dispatch_mode,
RenderInfo * render_info,
const int64_t  rowid_lookup_key 
)
inline

Definition at line 74 of file ExecutionKernel.h.

85  : ra_exe_unit_(ra_exe_unit)
88  , eo(eo)
89  , column_fetcher(column_fetcher)
90  , query_comp_desc(query_comp_desc)
91  , query_mem_desc(query_mem_desc)
94  , render_info_(render_info)
const ExecutionOptions & eo
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
const int64_t rowid_lookup_key
const ExecutorDeviceType chosen_device_type
RenderInfo * render_info_
const QueryMemoryDescriptor & query_mem_desc
const QueryCompilationDescriptor & query_comp_desc
const FragmentsList frag_list
const ColumnFetcher & column_fetcher

Member Function Documentation

void ExecutionKernel::run ( Executor * executor,
const size_t  thread_idx,
SharedKernelContext & shared_context
)

Definition at line 123 of file ExecutionKernel.cpp.

References DEBUG_TIMER, Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED, Executor::ERR_OUT_OF_CPU_MEM, Executor::ERR_OUT_OF_GPU_MEM, Executor::ERR_OUT_OF_RENDER_MEM, Executor::ERR_STRING_CONST_IN_RESULTSET, Executor::ERR_TOO_MANY_LITERALS, QueryMemoryDescriptor::getQueryDescriptionType(), INJECT_TIMER, kernel_dispatch_mode, MultifragmentKernel, query_mem_desc, RelAlgExecutionUnit::query_state, ra_exe_unit_, runImpl(), and OutOfHostMemory::what().

Referenced by Executor::executeUpdate(), and Executor::executeWorkUnitPerFragment().

125  {
126  DEBUG_TIMER("ExecutionKernel::run");
127  INJECT_TIMER(kernel_run);
128  std::optional<logger::QidScopeGuard> qid_scope_guard;
130  qid_scope_guard.emplace(ra_exe_unit_.query_state->setThreadLocalQueryId());
131  }
132  try {
133  runImpl(executor, thread_idx, shared_context);
134  } catch (const OutOfHostMemory& e) {
136  } catch (const std::bad_alloc& e) {
138  } catch (const OutOfRenderMemory& e) {
140  } catch (const OutOfMemory& e) {
141  throw QueryExecutionError(
143  e.what(),
147  } catch (const ColumnarConversionNotSupported& e) {
149  } catch (const TooManyLiterals& e) {
151  } catch (const StringConstInResultSet& e) {
153  } catch (const QueryExecutionError& e) {
154  throw e;
155  }
156 }
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1380
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1381
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1379
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1374
const QueryMemoryDescriptor & query_mem_desc
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1371
QueryDescriptionType getQueryDescriptionType() const
void runImpl(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const char * what() const noexcept final
Definition: checked_alloc.h:39
std::shared_ptr< const query_state::QueryState > query_state
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1375

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ExecutionKernel::runImpl ( Executor * executor,
const size_t  thread_idx,
SharedKernelContext & shared_context
)
private

Definition at line 158 of file ExecutionKernel.cpp.

References gpu_enabled::accumulate(), SharedKernelContext::addDeviceResults(), ExecutionOptions::allow_runtime_query_interrupt, CHECK, CHECK_EQ, CHECK_GE, CHECK_GT, CHECK_LT, chosen_device_id, chosen_device_type, column_fetcher, QueryFragmentDescriptor::computeAllTablesFragments(), CPU, Data_Namespace::CPU_LEVEL, device_results_, dynamic_watchdog_init(), SharedKernelContext::dynamic_watchdog_set, ExecutionOptions::dynamic_watchdog_time_limit, eo, Executor::ERR_OUT_OF_CPU_MEM, Executor::ERR_OUT_OF_GPU_MEM, RelAlgExecutionUnit::estimator, ExecutionOptions::executor_type, Extern, frag_list, g_cpu_sub_task_size, QueryCompilationDescriptor::getCompilationResult(), SharedKernelContext::getFragOffsets(), QueryMemoryDescriptor::getQueryDescriptionType(), getQueryEngineCudaStreamForDevice(), QueryMemoryDescriptor::getQueryExecutionContext(), SharedKernelContext::getQueryInfos(), GPU, Data_Namespace::GPU_LEVEL, RelAlgExecutionUnit::groupby_exprs, QueryCompilationDescriptor::hoistLiterals(), logger::INFO, RelAlgExecutionUnit::input_descs, heavyai::InSituFlagsOwnerInterface::isInSitu(), kernel_dispatch_mode, KernelPerFragment, LOG, Executor::max_gpu_count, MultifragmentKernel, Native, anonymous_namespace{ExecutionKernel.cpp}::need_to_hold_chunk(), CompilationResult::output_columnar, Projection, query_comp_desc, anonymous_namespace{ExecutionKernel.cpp}::query_has_inner_join(), query_mem_desc, ra_exe_unit_, render_info_, rowid_lookup_key, run_query_external(), RelAlgExecutionUnit::scan_limit, serialize_to_sql(), QueryMemoryDescriptor::sortOnGpu(), RelAlgExecutionUnit::target_exprs, target_exprs_to_infos(), to_string(), RelAlgExecutionUnit::union_all, VLOG, and ExecutionOptions::with_dynamic_watchdog.

Referenced by run().

160  {
161  CHECK(executor);
162  const auto memory_level = chosen_device_type == ExecutorDeviceType::GPU
165  CHECK_GE(frag_list.size(), size_t(1));
166  // frag_list[0].table_id is how we tell which query we are running for UNION ALL.
167  const int outer_table_id = ra_exe_unit_.union_all
168  ? frag_list[0].table_id
169  : ra_exe_unit_.input_descs[0].getTableId();
170  CHECK_EQ(frag_list[0].table_id, outer_table_id);
171  const auto& outer_tab_frag_ids = frag_list[0].fragment_ids;
172 
175 
176  auto catalog = executor->getCatalog();
177  CHECK(catalog);
178 
179  auto data_mgr = executor->getDataMgr();
180 
181  // need to own them while query executes
182  auto chunk_iterators_ptr = std::make_shared<std::list<ChunkIter>>();
183  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks;
184  std::unique_ptr<std::lock_guard<std::mutex>> gpu_lock;
185  std::unique_ptr<CudaAllocator> device_allocator;
187  gpu_lock.reset(
188  new std::lock_guard<std::mutex>(executor->gpu_exec_mutex_[chosen_device_id]));
189  device_allocator = std::make_unique<CudaAllocator>(
190  data_mgr, chosen_device_id, getQueryEngineCudaStreamForDevice(chosen_device_id));
191  }
192  std::shared_ptr<FetchResult> fetch_result(new FetchResult);
193  try {
194  std::map<int, const TableFragments*> all_tables_fragments;
196  all_tables_fragments, ra_exe_unit_, shared_context.getQueryInfos());
197 
198  *fetch_result = ra_exe_unit_.union_all
199  ? executor->fetchUnionChunks(column_fetcher,
200  ra_exe_unit_,
202  memory_level,
203  all_tables_fragments,
204  frag_list,
205  *catalog,
206  *chunk_iterators_ptr,
207  chunks,
208  device_allocator.get(),
209  thread_idx,
211  : executor->fetchChunks(column_fetcher,
212  ra_exe_unit_,
214  memory_level,
215  all_tables_fragments,
216  frag_list,
217  *catalog,
218  *chunk_iterators_ptr,
219  chunks,
220  device_allocator.get(),
221  thread_idx,
223  if (fetch_result->num_rows.empty()) {
224  return;
225  }
227  !shared_context.dynamic_watchdog_set.test_and_set(std::memory_order_acquire)) {
230  LOG(INFO) << "Dynamic Watchdog budget: CPU: "
232  << std::to_string(cycle_budget) << " cycles";
233  }
234  } catch (const OutOfMemory&) {
235  throw QueryExecutionError(
241  return;
242  }
243 
245  if (ra_exe_unit_.input_descs.size() > 1) {
246  throw std::runtime_error("Joins not supported through external execution");
247  }
248  const auto query = serialize_to_sql(&ra_exe_unit_, catalog);
249  GroupByAndAggregate group_by_and_aggregate(executor,
251  ra_exe_unit_,
252  shared_context.getQueryInfos(),
253  executor->row_set_mem_owner_,
254  std::nullopt);
255  const auto query_mem_desc =
256  group_by_and_aggregate.initQueryMemoryDescriptor(false, 0, 8, nullptr, false);
258  query,
259  *fetch_result,
260  executor->plan_state_.get(),
264  executor});
265  shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
266  return;
267  }
268  const CompilationResult& compilation_result = query_comp_desc.getCompilationResult();
269  std::unique_ptr<QueryExecutionContext> query_exe_context_owned;
270  const bool do_render = render_info_ && render_info_->isInSitu();
271 
272  int64_t total_num_input_rows{-1};
274  query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) {
275  total_num_input_rows = 0;
276  std::for_each(fetch_result->num_rows.begin(),
277  fetch_result->num_rows.end(),
278  [&total_num_input_rows](const std::vector<int64_t>& frag_row_count) {
279  total_num_input_rows = std::accumulate(frag_row_count.begin(),
280  frag_row_count.end(),
281  total_num_input_rows);
282  });
283  VLOG(2) << "total_num_input_rows=" << total_num_input_rows;
284  // TODO(adb): we may want to take this early out for all queries, but we are most
285  // likely to see this query pattern on the kernel per fragment path (e.g. with HAVING
286  // 0=1)
287  if (total_num_input_rows == 0) {
288  return;
289  }
290 
292  total_num_input_rows *= ra_exe_unit_.input_descs.size();
293  }
294  }
295 
296  uint32_t start_rowid{0};
297  if (rowid_lookup_key >= 0) {
298  if (!frag_list.empty()) {
299  const auto& all_frag_row_offsets = shared_context.getFragOffsets();
300  start_rowid = rowid_lookup_key -
301  all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
302  }
303  }
304 
305 #ifdef HAVE_TBB
306  bool can_run_subkernels = shared_context.getThreadPool() != nullptr;
307 
308  // Sub-tasks are supported for groupby queries and estimators only for now.
309  bool is_groupby =
310  (ra_exe_unit_.groupby_exprs.size() > 1) ||
311  (ra_exe_unit_.groupby_exprs.size() == 1 && ra_exe_unit_.groupby_exprs.front());
312  can_run_subkernels = can_run_subkernels && (is_groupby || ra_exe_unit_.estimator);
313 
314  // In case some column is lazily fetched, we cannot mix different fragments in a single
315  // ResultSet.
316  can_run_subkernels =
317  can_run_subkernels && !executor->hasLazyFetchColumns(ra_exe_unit_.target_exprs);
318 
319  // TODO: Use another structure to hold chunks. Currently, ResultSet holds them, but with
320  // sub-tasks chunk can be referenced by many ResultSets. So, some outer structure to
321  // hold all ResultSets and all chunks is required.
322  can_run_subkernels =
323  can_run_subkernels &&
325  chunks, ra_exe_unit_, std::vector<ColumnLazyFetchInfo>(), chosen_device_type);
326 
327  // TODO: check for literals? We serialize literals before execution and hold them in
328  // result sets. Can we simply do it once and holdin an outer structure?
329  if (can_run_subkernels) {
330  size_t total_rows = fetch_result->num_rows[0][0];
331  size_t sub_size = g_cpu_sub_task_size;
332 
333  for (size_t sub_start = start_rowid; sub_start < total_rows; sub_start += sub_size) {
334  sub_size = (sub_start + sub_size > total_rows) ? total_rows - sub_start : sub_size;
335  auto subtask = std::make_shared<KernelSubtask>(*this,
336  shared_context,
337  fetch_result,
338  chunk_iterators_ptr,
339  total_num_input_rows,
340  sub_start,
341  sub_size,
342  thread_idx);
343  shared_context.getThreadPool()->run(
344  [subtask, executor] { subtask->run(executor); });
345  }
346 
347  return;
348  }
349 #endif // HAVE_TBB
350 
352  try {
353  // std::unique_ptr<QueryExecutionContext> query_exe_context_owned
354  // has std::unique_ptr<QueryMemoryInitializer> query_buffers_
355  // has std::vector<std::unique_ptr<ResultSet>> result_sets_
356  // has std::unique_ptr<ResultSetStorage> storage_
357  // which are initialized and possibly allocated here.
358  query_exe_context_owned =
359  query_mem_desc.getQueryExecutionContext(ra_exe_unit_,
360  executor,
364  outer_table_id,
365  total_num_input_rows,
366  fetch_result->col_buffers,
367  fetch_result->frag_offsets,
368  executor->getRowSetMemoryOwner(),
369  compilation_result.output_columnar,
370  query_mem_desc.sortOnGpu(),
371  thread_idx,
372  do_render ? render_info_ : nullptr);
373  } catch (const OutOfHostMemory& e) {
374  throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM);
375  }
376  }
377  QueryExecutionContext* query_exe_context{query_exe_context_owned.get()};
378  CHECK(query_exe_context);
379  int32_t err{0};
380 
381  if (ra_exe_unit_.groupby_exprs.empty()) {
382  err = executor->executePlanWithoutGroupBy(ra_exe_unit_,
383  compilation_result,
388  fetch_result->col_buffers,
389  query_exe_context,
390  fetch_result->num_rows,
391  fetch_result->frag_offsets,
392  data_mgr,
394  start_rowid,
395  ra_exe_unit_.input_descs.size(),
397  do_render ? render_info_ : nullptr);
398  } else {
399  if (ra_exe_unit_.union_all) {
400  VLOG(1) << "outer_table_id=" << outer_table_id
401  << " ra_exe_unit_.scan_limit=" << ra_exe_unit_.scan_limit;
402  }
403  err = executor->executePlanWithGroupBy(ra_exe_unit_,
404  compilation_result,
408  fetch_result->col_buffers,
409  outer_tab_frag_ids,
410  query_exe_context,
411  fetch_result->num_rows,
412  fetch_result->frag_offsets,
413  data_mgr,
415  outer_table_id,
417  start_rowid,
418  ra_exe_unit_.input_descs.size(),
420  do_render ? render_info_ : nullptr);
421  }
422  if (device_results_) {
423  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_to_hold;
424  for (const auto& chunk : chunks) {
425  if (need_to_hold_chunk(chunk.get(),
426  ra_exe_unit_,
427  device_results_->getLazyFetchInfo(),
429  chunks_to_hold.push_back(chunk);
430  }
431  }
432  device_results_->holdChunks(chunks_to_hold);
433  device_results_->holdChunkIterators(chunk_iterators_ptr);
434  } else {
435  VLOG(1) << "null device_results.";
436  }
437  if (err) {
438  throw QueryExecutionError(err);
439  }
440  shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
441 }
bool need_to_hold_chunk(const Chunk_NS::Chunk *chunk, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ExecutorDeviceType device_type)
std::vector< Analyzer::Expr * > target_exprs
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::atomic_flag dynamic_watchdog_set
const ExecutionOptions & eo
size_t g_cpu_sub_task_size
Definition: Execute.cpp:83
const std::vector< uint64_t > & getFragOffsets()
static const int max_gpu_count
Definition: Execute.h:1291
const std::optional< bool > union_all
#define LOG(tag)
Definition: Logger.h:216
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
const int64_t rowid_lookup_key
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
std::vector< InputDescriptor > input_descs
const ExecutorDeviceType chosen_device_type
#define CHECK_GE(x, y)
Definition: Logger.h:235
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
RenderInfo * render_info_
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::string to_string(char const *&&v)
ExecutorType executor_type
const QueryMemoryDescriptor & query_mem_desc
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
const QueryCompilationDescriptor & query_comp_desc
const std::shared_ptr< Analyzer::Estimator > estimator
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1371
QueryDescriptionType getQueryDescriptionType() const
RUNTIME_EXPORT uint64_t dynamic_watchdog_init(unsigned ms_budget)
#define CHECK_LT(x, y)
Definition: Logger.h:232
const FragmentsList frag_list
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
bool query_has_inner_join(const RelAlgExecutionUnit &ra_exe_unit)
const std::vector< InputTableInfo > & getQueryInfos() const
ResultSetPtr device_results_
#define CHECK(condition)
Definition: Logger.h:222
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
unsigned dynamic_watchdog_time_limit
ExecutionUnitSql serialize_to_sql(const RelAlgExecutionUnit *ra_exe_unit, const Catalog_Namespace::Catalog *catalog)
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1375
static void computeAllTablesFragments(std::map< int, const TableFragments * > &all_tables_fragments, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos)
#define VLOG(n)
Definition: Logger.h:316
const ColumnFetcher & column_fetcher

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class KernelSubtask
friend

Definition at line 121 of file ExecutionKernel.h.

Member Data Documentation

int ExecutionKernel::chosen_device_id
private

Definition at line 105 of file ExecutionKernel.h.

Referenced by runImpl().

const ExecutorDeviceType ExecutionKernel::chosen_device_type
private

Definition at line 104 of file ExecutionKernel.h.

Referenced by runImpl().

const ColumnFetcher& ExecutionKernel::column_fetcher
private

Definition at line 107 of file ExecutionKernel.h.

Referenced by runImpl().

ResultSetPtr ExecutionKernel::device_results_
private

Definition at line 115 of file ExecutionKernel.h.

Referenced by runImpl().

const ExecutionOptions& ExecutionKernel::eo
private

Definition at line 106 of file ExecutionKernel.h.

Referenced by runImpl().

const FragmentsList ExecutionKernel::frag_list
private

Definition at line 110 of file ExecutionKernel.h.

Referenced by runImpl().

const ExecutorDispatchMode ExecutionKernel::kernel_dispatch_mode
private

Definition at line 111 of file ExecutionKernel.h.

Referenced by run(), and runImpl().

const QueryCompilationDescriptor& ExecutionKernel::query_comp_desc
private

Definition at line 108 of file ExecutionKernel.h.

Referenced by runImpl().

const QueryMemoryDescriptor& ExecutionKernel::query_mem_desc
private

Definition at line 109 of file ExecutionKernel.h.

Referenced by run(), and runImpl().

const RelAlgExecutionUnit& ExecutionKernel::ra_exe_unit_

Definition at line 101 of file ExecutionKernel.h.

Referenced by run(), and runImpl().

RenderInfo* ExecutionKernel::render_info_
private

Definition at line 112 of file ExecutionKernel.h.

Referenced by runImpl().

const int64_t ExecutionKernel::rowid_lookup_key
private

Definition at line 113 of file ExecutionKernel.h.

Referenced by runImpl().


The documentation for this class was generated from the following files: