OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecutionKernel.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Logger/Logger.h"
22 
23 #include "Shared/threading.h"
24 
25 #ifdef HAVE_TBB
26 #include "tbb/enumerable_thread_specific.h"
27 #endif
28 
30  public:
// Builds a context over the given table infos. Only a reference to
// `query_infos` is stored (query_infos_ is a const reference member), so the
// caller's vector must stay alive for as long as this context is in use.
31  SharedKernelContext(const std::vector<InputTableInfo>& query_infos)
32  : query_infos_(query_infos)
33 #ifdef HAVE_TBB
// TBB builds only: no task group is attached at construction time; one is
// installed through setThreadPool().
34  , task_group_(nullptr)
35 #endif
36  {
37  }
38 
39  const std::vector<uint64_t>& getFragOffsets();
40 
41  void addDeviceResults(ResultSetPtr&& device_results,
42  std::vector<size_t> outer_table_fragment_ids);
43 
44  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& getFragmentResults();
45 
// Returns, by const reference (no copy), the table-info vector that was
// handed to the constructor.
46  const std::vector<InputTableInfo>& getQueryInfos() const { return query_infos_; }
47 
48  std::atomic_flag dynamic_watchdog_set = ATOMIC_FLAG_INIT;
49 
50 #ifdef HAVE_TBB
// Returns the stored task-group pointer by value (`auto` deduces
// threading::task_group*). The pointer is nullptr right after construction.
51  auto getThreadPool() { return task_group_; }
// Stores `tg` as-is in task_group_. Nothing visible in this class deletes the
// pointer, so presumably the caller keeps ownership -- TODO confirm.
52  void setThreadPool(threading::task_group* tg) { task_group_ = tg; }
// Returns a mutable reference to the tbb::enumerable_thread_specific member
// whose elements are std::unique_ptr<QueryExecutionContext>.
53  auto& getTlsExecutionContext() { return tls_execution_context_; }
54 #endif // HAVE_TBB
55 
56  private:
57  std::mutex reduce_mutex_;
58  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> all_fragment_results_;
59 
60  std::vector<uint64_t> all_frag_row_offsets_;
62  const std::vector<InputTableInfo>& query_infos_;
64 
65 #ifdef HAVE_TBB
66  threading::task_group* task_group_;
67  tbb::enumerable_thread_specific<std::unique_ptr<QueryExecutionContext>>
68  tls_execution_context_;
69 #endif // HAVE_TBB
70 };
71 
73  public:
76  int chosen_device_id,
77  const ExecutionOptions& eo,
81  const FragmentsList& frag_list,
83  RenderInfo* render_info,
84  const int64_t rowid_lookup_key)
85  : ra_exe_unit_(ra_exe_unit)
86  , chosen_device_type(chosen_device_type)
87  , chosen_device_id(chosen_device_id)
88  , eo(eo)
89  , column_fetcher(column_fetcher)
90  , query_comp_desc(query_comp_desc)
91  , query_mem_desc(query_mem_desc)
92  , frag_list(frag_list)
93  , kernel_dispatch_mode(kernel_dispatch_mode)
94  , render_info_(render_info)
95  , rowid_lookup_key(rowid_lookup_key) {}
96 
// Public entry point of the kernel; only declared here, the definition is not
// part of this header. It takes the same argument list as the private
// runImpl(), so it is presumably a wrapper around it -- confirm in the
// implementation file.
97  void run(Executor* executor,
98  const size_t thread_idx,
99  SharedKernelContext& shared_context);
100 
102 
103  private:
113  const int64_t rowid_lookup_key;
114 
116 
// Private counterpart of run() with an identical parameter list; only
// declared here, the definition is not part of this header.
117  void runImpl(Executor* executor,
118  const size_t thread_idx,
119  SharedKernelContext& shared_context);
120 
121  friend class KernelSubtask;
122 };
123 
124 #ifdef HAVE_TBB
// A unit of work tied to one ExecutionKernel; the class exists only in builds
// with HAVE_TBB defined. ExecutionKernel declares this class as a friend, so
// its private members are accessible from here.
//
// The kernel and the shared context are held by reference (not owned): both
// objects must outlive the subtask. The fetch result and the chunk iterator
// list are held through std::shared_ptr, so the subtask shares ownership of
// them.
125 class KernelSubtask {
126  public:
// Captures everything the subtask needs; the constructor does no work besides
// initialising the members below.
//   k, shared_context      - bound as references.
//   fetch_result,
//   chunk_iterators        - shared-ownership handles, stored as copies.
//   total_num_input_rows   - stored as given (signed 64-bit).
//   start_rowid,
//   num_rows_to_process    - presumably the first row and the row count this
//                            subtask handles; confirm in runImpl().
//   thread_idx             - stored as given.
// NOTE(review): the two shared_ptr parameters are taken by value and then
// copied into the members; moving them (std::move) would avoid one extra
// reference-count increment/decrement per argument.
127  KernelSubtask(ExecutionKernel& k,
128  SharedKernelContext& shared_context,
129  std::shared_ptr<FetchResult> fetch_result,
130  std::shared_ptr<std::list<ChunkIter>> chunk_iterators,
131  int64_t total_num_input_rows,
132  size_t start_rowid,
133  size_t num_rows_to_process,
134  size_t thread_idx)
135  : kernel_(k)
136  , shared_context_(shared_context)
137  , fetch_result_(fetch_result)
138  , chunk_iterators_(chunk_iterators)
139  , total_num_input_rows_(total_num_input_rows)
140  , start_rowid_(start_rowid)
141  , num_rows_to_process_(num_rows_to_process)
142  , thread_idx_(thread_idx) {}
143 
// Public entry point; declared only, defined outside this header.
144  void run(Executor* executor);
145 
146  private:
// Private counterpart of run() with the same parameter; defined outside this
// header.
147  void runImpl(Executor* executor);
148 
// Non-owning references; see the lifetime requirement in the class comment.
149  ExecutionKernel& kernel_;
150  SharedKernelContext& shared_context_;
// Shared-ownership handles copied from the constructor arguments.
151  std::shared_ptr<FetchResult> fetch_result_;
152  std::shared_ptr<std::list<ChunkIter>> chunk_iterators_;
// Plain values copied from the constructor arguments. Note the mixed
// signedness: the total is int64_t while the row position/count are size_t.
153  int64_t total_num_input_rows_;
154  size_t start_rowid_;
155  size_t num_rows_to_process_;
156  size_t thread_idx_;
157 };
158 #endif // HAVE_TBB
std::atomic_flag dynamic_watchdog_set
const ExecutionOptions & eo
const std::vector< uint64_t > & getFragOffsets()
ExecutorDeviceType
const ExecutorDispatchMode kernel_dispatch_mode
const RelAlgExecutionUnit & ra_exe_unit_
std::vector< uint64_t > all_frag_row_offsets_
const int64_t rowid_lookup_key
std::mutex all_frag_row_offsets_mutex_
void addDeviceResults(ResultSetPtr &&device_results, std::vector< size_t > outer_table_fragment_ids)
const ExecutorDeviceType chosen_device_type
std::shared_ptr< ResultSet > ResultSetPtr
RenderInfo * render_info_
Container for compilation results and assorted options for a single execution unit.
std::vector< FragmentsPerTable > FragmentsList
ExecutorDispatchMode
const RegisteredQueryHint query_hint_
const QueryMemoryDescriptor & query_mem_desc
const QueryCompilationDescriptor & query_comp_desc
void runImpl(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > all_fragment_results_
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
const FragmentsList frag_list
ExecutionKernel(const RelAlgExecutionUnit &ra_exe_unit, const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_list, const ExecutorDispatchMode kernel_dispatch_mode, RenderInfo *render_info, const int64_t rowid_lookup_key)
const std::vector< InputTableInfo > & getQueryInfos() const
ResultSetPtr device_results_
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::mutex reduce_mutex_
static bool run
const std::vector< InputTableInfo > & query_infos_
friend class KernelSubtask
SharedKernelContext(const std::vector< InputTableInfo > &query_infos)
const ColumnFetcher & column_fetcher