OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecutionKernel.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <utility>

#include "Logger/Logger.h"

#include "Shared/threading.h"

#ifdef HAVE_TBB
#include "tbb/enumerable_thread_specific.h"
#endif
28 
30  public:
31  SharedKernelContext(const std::vector<InputTableInfo>& query_infos)
32  : query_infos_(query_infos)
33 #ifdef HAVE_TBB
34  , task_group_(nullptr)
35 #endif
36  {
37  }
38 
39  const std::vector<uint64_t>& getFragOffsets();
40 
41  void addDeviceResults(ResultSetPtr&& device_results,
42  std::vector<size_t> outer_table_fragment_ids);
43 
44  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& getFragmentResults();
45 
46  const std::vector<InputTableInfo>& getQueryInfos() const {
47  return query_infos_;
48  }
49 
50  void setNumAllocatedThreads(size_t num_threads) {
51  num_allocated_threads_ = num_threads;
52  }
53 
56  }
57 
/// Public one-shot flag, initially clear (ATOMIC_FLAG_INIT).
/// NOTE(review): presumably used with test_and_set() so that only one kernel arms the
/// dynamic watchdog; this is inferred from the name, so confirm at the call sites.
std::atomic_flag dynamic_watchdog_set = ATOMIC_FLAG_INIT;

#ifdef HAVE_TBB
/// @return the task group set via setThreadPool(), or nullptr if none was set
///         (the constructor initializes it to nullptr).
/// `const`: the getter does not modify the context.
threading::task_group* getThreadPool() const {
  return task_group_;
}

/// Installs the task group used for scheduling. The pointer is stored raw and nothing
/// in this header deletes it, so ownership presumably stays with the caller.
void setThreadPool(threading::task_group* tg) {
  task_group_ = tg;
}

/// Per-thread storage (tbb::enumerable_thread_specific) holding one
/// QueryExecutionContext slot for each thread that accesses it.
auto& getTlsExecutionContext() {
  return tls_execution_context_;
}
#endif  // HAVE_TBB
71 
72  private:
73  std::mutex reduce_mutex_;
74  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> all_fragment_results_;
75 
76  std::vector<uint64_t> all_frag_row_offsets_;
78  const std::vector<InputTableInfo>& query_infos_;
80  // Number of threads used to execute the query (kernel). Defaults to one, i.e. serial
81  // query execution; once the kernel has been compiled, it is set to an appropriate
82  // value based on the query's status.
84 
85 #ifdef HAVE_TBB
86  threading::task_group* task_group_;
87  tbb::enumerable_thread_specific<std::unique_ptr<QueryExecutionContext>>
88  tls_execution_context_;
89 #endif // HAVE_TBB
90 };
91 
93  public:
96  int chosen_device_id,
97  const ExecutionOptions& eo,
101  const FragmentsList& frag_list,
103  RenderInfo* render_info,
104  const int64_t rowid_lookup_key)
105  : ra_exe_unit_(ra_exe_unit)
106  , chosen_device_type(chosen_device_type)
107  , chosen_device_id(chosen_device_id)
108  , eo(eo)
109  , column_fetcher(column_fetcher)
110  , query_comp_desc(query_comp_desc)
111  , query_mem_desc(query_mem_desc)
112  , frag_list(frag_list)
113  , kernel_dispatch_mode(kernel_dispatch_mode)
114  , render_info_(render_info)
115  , rowid_lookup_key(rowid_lookup_key) {}
116 
117  void run(Executor* executor,
118  const size_t thread_idx,
119  SharedKernelContext& shared_context);
120 
122  int32_t get_chosen_device_id() const { return chosen_device_id; }
124 
125  private:
135  const int64_t rowid_lookup_key;
136 
138 
139  void runImpl(Executor* executor,
140  const size_t thread_idx,
141  SharedKernelContext& shared_context);
142 
143  friend class KernelSubtask;
144 };
145 
146 #ifdef HAVE_TBB
147 class KernelSubtask {
148  public:
149  KernelSubtask(ExecutionKernel& k,
150  SharedKernelContext& shared_context,
151  std::shared_ptr<FetchResult> fetch_result,
152  std::shared_ptr<std::list<ChunkIter>> chunk_iterators,
153  int64_t total_num_input_rows,
154  size_t start_rowid,
155  size_t num_rows_to_process,
156  size_t thread_idx)
157  : kernel_(k)
158  , shared_context_(shared_context)
159  , fetch_result_(fetch_result)
160  , chunk_iterators_(chunk_iterators)
161  , total_num_input_rows_(total_num_input_rows)
162  , start_rowid_(start_rowid)
163  , num_rows_to_process_(num_rows_to_process)
164  , thread_idx_(thread_idx) {}
165 
166  void run(Executor* executor);
167 
168  private:
169  void runImpl(Executor* executor);
170 
171  ExecutionKernel& kernel_;
172  SharedKernelContext& shared_context_;
173  std::shared_ptr<FetchResult> fetch_result_;
174  std::shared_ptr<std::list<ChunkIter>> chunk_iterators_;
175  int64_t total_num_input_rows_;
176  size_t start_rowid_;
177  size_t num_rows_to_process_;
178  size_t thread_idx_;
179 };
180 #endif // HAVE_TBB
std::atomic_flag dynamic_watchdog_set
const ExecutionOptions & eo
const std::vector< uint64_t > & getFragOffsets()
int32_t get_chosen_device_id() const
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
std::vector< uint64_t > all_frag_row_offsets_
const int64_t rowid_lookup_key
std::mutex all_frag_row_offsets_mutex_
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
const ExecutorDeviceType chosen_device_type
size_t num_rows_to_process(const size_t start_row_index, const size_t max_fragment_size, const size_t rows_remaining)
std::shared_ptr< ResultSet > ResultSetPtr
RenderInfo * render_info_
Container for compilation results and assorted options for a single execution unit.
std::vector< FragmentsPerTable > FragmentsList
ExecutorDeviceType
ExecutorDispatchMode
FragmentsList get_fragment_list() const
const RegisteredQueryHint query_hint_
const QueryMemoryDescriptor & query_mem_desc
const QueryCompilationDescriptor & query_comp_desc
void runImpl(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > all_fragment_results_
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
const FragmentsList frag_list
ExecutionKernel(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_list, const ExecutorDispatchMode kernel_dispatch_mode, RenderInfo *render_info, const int64_t rowid_lookup_key)
const std::vector< InputTableInfo > & getQueryInfos() const
ResultSetPtr device_results_
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::mutex reduce_mutex_
void setNumAllocatedThreads(size_t num_threads)
static bool run
const std::vector< InputTableInfo > & query_infos_
size_t getNumAllocatedThreads()
friend class KernelSubtask
SharedKernelContext(const std::vector< InputTableInfo > &query_infos)
const ColumnFetcher & column_fetcher