OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThreadController.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <algorithm>
#include <atomic>
#include <chrono>
#include <future>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
26 
namespace ThreadController_NS {

// Carries the callback that a thread controller invokes with the value
// produced by each completed task.
template <typename FutureReturnType>
struct FutureGetter {
  using FutureGetterFunction = void (*)(FutureReturnType);
  // Invoked as get(future.get()) when a task's result is collected.
  FutureGetterFunction get;
};

// Tasks returning void produce no value, so there is no callback to store.
template <>
struct FutureGetter<void> {};

// Tracks tasks launched through std::async and lets the caller throttle how
// many of them are outstanding at once.
//
// The cap is enforced only by checkThreadsStatus(); startThread() itself never
// blocks, so callers are expected to call checkThreadsStatus() before
// startThread(). No synchronization is performed on the internal container.
// NOTE(review): presumably all member calls come from one owning thread -
// confirm against callers.
template <typename FutureReturnType = void>
class SimpleThreadController {
 public:
  SimpleThreadController() = delete;

  // Constructor for tasks that return void (no result callback needed).
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  SimpleThreadController(const int max_threads,
                         std::enable_if_t<future_return_void>* = nullptr)
      : max_threads_(max_threads) {}

  // Constructor for tasks that return a value; future_getter.get receives each
  // task's result when its future is collected.
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  SimpleThreadController(const int max_threads,
                         const FutureGetter<FutureReturnType> future_getter,
                         std::enable_if_t<!future_return_void>* = nullptr)
      : max_threads_(max_threads), future_getter_(future_getter) {}

  // The class is used polymorphically (virtual members, derived controller),
  // so destruction through a base pointer must be well defined.
  virtual ~SimpleThreadController() = default;

  // Number of futures currently tracked (finished-but-uncollected included).
  virtual int getThreadCount() const { return static_cast<int>(threads_.size()); }

  // Number of tasks counted against the cap; here, every tracked future.
  virtual int getRunningThreadCount() const {
    return static_cast<int>(threads_.size());
  }

  // Spins (yielding) until the running count drops below max_threads_,
  // collecting and dropping every future that has become ready on each pass.
  virtual void checkThreadsStatus() {
    // With nothing tracked there is nothing that could ever be collected, so
    // waiting would spin forever (e.g. max_threads_ <= 0); bail out instead.
    while (!threads_.empty() && getRunningThreadCount() >= max_threads_) {
      std::this_thread::yield();
      const auto first_collected = std::remove_if(
          threads_.begin(), threads_.end(), [this](auto& pending) {
            // Zero timeout: poll for readiness without blocking.
            if (pending.wait_for(std::chrono::nanoseconds(0)) !=
                std::future_status::ready) {
              return false;
            }
            this->get_future(pending);
            return true;
          });
      threads_.erase(first_collected, threads_.end());
    }
  }

  // Launches func(args...) on a new thread via std::async and tracks its
  // future. Arguments are perfectly forwarded, so move-only values can be
  // handed to the task.
  template <typename FuncType, typename... Args>
  void startThread(FuncType&& func, Args&&... args) {
    threads_.emplace_back(std::async(std::launch::async,
                                     std::forward<FuncType>(func),
                                     std::forward<Args>(args)...));
  }

  // Blocks until every tracked task has completed, collecting each result in
  // launch order, then forgets all futures.
  virtual void finish() {
    for (auto& pending : threads_) {
      get_future(pending);
    }
    threads_.clear();
  }

 protected:
  // Collects a void task: waits for it and rethrows any stored exception.
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  void get_future(std::future<FutureReturnType>& future,
                  std::enable_if_t<future_return_void>* = nullptr) {
    future.get();
  }

  // Collects a value-returning task and hands the value to the getter callback.
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  void get_future(std::future<FutureReturnType>& future,
                  std::enable_if_t<!future_return_void>* = nullptr) {
    future_getter_.get(future.get());
  }

 private:
  const int max_threads_;
  // Value-initialized so the void constructor leaves it well defined.
  const FutureGetter<FutureReturnType> future_getter_{};
  std::vector<std::future<FutureReturnType>> threads_;
};

// Variant of SimpleThreadController whose cap is measured against an explicit
// running-task counter instead of the number of tracked futures.
template <typename FutureReturnType = void>
class SimpleRunningThreadController : public SimpleThreadController<FutureReturnType> {
 public:
  SimpleRunningThreadController() = delete;

  // Constructor for tasks that return void.
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  SimpleRunningThreadController(const int max_threads,
                                std::enable_if_t<future_return_void>* = nullptr)
      : SimpleThreadController<FutureReturnType>(max_threads), n_running_threads_(0) {}

  // Constructor for tasks that return a value consumed by future_getter.get.
  template <bool future_return_void = std::is_void<FutureReturnType>::value>
  SimpleRunningThreadController(const int max_threads,
                                const FutureGetter<FutureReturnType> future_getter,
                                std::enable_if_t<!future_return_void>* = nullptr)
      : SimpleThreadController<FutureReturnType>(max_threads, future_getter)
      , n_running_threads_(0) {}

  ~SimpleRunningThreadController() override = default;

  // Decrements the running-task counter and returns the new value.
  int notify_thread_is_completed() { return --n_running_threads_; }

  int getRunningThreadCount() const override { return n_running_threads_.load(); }

  // Same waiting/collecting loop as the base class; the loop condition there
  // calls the virtual getRunningThreadCount(), i.e. the counter above.
  void checkThreadsStatus() override {
    SimpleThreadController<FutureReturnType>::checkThreadsStatus();
  }

  // Launches the task through the base class, then increments the
  // running-task counter and returns its new value.
  template <typename FuncType, typename... Args>
  int startThread(FuncType&& func, Args&&... args) {
    SimpleThreadController<FutureReturnType>::startThread(
        std::forward<FuncType>(func), std::forward<Args>(args)...);
    return ++n_running_threads_;
  }

 private:
  // SimpleRunningThreadController consumers must EXPLICITLY update number
  // of running threads using notify_thread_is_completed member function
  std::atomic<int> n_running_threads_;
};

}  // namespace ThreadController_NS
std::vector< std::future< FutureReturnType > > threads_
const FutureGetter< FutureReturnType > future_getter_
SimpleThreadController(const int max_threads, const FutureGetter< FutureReturnType > future_getter, std::enable_if_t<!future_return_void > *=0)
void startThread(FuncType &&func, Args &&...args)
future< Result > async(Fn &&fn, Args &&...args)
SimpleRunningThreadController(const int max_threads, const FutureGetter< FutureReturnType > future_getter, std::enable_if_t<!future_return_void > *=0)
SimpleRunningThreadController(const int max_threads, std::enable_if_t< future_return_void > *=0)
void(*)(FutureReturnType) FutureGetterFunction
void get_future(std::future< FutureReturnType > &future, std::enable_if_t<!future_return_void > *=0)
int startThread(FuncType &&func, Args &&...args)
void get_future(std::future< FutureReturnType > &future, std::enable_if_t< future_return_void > *=0)
SimpleThreadController(const int max_threads, std::enable_if_t< future_return_void > *=0)