OmniSciDB b24e664e58
ExecutionDispatch.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnFetcher.h"
#include "DynamicWatchdog.h"
#include "ErrorHandling.h"
#include "Execute.h"

#include <numeric>

namespace {

bool needs_skip_result(const ResultSetPtr& res) {
  return !res || res->definitelyHasNoRows();
}

// The result set of `ra_exe_unit` needs to hold a reference to `chunk` if its column is
// part of the target expressions, since result set iteration needs the chunk alive.
bool need_to_hold_chunk(const Chunk_NS::Chunk* chunk,
                        const RelAlgExecutionUnit& ra_exe_unit) {
  CHECK(chunk->get_column_desc());
  const auto chunk_ti = chunk->get_column_desc()->columnType;
  if (chunk_ti.is_array() ||
      (chunk_ti.is_string() && chunk_ti.get_compression() == kENCODING_NONE)) {
    for (const auto target_expr : ra_exe_unit.target_exprs) {
      const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
      if (col_var && col_var->get_column_id() == chunk->get_column_desc()->columnId &&
          col_var->get_table_id() == chunk->get_column_desc()->tableId) {
        return true;
      }
    }
  }
  return false;
}

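// True iff any join qualifier at any nesting level is an INNER join; used when scaling
// the per-fragment input row count estimate in runImpl() below.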
inline bool query_has_inner_join(const RelAlgExecutionUnit& ra_exe_unit) {
  return (std::count_if(ra_exe_unit.join_quals.begin(),
                        ra_exe_unit.join_quals.end(),
                        [](const auto& join_condition) {
                          return join_condition.type == JoinType::INNER;
                        }) > 0);
}

}  // namespace

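// Runs one kernel of the execution unit: fetches the chunks for `frag_list` on the
// chosen device, builds a QueryExecutionContext, executes the compiled plan (with or
// without group by) and records the per-fragment result under `reduce_mutex_`.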
void Executor::ExecutionDispatch::runImpl(
    const ExecutorDeviceType chosen_device_type,
    int chosen_device_id,
    const ExecutionOptions& eo,
    const ColumnFetcher& column_fetcher,
    const QueryCompilationDescriptor& query_comp_desc,
    const QueryMemoryDescriptor& query_mem_desc,
    const FragmentsList& frag_list,
    const ExecutorDispatchMode kernel_dispatch_mode,
    const int64_t rowid_lookup_key) {
  const auto memory_level = chosen_device_type == ExecutorDeviceType::GPU
                                ? Data_Namespace::GPU_LEVEL
                                : Data_Namespace::CPU_LEVEL;
  const int outer_table_id = ra_exe_unit_.input_descs[0].getTableId();
  CHECK_GE(frag_list.size(), size_t(1));
  CHECK_EQ(frag_list[0].table_id, outer_table_id);
  const auto& outer_tab_frag_ids = frag_list[0].fragment_ids;
  CHECK_GE(chosen_device_id, 0);
  CHECK_LT(chosen_device_id, max_gpu_count);
  // need to own them while query executes
  auto chunk_iterators_ptr = std::make_shared<std::list<ChunkIter>>();
  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  std::unique_ptr<std::lock_guard<std::mutex>> gpu_lock;
  if (chosen_device_type == ExecutorDeviceType::GPU) {
    gpu_lock.reset(
        new std::lock_guard<std::mutex>(executor_->gpu_exec_mutex_[chosen_device_id]));
  }
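  // Fetch the required chunks for every table referenced by this kernel. The chunk
  // iterators live on the heap so the result set can keep them alive after this call.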
  FetchResult fetch_result;
  try {
    std::map<int, const TableFragments*> all_tables_fragments;
    QueryFragmentDescriptor::computeAllTablesFragments(
        all_tables_fragments, ra_exe_unit_, query_infos_);

    fetch_result = executor_->fetchChunks(column_fetcher,
                                          ra_exe_unit_,
                                          chosen_device_id,
                                          memory_level,
                                          all_tables_fragments,
                                          frag_list,
                                          cat_,
                                          *chunk_iterators_ptr,
                                          chunks);
    if (fetch_result.num_rows.empty()) {
      return;
    }
    if (eo.with_dynamic_watchdog &&
        !dynamic_watchdog_set_.test_and_set(std::memory_order_acquire)) {
      // Initialize the dynamic watchdog once per query, on the first kernel to get here
      auto cycle_budget = dynamic_watchdog_init(eo.dynamic_watchdog_time_limit);
      LOG(INFO) << "Dynamic Watchdog budget: CPU: "
                << std::to_string(eo.dynamic_watchdog_time_limit) << "ms, "
                << std::to_string(cycle_budget) << " cycles";
    }
  } catch (const OutOfMemory&) {
    throw QueryExecutionError(
        memory_level == Data_Namespace::GPU_LEVEL ? ERR_OUT_OF_GPU_MEM
                                                  : ERR_OUT_OF_CPU_MEM,
        QueryExecutionProperties{
            query_mem_desc.getQueryDescriptionType(),
            kernel_dispatch_mode == ExecutorDispatchMode::MultifragmentKernel});
    return;
  }

  const CompilationResult& compilation_result = query_comp_desc.getCompilationResult();
  std::unique_ptr<QueryExecutionContext> query_exe_context_owned;
  const bool do_render = render_info_ && render_info_->isPotentialInSituRender();

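  // For the kernel-per-fragment path, compute the total input row count up front: it
  // allows an early exit on empty inputs and is passed to getQueryExecutionContext
  // below, presumably to size the output buffers.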
  int64_t total_num_input_rows{-1};
  if (kernel_dispatch_mode == ExecutorDispatchMode::KernelPerFragment &&
      query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) {
    total_num_input_rows = 0;
    std::for_each(fetch_result.num_rows.begin(),
                  fetch_result.num_rows.end(),
                  [&total_num_input_rows](const std::vector<int64_t>& frag_row_count) {
                    total_num_input_rows = std::accumulate(frag_row_count.begin(),
                                                           frag_row_count.end(),
                                                           total_num_input_rows);
                  });
    // TODO(adb): we may want to take this early out for all queries, but we are most
    // likely to see this query pattern on the kernel per fragment path (e.g. with HAVING
    // 0=1)
    if (total_num_input_rows == 0) {
      return;
    }

    if (query_has_inner_join(ra_exe_unit_)) {
      total_num_input_rows *= ra_exe_unit_.input_descs.size();
    }
  }

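  // Build the per-kernel execution context, which owns the output buffers for this
  // device (and, presumably, any device-side state needed to run the compiled code).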
  try {
    query_exe_context_owned =
        query_mem_desc.getQueryExecutionContext(ra_exe_unit_,
                                                executor_,
                                                chosen_device_type,
                                                kernel_dispatch_mode,
                                                chosen_device_id,
                                                total_num_input_rows,
                                                fetch_result.col_buffers,
                                                fetch_result.frag_offsets,
                                                row_set_mem_owner_,
                                                compilation_result.output_columnar,
                                                query_mem_desc.sortOnGpu(),
                                                do_render ? render_info_ : nullptr);
  } catch (const OutOfHostMemory& e) {
    throw QueryExecutionError(ERR_OUT_OF_CPU_MEM);
  }
  QueryExecutionContext* query_exe_context{query_exe_context_owned.get()};
  CHECK(query_exe_context);
  int32_t err{0};
  uint32_t start_rowid{0};
  if (rowid_lookup_key >= 0) {
    if (!frag_list.empty()) {
      const auto& all_frag_row_offsets = getFragOffsets();
      start_rowid = rowid_lookup_key -
                    all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
    }
  }

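  // Execute the compiled plan. Queries without GROUP BY expressions take the simpler
  // executePlanWithoutGroupBy path; everything else goes through executePlanWithGroupBy,
  // which is additionally passed the outer table's fragment ids.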
  ResultSetPtr device_results;
  if (ra_exe_unit_.groupby_exprs.empty()) {
    err = executor_->executePlanWithoutGroupBy(ra_exe_unit_,
                                               compilation_result,
                                               query_comp_desc.hoistLiterals(),
                                               device_results,
                                               ra_exe_unit_.target_exprs,
                                               chosen_device_type,
                                               fetch_result.col_buffers,
                                               query_exe_context,
                                               fetch_result.num_rows,
                                               fetch_result.frag_offsets,
                                               &cat_.getDataMgr(),
                                               chosen_device_id,
                                               start_rowid,
                                               ra_exe_unit_.input_descs.size(),
                                               do_render ? render_info_ : nullptr);
  } else {
    err = executor_->executePlanWithGroupBy(ra_exe_unit_,
                                            compilation_result,
                                            query_comp_desc.hoistLiterals(),
                                            device_results,
                                            chosen_device_type,
                                            fetch_result.col_buffers,
                                            outer_tab_frag_ids,
                                            query_exe_context,
                                            fetch_result.num_rows,
                                            fetch_result.frag_offsets,
                                            &cat_.getDataMgr(),
                                            chosen_device_id,
                                            ra_exe_unit_.scan_limit,
                                            start_rowid,
                                            ra_exe_unit_.input_descs.size(),
                                            do_render ? render_info_ : nullptr);
  }
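  // Keep the chunks flagged by need_to_hold_chunk() (unencoded strings and arrays)
  // referenced by the result set, since iterating the results reads back into them.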
  if (device_results) {
    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_to_hold;
    for (const auto chunk : chunks) {
      if (need_to_hold_chunk(chunk.get(), ra_exe_unit_)) {
        chunks_to_hold.push_back(chunk);
      }
    }
    device_results->holdChunks(chunks_to_hold);
    device_results->holdChunkIterators(chunk_iterators_ptr);
  }
  if (err) {
    throw QueryExecutionError(err);
  }
  {
    std::lock_guard<std::mutex> lock(reduce_mutex_);
    if (!needs_skip_result(device_results)) {
      all_fragment_results_.emplace_back(std::move(device_results), outer_tab_frag_ids);
    }
  }
}

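// One ExecutionDispatch is shared by all kernels of a single execution unit. Result
// storage is reserved for one entry per fragment of the first input table, which is
// presumably the outer table of the query.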
Executor::ExecutionDispatch::ExecutionDispatch(
    Executor* executor,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const Catalog_Namespace::Catalog& cat,
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    RenderInfo* render_info)
    : executor_(executor)
    , ra_exe_unit_(ra_exe_unit)
    , query_infos_(query_infos)
    , cat_(cat)
    , row_set_mem_owner_(row_set_mem_owner)
    , render_info_(render_info) {
  all_fragment_results_.reserve(query_infos_.front().info.fragments.size());
}

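// Compiles the execution unit for the requested device and builds the matching query
// memory descriptor. CPU and GPU go through the same QueryCompilationDescriptor compile
// entry point; only the device type carried in `co` differs.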
std::tuple<QueryCompilationDescriptorOwned, QueryMemoryDescriptorOwned>
Executor::ExecutionDispatch::compile(const size_t max_groups_buffer_entry_guess,
                                     const int8_t crt_min_byte_width,
                                     const CompilationOptions& co,
                                     const ExecutionOptions& eo,
                                     const ColumnFetcher& column_fetcher,
                                     const bool has_cardinality_estimation) {
  auto query_comp_desc = std::make_unique<QueryCompilationDescriptor>();
  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc;

  switch (co.device_type_) {
    case ExecutorDeviceType::CPU: {
      query_mem_desc = query_comp_desc->compile(max_groups_buffer_entry_guess,
                                                crt_min_byte_width,
                                                has_cardinality_estimation,
                                                ra_exe_unit_,
                                                query_infos_,
                                                column_fetcher,
                                                co,
                                                eo,
                                                render_info_,
                                                executor_);
      break;
    }
    case ExecutorDeviceType::GPU: {
      query_mem_desc = query_comp_desc->compile(max_groups_buffer_entry_guess,
                                                crt_min_byte_width,
                                                has_cardinality_estimation,
                                                ra_exe_unit_,
                                                query_infos_,
                                                column_fetcher,
                                                co,
                                                eo,
                                                render_info_,
                                                executor_);
      break;
    }
    default:
      UNREACHABLE();
  }

  return std::make_tuple(std::move(query_comp_desc), std::move(query_mem_desc));
}

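// Thin wrapper around runImpl() which translates allocation and conversion failures
// into the executor's QueryExecutionError codes, so the dispatcher only has to handle
// a single exception type.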
void Executor::ExecutionDispatch::run(const ExecutorDeviceType chosen_device_type,
                                      int chosen_device_id,
                                      const ExecutionOptions& eo,
                                      const ColumnFetcher& column_fetcher,
                                      const QueryCompilationDescriptor& query_comp_desc,
                                      const QueryMemoryDescriptor& query_mem_desc,
                                      const FragmentsList& frag_list,
                                      const ExecutorDispatchMode kernel_dispatch_mode,
                                      const int64_t rowid_lookup_key) {
  try {
    runImpl(chosen_device_type,
            chosen_device_id,
            eo,
            column_fetcher,
            query_comp_desc,
            query_mem_desc,
            frag_list,
            kernel_dispatch_mode,
            rowid_lookup_key);
  } catch (const std::bad_alloc& e) {
    throw QueryExecutionError(ERR_OUT_OF_CPU_MEM, e.what());
  } catch (const OutOfHostMemory& e) {
    throw QueryExecutionError(ERR_OUT_OF_CPU_MEM, e.what());
  } catch (const OutOfRenderMemory& e) {
    throw QueryExecutionError(ERR_OUT_OF_RENDER_MEM, e.what());
  } catch (const OutOfMemory& e) {
    throw QueryExecutionError(
        ERR_OUT_OF_GPU_MEM,
        e.what(),
        QueryExecutionProperties{
            query_mem_desc.getQueryDescriptionType(),
            kernel_dispatch_mode == ExecutorDispatchMode::MultifragmentKernel});
  } catch (const ColumnarConversionNotSupported& e) {
    throw QueryExecutionError(ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED, e.what());
  } catch (const TooManyLiterals& e) {
    throw QueryExecutionError(ERR_TOO_MANY_LITERALS, e.what());
  } catch (const SringConstInResultSet& e) {
    throw QueryExecutionError(ERR_STRING_CONST_IN_RESULTSET, e.what());
  } catch (const QueryExecutionError& e) {
    throw e;
  }
}

const RelAlgExecutionUnit& Executor::ExecutionDispatch::getExecutionUnit() const {
  return ra_exe_unit_;
}

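// Lazily builds an exclusive prefix sum over the fragment sizes of the first input
// table, so entry i holds the global row offset at which fragment i starts. As a
// hypothetical example, fragments of 1000 and 500 rows give offsets {0, 1000, 1500};
// a rowid_lookup_key of 1203 on a kernel whose first fragment id is 1 then yields
// start_rowid = 1203 - 1000 = 203 in runImpl().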
const std::vector<uint64_t>& Executor::ExecutionDispatch::getFragOffsets() const {
  std::lock_guard<std::mutex> lock(all_frag_row_offsets_mutex_);
  if (all_frag_row_offsets_.empty()) {
    all_frag_row_offsets_.resize(query_infos_.front().info.fragments.size() + 1);
    for (size_t i = 1; i <= query_infos_.front().info.fragments.size(); ++i) {
      all_frag_row_offsets_[i] =
          all_frag_row_offsets_[i - 1] +
          query_infos_.front().info.fragments[i - 1].getNumTuples();
    }
  }
  return all_frag_row_offsets_;
}

std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>&
Executor::ExecutionDispatch::getFragmentResults() {
  return all_fragment_results_;
}