OmniSciDB  21ac014ffc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecutionKernel.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <mutex>
20 #include <vector>
21 
25 #include "QueryEngine/Execute.h"
28 
29 namespace {
30 
// Returns true when a per-device result should be dropped during reduction:
// the result set pointer is null, or the result set can prove it has no rows.
// NOTE(review): the signature line (original line 31) is missing from this
// extracted view; judging from the call site in addDeviceResults below it is
// `inline bool needs_skip_result(const ResultSetPtr& res)` — confirm against
// the full source.
 32  return !res || res->definitelyHasNoRows();
 33 }
34 
35 inline bool query_has_inner_join(const RelAlgExecutionUnit& ra_exe_unit) {
36  return (std::count_if(ra_exe_unit.join_quals.begin(),
37  ra_exe_unit.join_quals.end(),
38  [](const auto& join_condition) {
39  return join_condition.type == JoinType::INNER;
40  }) > 0);
41 }
42 
 43 // column is part of the target expressions, result set iteration needs it alive.
// Decides whether a fetched chunk must be kept alive after kernel execution
// because the result set may reference its storage lazily.  Only varlen
// columns qualify: arrays, or strings stored with no dictionary encoding
// (kENCODING_NONE).  For those, the chunk is held iff it backs a bare
// ColumnVar among the target expressions (matched by column id and table id).
// NOTE(review): the first signature line (original line 44) is missing from
// this extracted view; per the cross-reference index it is
// `bool need_to_hold_chunk(const Chunk_NS::Chunk* chunk,` — confirm.
 45  const RelAlgExecutionUnit& ra_exe_unit) {
 46  CHECK(chunk->getColumnDesc());
 47  const auto chunk_ti = chunk->getColumnDesc()->columnType;
 48  if (chunk_ti.is_array() ||
 49  (chunk_ti.is_string() && chunk_ti.get_compression() == kENCODING_NONE)) {
 50  for (const auto target_expr : ra_exe_unit.target_exprs) {
// Only a direct column reference counts; computed expressions over the
// column produce their own storage and do not pin the chunk.
 51  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
 52  if (col_var && col_var->get_column_id() == chunk->getColumnDesc()->columnId &&
 53  col_var->get_table_id() == chunk->getColumnDesc()->tableId) {
 54  return true;
 55  }
 56  }
 57  }
 58  return false;
 59 }
60 
61 } // namespace
62 
// Lazily builds and returns the per-fragment row-offset table for the outer
// (first) input table: element i holds the global row offset of fragment i,
// i.e. the running sum of tuple counts of fragments [0, i).  The table has
// fragments.size() + 1 entries.  Computation and access are serialized with
// all_frag_row_offsets_mutex_ so concurrent kernels share one cached vector.
 63 const std::vector<uint64_t>& SharedKernelContext::getFragOffsets() {
 64  std::lock_guard<std::mutex> lock(all_frag_row_offsets_mutex_);
// Compute only once; subsequent callers reuse the cached offsets.
 65  if (all_frag_row_offsets_.empty()) {
 66  all_frag_row_offsets_.resize(query_infos_.front().info.fragments.size() + 1);
 67  for (size_t i = 1; i <= query_infos_.front().info.fragments.size(); ++i) {
// NOTE(review): original lines 68-69 (the left-hand side of the prefix-sum
// assignment, presumably `all_frag_row_offsets_[i] = all_frag_row_offsets_[i-1] +`)
// are missing from this extracted view — confirm against the full source.
 70  query_infos_.front().info.fragments[i - 1].getNumTuples();
 71  }
 72  }
 73  return all_frag_row_offsets_;
 74 }
75 
// Appends one device's result set, tagged with the outer-table fragment ids it
// covers, to the shared reduction list.  Null or provably-empty results are
// skipped (see needs_skip_result).  Guarded by reduce_mutex_ because kernels
// on different threads all report into the same SharedKernelContext.
// NOTE(review): the first signature line (original line 76,
// `void SharedKernelContext::addDeviceResults(ResultSetPtr&& device_results,`)
// is missing from this extracted view — confirm against the full source.
 77  std::vector<size_t> outer_table_fragment_ids) {
 78  std::lock_guard<std::mutex> lock(reduce_mutex_);
 79  if (!needs_skip_result(device_results)) {
 80  all_fragment_results_.emplace_back(std::move(device_results),
 81  outer_table_fragment_ids);
 82  }
 83 }
84 
// Returns a mutable reference to the accumulated (result set, fragment ids)
// pairs for reduction.  Unlike addDeviceResults this takes no lock —
// presumably callers invoke it only after all kernels have finished; verify
// against the call sites before relying on concurrent access.
// NOTE(review): original line 86 (`SharedKernelContext::getFragmentResults() {`)
// is missing from this extracted view.
 85 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>&
 87  return all_fragment_results_;
 88 }
89 
// Public entry point for one execution kernel: times the run and translates
// low-level executor exceptions into QueryExecutionError so the dispatcher
// can map them to client-visible error codes.
// NOTE(review): the throw statements inside most catch clauses (original
// lines 98, 100, 102, 105, 107-109, 111, 113, 115) are missing from this
// extracted view.  The cross-reference index suggests they rethrow as
// QueryExecutionError with Executor::ERR_OUT_OF_CPU_MEM,
// ERR_OUT_OF_RENDER_MEM, ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED,
// ERR_TOO_MANY_LITERALS and ERR_STRING_CONST_IN_RESULTSET respectively —
// confirm against the full source.
 90 void ExecutionKernel::run(Executor* executor,
 91  const size_t thread_idx,
 92  SharedKernelContext& shared_context) {
 93  DEBUG_TIMER("ExecutionKernel::run");
 94  INJECT_TIMER(kernel_run);
 95  try {
 96  runImpl(executor, thread_idx, shared_context);
 97  } catch (const OutOfHostMemory& e) {
 99  } catch (const std::bad_alloc& e) {
 101  } catch (const OutOfRenderMemory& e) {
 103  } catch (const OutOfMemory& e) {
 104  throw QueryExecutionError(
 106  e.what(),
 110  } catch (const ColumnarConversionNotSupported& e) {
 112  } catch (const TooManyLiterals& e) {
 114  } catch (const StringConstInResultSet& e) {
// Already in the canonical error form: propagate unchanged.
 116  } catch (const QueryExecutionError& e) {
 117  throw e;
 118  }
 119 }
120 
// Executes one kernel: fetches the input chunks for this kernel's fragment
// list, runs the compiled plan (group-by or projection path) on the chosen
// device, and reports the device result set into the shared context.
// NOTE(review): this extracted view is missing many original lines (the
// embedded numbering skips, e.g. 126-127, 136-137, 149, 157, 163, 172, 175,
// 184, 188, 190-191, 193, 198-202, 206, 212, 219, 223-225, 235-236, 253,
// 258, 261, 263-265, 271, 275, 293-296, 302, 305, 314-316, 323, 325, 328).
// Comments below describe only what the visible lines establish; missing
// conditions/arguments must be confirmed against the full source.
 121 void ExecutionKernel::runImpl(Executor* executor,
 122  const size_t thread_idx,
 123  SharedKernelContext& shared_context) {
 124  CHECK(executor);
// Pick the buffer memory level from the chosen device type (GPU vs CPU);
// the ternary's alternatives sit on missing lines 126-127.
 125  const auto memory_level = chosen_device_type == ExecutorDeviceType::GPU
 128  CHECK_GE(frag_list.size(), size_t(1));
 129  // frag_list[0].table_id is how we tell which query we are running for UNION ALL.
 130  const int outer_table_id = ra_exe_unit_.union_all
 131  ? frag_list[0].table_id
 132  : ra_exe_unit_.input_descs[0].getTableId();
 133  CHECK_EQ(frag_list[0].table_id, outer_table_id);
 134  const auto& outer_tab_frag_ids = frag_list[0].fragment_ids;
 135 
 138 
 139  auto catalog = executor->getCatalog();
 140  CHECK(catalog);
 141 
 142  auto data_mgr = executor->getDataMgr();
 143 
 144  // need to own them while query executes
 145  auto chunk_iterators_ptr = std::make_shared<std::list<ChunkIter>>();
 146  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks;
 147  std::unique_ptr<std::lock_guard<std::mutex>> gpu_lock;
 148  std::unique_ptr<CudaAllocator> device_allocator;
// GPU path (guard condition on missing line 149): serialize access to the
// chosen device and create its allocator.
 150  gpu_lock.reset(
 151  new std::lock_guard<std::mutex>(executor->gpu_exec_mutex_[chosen_device_id]));
 152  device_allocator = std::make_unique<CudaAllocator>(data_mgr, chosen_device_id);
 153  }
// Phase 1: fetch the input chunks for this kernel's fragments.  UNION ALL
// queries use a dedicated fetch path that understands per-query fragment ids.
 154  FetchResult fetch_result;
 155  try {
 156  std::map<int, const TableFragments*> all_tables_fragments;
 158  all_tables_fragments, ra_exe_unit_, shared_context.getQueryInfos());
 159 
 160  fetch_result = ra_exe_unit_.union_all
 161  ? executor->fetchUnionChunks(column_fetcher,
 162  ra_exe_unit_,
 164  memory_level,
 165  all_tables_fragments,
 166  frag_list,
 167  *catalog,
 168  *chunk_iterators_ptr,
 169  chunks,
 170  device_allocator.get(),
 171  thread_idx,
 173  : executor->fetchChunks(column_fetcher,
 174  ra_exe_unit_,
 176  memory_level,
 177  all_tables_fragments,
 178  frag_list,
 179  *catalog,
 180  *chunk_iterators_ptr,
 181  chunks,
 182  device_allocator.get(),
 183  thread_idx,
// Nothing fetched: this kernel has no work.
 185  if (fetch_result.num_rows.empty()) {
 186  return;
 187  }
// First kernel to get here arms the dynamic watchdog (test_and_set on the
// shared atomic flag ensures it is initialized exactly once); the enabling
// condition sits on a missing line.
 189  !shared_context.dynamic_watchdog_set.test_and_set(std::memory_order_acquire)) {
 192  LOG(INFO) << "Dynamic Watchdog budget: CPU: "
 194  << std::to_string(cycle_budget) << " cycles";
 195  }
 196  } catch (const OutOfMemory&) {
 197  throw QueryExecutionError(
 203  return;
 204  }
 205 
// External-executor path (guard on missing line 206): serialize the unit to
// SQL and run it outside the native engine.  Joins are not supported there.
 207  if (ra_exe_unit_.input_descs.size() > 1) {
 208  throw std::runtime_error("Joins not supported through external execution");
 209  }
 210  const auto query = serialize_to_sql(&ra_exe_unit_, catalog);
 211  GroupByAndAggregate group_by_and_aggregate(executor,
 213  ra_exe_unit_,
 214  shared_context.getQueryInfos(),
 215  executor->row_set_mem_owner_,
 216  std::nullopt);
 217  const auto query_mem_desc =
 218  group_by_and_aggregate.initQueryMemoryDescriptor(false, 0, 8, nullptr, false);
// run_query_external(...) call spans missing lines; its result presumably
// lands in device_results_ before being reported below — confirm.
 220  query,
 221  fetch_result,
 222  executor->plan_state_.get(),
 226  executor});
 227  shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
 228  return;
 229  }
// Phase 2: native execution.  Grab the compiled code and build the query
// execution context.
 230  const CompilationResult& compilation_result = query_comp_desc.getCompilationResult();
 231  std::unique_ptr<QueryExecutionContext> query_exe_context_owned;
 232  const bool do_render = render_info_ && render_info_->isPotentialInSituRender();
 233 
// -1 means "unknown"; computed only under a condition on missing lines
// 235-236 (likely tied to the query description type).
 234  int64_t total_num_input_rows{-1};
 237  total_num_input_rows = 0;
 238  std::for_each(fetch_result.num_rows.begin(),
 239  fetch_result.num_rows.end(),
 240  [&total_num_input_rows](const std::vector<int64_t>& frag_row_count) {
 241  total_num_input_rows = std::accumulate(frag_row_count.begin(),
 242  frag_row_count.end(),
 243  total_num_input_rows);
 244  });
 245  VLOG(2) << "total_num_input_rows=" << total_num_input_rows;
 246  // TODO(adb): we may want to take this early out for all queries, but we are most
 247  // likely to see this query pattern on the kernel per fragment path (e.g. with HAVING
 248  // 0=1)
 249  if (total_num_input_rows == 0) {
 250  return;
 251  }
 252 
// Inner-join adjustment (guard on missing line 253, presumably
// query_has_inner_join): scale the row estimate by the join depth.
 254  total_num_input_rows *= ra_exe_unit_.input_descs.size();
 255  }
 256  }
 257 
 259  try {
 260  query_exe_context_owned =
 262  executor,
 266  total_num_input_rows,
 267  fetch_result.col_buffers,
 268  fetch_result.frag_offsets,
 269  executor->getRowSetMemoryOwner(),
 270  compilation_result.output_columnar,
 272  thread_idx,
 273  do_render ? render_info_ : nullptr);
 274  } catch (const OutOfHostMemory& e) {
 276  }
 277  }
 278  QueryExecutionContext* query_exe_context{query_exe_context_owned.get()};
 279  CHECK(query_exe_context);
 280  int32_t err{0};
// For point (rowid) lookups, translate the global rowid into an offset local
// to this kernel's first fragment using the shared prefix-sum table.
 281  uint32_t start_rowid{0};
 282  if (rowid_lookup_key >= 0) {
 283  if (!frag_list.empty()) {
 284  const auto& all_frag_row_offsets = shared_context.getFragOffsets();
 285  start_rowid = rowid_lookup_key -
 286  all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
 287  }
 288  }
 289 
// Phase 3: run the plan — projection/aggregate-without-groupby vs group-by
// paths.  Several argument lines of both calls are missing from this view.
 290  if (ra_exe_unit_.groupby_exprs.empty()) {
 291  err = executor->executePlanWithoutGroupBy(ra_exe_unit_,
 292  compilation_result,
 297  fetch_result.col_buffers,
 298  query_exe_context,
 299  fetch_result.num_rows,
 300  fetch_result.frag_offsets,
 301  data_mgr,
 303  start_rowid,
 304  ra_exe_unit_.input_descs.size(),
 306  do_render ? render_info_ : nullptr);
 307  } else {
 308  if (ra_exe_unit_.union_all) {
 309  VLOG(1) << "outer_table_id=" << outer_table_id
 310  << " ra_exe_unit_.scan_limit=" << ra_exe_unit_.scan_limit;
 311  }
 312  err = executor->executePlanWithGroupBy(ra_exe_unit_,
 313  compilation_result,
 317  fetch_result.col_buffers,
 318  outer_tab_frag_ids,
 319  query_exe_context,
 320  fetch_result.num_rows,
 321  fetch_result.frag_offsets,
 322  data_mgr,
 324  outer_table_id,
 326  start_rowid,
 327  ra_exe_unit_.input_descs.size(),
 329  do_render ? render_info_ : nullptr);
 330  }
// Phase 4: pin any chunks the result set still references (varlen columns in
// the targets — see need_to_hold_chunk) plus the chunk iterators, so their
// storage outlives this function.
 331  if (device_results_) {
 332  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_to_hold;
 333  for (const auto& chunk : chunks) {
 334  if (need_to_hold_chunk(chunk.get(), ra_exe_unit_)) {
 335  chunks_to_hold.push_back(chunk);
 336  }
 337  }
 338  device_results_->holdChunks(chunks_to_hold);
 339  device_results_->holdChunkIterators(chunk_iterators_ptr);
 340  } else {
 341  VLOG(1) << "null device_results.";
 342  }
// Surface any device-reported error before publishing results.
 343  if (err) {
 344  throw QueryExecutionError(err);
 345  }
 346  shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
 347 }
std::vector< Analyzer::Expr * > target_exprs
virtual const char * what() const noexceptfinal
Definition: checked_alloc.h:39
#define CHECK_EQ(x, y)
Definition: Logger.h:214
std::atomic_flag dynamic_watchdog_set
const ExecutionOptions & eo
const std::vector< uint64_t > & getFragOffsets()
static const int max_gpu_count
Definition: Execute.h:1052
const std::optional< bool > union_all
#define LOG(tag)
Definition: Logger.h:200
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
std::vector< uint64_t > all_frag_row_offsets_
const int64_t rowid_lookup_key
std::mutex all_frag_row_offsets_mutex_
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
std::vector< InputDescriptor > input_descs
const ExecutorDeviceType chosen_device_type
#define CHECK_GE(x, y)
Definition: Logger.h:219
std::shared_ptr< ResultSet > ResultSetPtr
static const int32_t ERR_TOO_MANY_LITERALS
Definition: Execute.h:1144
const std::list< std::shared_ptr< Analyzer::Expr > > groupby_exprs
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
RenderInfo * render_info_
#define CHECK_GT(x, y)
Definition: Logger.h:218
std::string to_string(char const *&&v)
static const int32_t ERR_STRING_CONST_IN_RESULTSET
Definition: Execute.h:1145
static const int32_t ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED
Definition: Execute.h:1143
const ColumnDescriptor * getColumnDesc() const
Definition: Chunk.h:54
bool needs_skip_result(const ResultSetPtr &res)
ExecutorType executor_type
#define INJECT_TIMER(DESC)
Definition: measure.h:93
static const int32_t ERR_OUT_OF_RENDER_MEM
Definition: Execute.h:1138
const JoinQualsPerNestingLevel join_quals
const QueryMemoryDescriptor & query_mem_desc
std::unique_ptr< QueryExecutionContext > getQueryExecutionContext(const RelAlgExecutionUnit &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner >, const bool output_columnar, const bool sort_on_gpu, const size_t thread_idx, RenderInfo *) const
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
const QueryCompilationDescriptor & query_comp_desc
static const int32_t ERR_OUT_OF_GPU_MEM
Definition: Execute.h:1135
bool need_to_hold_chunk(const Chunk_NS::Chunk *chunk, const RelAlgExecutionUnit &ra_exe_unit)
QueryDescriptionType getQueryDescriptionType() const
RUNTIME_EXPORT uint64_t dynamic_watchdog_init(unsigned ms_budget)
#define CHECK_LT(x, y)
Definition: Logger.h:216
void runImpl(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > all_fragment_results_
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
const FragmentsList frag_list
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
bool query_has_inner_join(const RelAlgExecutionUnit &ra_exe_unit)
const std::vector< InputTableInfo > & getQueryInfos() const
ResultSetPtr device_results_
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool isPotentialInSituRender() const
Definition: RenderInfo.cpp:64
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::mutex reduce_mutex_
SQLTypeInfo columnType
unsigned dynamic_watchdog_time_limit
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
const std::vector< InputTableInfo > & query_infos_
ExecutionUnitSql serialize_to_sql(const RelAlgExecutionUnit *ra_exe_unit, const Catalog_Namespace::Catalog *catalog)
static const int32_t ERR_OUT_OF_CPU_MEM
Definition: Execute.h:1139
static void computeAllTablesFragments(std::map< int, const TableFragments * > &all_tables_fragments, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos)
#define VLOG(n)
Definition: Logger.h:300
const ColumnFetcher & column_fetcher