OmniSciDB  c07336695a
ThreadController.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <future>
22 #include <thread>
23 #include <type_traits>
24 #include <vector>
25 
27 
28 template <typename FutureReturnType>
29 struct FutureGetter {
30  using FutureGetterFunction = void (*)(FutureReturnType);
32 };
33 
34 template <>
35 struct FutureGetter<void> {};
36 
37 template <typename FutureReturnType = void>
39  public:
40  SimpleThreadController() = delete;
41  template <bool future_return_void = std::is_void<FutureReturnType>::value>
42  SimpleThreadController(const int max_threads, std::enable_if_t<future_return_void>* = 0)
43  : max_threads_(max_threads) {}
44  template <bool future_return_void = std::is_void<FutureReturnType>::value>
45  SimpleThreadController(const int max_threads,
46  const FutureGetter<FutureReturnType> future_getter,
47  std::enable_if_t<!future_return_void>* = 0)
48  : max_threads_(max_threads), future_getter_(future_getter) {}
50  virtual int getThreadCount() const { return threads_.size(); }
51  virtual int getRunningThreadCount() const { return threads_.size(); }
52  virtual void checkThreadsStatus() {
53  while (getRunningThreadCount() >= max_threads_) {
54  std::this_thread::yield();
55  threads_.erase(std::remove_if(threads_.begin(),
56  threads_.end(),
57  [this](auto& th) {
58  using namespace std::chrono_literals;
59  if (th.wait_for(0ns) == std::future_status::ready) {
60  this->get_future(th);
61  return true;
62  } else {
63  return false;
64  }
65  }),
66  threads_.end());
67  }
68  }
69  template <typename FuncType, typename... Args>
70  void startThread(FuncType&& func, Args&&... args) {
71  threads_.emplace_back(std::async(std::launch::async, func, args...));
72  }
73  virtual void finish() {
74  for (auto& t : threads_) {
75  get_future(t);
76  }
77  threads_.clear();
78  }
79 
80  protected:
81  template <bool future_return_void = std::is_void<FutureReturnType>::value>
82  void get_future(std::future<FutureReturnType>& future,
83  std::enable_if_t<future_return_void>* = 0) {
84  future.get();
85  }
86  template <bool future_return_void = std::is_void<FutureReturnType>::value>
87  void get_future(std::future<FutureReturnType>& future,
88  std::enable_if_t<!future_return_void>* = 0) {
89  future_getter_.get(future.get());
90  }
91 
92  private:
93  const int max_threads_;
94  const FutureGetter<FutureReturnType> future_getter_{};
95  std::vector<std::future<FutureReturnType>> threads_;
96 };
97 
98 template <typename FutureReturnType = void>
99 class SimpleRunningThreadController : public SimpleThreadController<FutureReturnType> {
100  public:
102  template <bool future_return_void = std::is_void<FutureReturnType>::value>
103  SimpleRunningThreadController(const int max_threads,
104  std::enable_if_t<future_return_void>* = 0)
105  : SimpleThreadController<FutureReturnType>(max_threads), n_running_threads_(0) {}
106  template <bool future_return_void = std::is_void<FutureReturnType>::value>
107  SimpleRunningThreadController(const int max_threads,
108  const FutureGetter<FutureReturnType> future_getter,
109  std::enable_if_t<!future_return_void>* = 0)
110  : SimpleThreadController<FutureReturnType>(max_threads, future_getter)
111  , n_running_threads_(0) {}
113  int notify_thread_is_completed() { return --n_running_threads_; }
114  int getRunningThreadCount() const override { return n_running_threads_; }
115  void checkThreadsStatus() override {
117  }
118  template <typename FuncType, typename... Args>
119  int startThread(FuncType&& func, Args&&... args) {
121  return ++n_running_threads_;
122  }
123 
124  private:
125  // SimpleRunningThreadController consumers must EXPLICITLY update number
126  // of running threads using notify_thread_is_completed member function
127  std::atomic<int> n_running_threads_;
128 };
129 
130 } // namespace ThreadController_NS
std::vector< std::future< FutureReturnType > > threads_
SimpleThreadController(const int max_threads, const FutureGetter< FutureReturnType > future_getter, std::enable_if_t<!future_return_void > *=0)
int startThread(FuncType &&func, Args &&... args)
SimpleRunningThreadController(const int max_threads, const FutureGetter< FutureReturnType > future_getter, std::enable_if_t<!future_return_void > *=0)
SimpleRunningThreadController(const int max_threads, std::enable_if_t< future_return_void > *=0)
void(*)(FutureReturnType) FutureGetterFunction
void startThread(FuncType &&func, Args &&... args)
void get_future(std::future< FutureReturnType > &future, std::enable_if_t<!future_return_void > *=0)
void get_future(std::future< FutureReturnType > &future, std::enable_if_t< future_return_void > *=0)
SimpleThreadController(const int max_threads, std::enable_if_t< future_return_void > *=0)