OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
threading_std.h
Go to the documentation of this file.
1 #include <cassert>
2 #include <cstddef>
3 #include <future>
4 #include <type_traits>
5 #include <vector>
6 #include "thread_count.h"
7 
8 #ifndef THREADING_STD_LAUNCH
9 #define THREADING_STD_LAUNCH async // async or deferred
10 #endif
11 
namespace threading_common {

//! Tag type that selects the splitting constructor of a range.
class split {};

// NOTE(review): threading_std defaults its Partitioner template parameters to
// auto_partitioner, whose declaration is not visible in this listing; confirm where
// it is declared.
// class static_partitioner;
// class affinity_partitioner;

//! A range over which to iterate.
/** Describes the half-open interval [begin, end) of Value and can be split in two.
    Value only needs copying, operator<, subtraction yielding something convertible
    to size_type, and addition of such a difference back onto a Value. */
template <typename Value>
class blocked_range {
 public:
  //! Type of a value.
  /** Named const_iterator so that begin()/end() read like those of a container. */
  using const_iterator = Value;

  //! Type for size of a range.
  using size_type = std::size_t;

  //! Construct range over half-open interval [begin_, end_).
  /** A grainsize argument is not implemented yet; grainsize() always reports 1. */
  blocked_range(Value begin_, Value end_ /*TODO , size_type grainsize_=1*/)
      : my_end(end_), my_begin(begin_)  //, my_grainsize(grainsize_)
  {
    // assert( my_grainsize>0 && "grainsize must be positive" );
  }

  //! Beginning of range.
  const_iterator begin() const { return my_begin; }

  //! One past last value in range.
  const_iterator end() const { return my_end; }

  //! Size of the range.
  /** Unspecified if end() < begin(). */
  size_type size() const {
    assert(!(end() < begin()) && "size() unspecified if end()<begin()");
    return size_type(my_end - my_begin);
  }

  //! The grain size for this range.
  size_type grainsize() const { return 1 /*my_grainsize*/; }

  //------------------------------------------------------------------------
  // Methods that implement Range concept
  //------------------------------------------------------------------------

  //! True if range is empty.
  bool empty() const { return !(my_begin < my_end); }

  //! True if range is divisible.
  /** A range is divisible only when it holds more than grainsize() values, so that
      splitting it never produces an empty half. */
  bool is_divisible() const { return grainsize() < size(); }

  //! Split range.
  /** Shrinks r to its lower half and makes this range the upper half.
      my_end is copied from r before do_split() overwrites r.my_end. */
  blocked_range(blocked_range& r, split)
      : my_end(r.my_end)
      , my_begin(do_split(r, split()))
  // TODO , my_grainsize(r.my_grainsize)
  {
    // only comparison 'less than' is required from values of blocked_range objects
    assert(!(my_begin < r.my_end) && !(r.my_end < my_begin) &&
           "blocked_range has been split incorrectly");
  }

 private:
  // my_end MUST stay declared before my_begin: members are initialized in declaration
  // order and the splitting constructor reads r.my_end before do_split() changes it.
  Value my_end;
  Value my_begin;
  // TODO size_type my_grainsize;

  //! Auxiliary function used by the splitting constructor.
  /** Truncates r to [begin, middle) and returns middle. */
  static Value do_split(blocked_range& r, split) {
    assert(r.is_divisible() && "cannot split blocked_range that is not divisible");
    Value middle = r.my_begin + (r.my_end - r.my_begin) / 2u;
    r.my_end = middle;
    return middle;
  }
};
}  // namespace threading_common
95 
96 namespace threading_std {
97 
98 using std::future;
99 using namespace threading_common;
101 
102 template <typename Fn,
103  typename... Args,
104  typename Result = std::result_of_t<Fn && (Args && ...)>>
105 future<Result> async(Fn&& fn, Args&&... args) {
106  return std::async(launch, std::forward<Fn>(fn), std::forward<Args>(args)...);
107 }
108 
109 class task_group {
110  std::vector<future<void>> threads_;
111 
112  public:
113  template <typename F>
114  void run(F&& f) {
115  threads_.emplace_back(async(std::forward<F>(f)));
116  }
117  void cancel() { /*not implemented*/
118  }
119  void wait() { // TODO task_group_status ?
120  for (auto& child : this->threads_) {
121  child.wait();
122  }
123  }
124 }; // class task_group
125 
127 
128 // template<typename Range, typename Body, typename Partitioner = auto_partitioner>
129 // void parallel_for( const Range& range, const Body& body, const Partitioner &p =
130 // Partitioner());
131 
132 template <typename Int, typename Body, typename Partitioner = auto_partitioner>
134  const Body& body,
135  const Partitioner& p = Partitioner()) {
136  const Int worker_count = cpu_threads();
137  std::vector<std::future<void>> worker_threads;
138  worker_threads.reserve(worker_count);
139 
140  for (Int i = 0,
141  start_entry = range.begin(),
142  stop_entry = range.end(),
143  stride = (range.size() + worker_count - 1) / worker_count;
144  i < worker_count && start_entry < stop_entry;
145  ++i, start_entry += stride) {
146  const auto end_entry = std::min(start_entry + stride, stop_entry);
147  // TODO grainsize?
148  worker_threads.emplace_back(
149  std::async(launch, body, blocked_range<Int>(start_entry, end_entry)));
150  }
151  for (auto& child : worker_threads) {
152  child.wait();
153  }
154 }
155 
158 template <typename Index, typename Function, typename Partitioner = auto_partitioner>
159 void parallel_for(Index first,
160  Index last,
161  const Function& f,
162  const Partitioner& p = Partitioner()) {
163  parallel_for(
164  blocked_range<Index>(first, last),
165  [&f](const blocked_range<Index>& r) {
166  //#pragma ivdep
167  //#pragma omp simd
168  for (auto i = r.begin(), e = r.end(); i < e; i++) {
169  f(i);
170  }
171  },
172  p);
173 }
174 
176 
177 template <typename Int,
178  typename Value,
179  typename RealBody,
180  typename Reduction,
181  typename Partitioner = auto_partitioner>
183  const Value& identity,
184  const RealBody& real_body,
185  const Reduction& reduction,
186  const Partitioner& p = Partitioner()) {
187  const size_t worker_count = cpu_threads();
188  std::vector<std::future<Value>> worker_threads;
189  worker_threads.reserve(worker_count);
190 
191  for (Int i = 0,
192  start_entry = range.begin(),
193  stop_entry = range.end(),
194  stride = (range.size() + worker_count - 1) / worker_count;
195  i < worker_count && start_entry < stop_entry;
196  ++i, start_entry += stride) {
197  const auto end_entry = std::min(start_entry + stride, stop_entry);
198  // TODO grainsize?
199  worker_threads.emplace_back(std::async(
200  launch, real_body, blocked_range<Int>(start_entry, end_entry), Value{}));
201  }
202  Value v = identity;
203  for (auto& child : worker_threads) {
204  v = reduction(v, child.get());
205  }
206 
207  return v;
208 }
209 
210 } // namespace threading_std
size_type size() const
Size of the range.
Definition: threading_std.h:47
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
Parallel iteration over range with default partitioner.
constexpr auto launch
static Value do_split(blocked_range &r, split)
Auxiliary function used by the splitting constructor.
Definition: threading_std.h:87
std::vector< future< void > > threads_
bool empty() const
True if range is empty.
Definition: threading_std.h:60
future< Result > async(Fn &&fn, Args &&...args)
A range over which to iterate.
Definition: threading_std.h:21
future< Result > async(Fn &&fn, Args &&...args)
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
size_type grainsize() const
The grain size for this range.
Definition: threading_std.h:53
blocked_range(Value begin_, Value end_)
Construct range over half-open interval [begin,end). A grainsize argument is not implemented yet; grainsize() always returns 1.
Definition: threading_std.h:32
blocked_range(blocked_range &r, split)
Split range.
Definition: threading_std.h:69
#define THREADING_STD_LAUNCH
Definition: threading_std.h:9
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
const_iterator end() const
One past last value in range.
Definition: threading_std.h:43
const_iterator begin() const
Beginning of range.
Definition: threading_std.h:40
int cpu_threads()
Definition: thread_count.h:25
bool is_divisible() const
True if range is divisible.
Definition: threading_std.h:64
std::size_t size_type
Type for size of a range.
Definition: threading_std.h:29