Apollo  v5.5.0
Open source self driving car software
ctpl_stl.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2019 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 /*********************************************************
18  *
19  * Copyright (C) 2014 by Vitaliy Vitsentiy
20  *
21  * Licensed under the Apache License, Version 2.0 (the "License");
22  * you may not use this file except in compliance with the License.
23  * You may obtain a copy of the License at
24  *
25  * http://www.apache.org/licenses/LICENSE-2.0
26  *
27  * Unless required by applicable law or agreed to in writing, software
28  * distributed under the License is distributed on an "AS IS" BASIS,
29  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30  * See the License for the specific language governing permissions and
31  * limitations under the License.
32  *
33  *********************************************************/
34 
35 #ifndef __ctpl_stl_thread_pool_H__
36 #define __ctpl_stl_thread_pool_H__
37 
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
48 
49 // thread pool to run user's functors with signature
50 // ret func(int id, other_params)
51 // where id is the index of the thread that runs the functor
52 // ret is some return type
53 
54 namespace ctpl {
55 
56 namespace detail {
// Mutex-guarded FIFO queue.
//
// Every member function takes the internal mutex for the duration of the
// call, so individual operations are safe to invoke from several threads.
// Sequences of calls (e.g. empty() followed by pop()) are NOT atomic as a
// whole; use the boolean result of pop() instead of pre-checking empty().
template <typename T>
class Queue {
 public:
  // Appends a copy of `value` to the back of the queue. Always returns true.
  bool push(T const &value) {
    std::lock_guard<std::mutex> lock(this->mutex);
    this->q.push(value);
    return true;
  }

  // Appends `value` to the back of the queue by moving it, which allows
  // move-only element types. Always returns true.
  bool push(T &&value) {
    std::lock_guard<std::mutex> lock(this->mutex);
    this->q.push(std::move(value));
    return true;
  }

  // Removes the front element and stores it in `v`.
  // Returns false, leaving `v` untouched, when the queue is empty.
  bool pop(T &v) {  // NOLINT
    std::lock_guard<std::mutex> lock(this->mutex);
    if (this->q.empty()) {
      return false;
    }
    // The element is about to be discarded, so move it out instead of copying.
    v = std::move(this->q.front());
    this->q.pop();
    return true;
  }

  // Returns true when the queue holds no elements at the time of the call.
  bool empty() const {
    std::lock_guard<std::mutex> lock(this->mutex);
    return this->q.empty();
  }

 private:
  std::queue<T> q;
  // mutable so that the read-only empty() can still lock it.
  mutable std::mutex mutex;
};
84 } // namespace detail
85 
86 class thread_pool {
87  public:
88  thread_pool() { this->init(); }
89  explicit thread_pool(int nThreads) {
90  this->init();
91  this->resize(nThreads);
92  }
93 
94  // the destructor waits for all the functions in the queue to be finished
95  ~thread_pool() { this->stop(true); }
96 
97  // get the number of running threads in the pool
98  int size() { return static_cast<int>(this->threads.size()); }
99 
100  // number of idle threads
101  int n_idle() { return this->nWaiting; }
102  std::thread &get_thread(int i) { return *this->threads[i]; }
103 
104  // change the number of threads in the pool
105  // should be called from one thread, otherwise be careful to not interleave,
106  // also with this->stop()
107  // nThreads must be >= 0
108  void resize(int nThreads) {
109  if (!this->isStop && !this->isDone) {
110  int oldNThreads = static_cast<int>(this->threads.size());
111  if (oldNThreads <= nThreads) { // if the number of threads is increased
112  this->threads.resize(nThreads);
113  this->flags.resize(nThreads);
114 
115  for (int i = oldNThreads; i < nThreads; ++i) {
116  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
117  this->set_thread(i);
118  }
119  } else { // the number of threads is decreased
120  for (int i = oldNThreads - 1; i >= nThreads; --i) {
121  *this->flags[i] = true; // this thread will finish
122  this->threads[i]->detach();
123  }
124  {
125  // stop the detached threads that were waiting
126  std::unique_lock<std::mutex> lock(this->mutex);
127  this->cv.notify_all();
128  }
129  this->threads.resize(
130  nThreads); // safe to delete because the threads are detached
131  this->flags.resize(nThreads); // safe to delete because the threads
132  // have copies of shared_ptr of the
133  // flags, not originals
134  }
135  }
136  }
137 
138  // empty the queue
139  void clear_queue() {
140  std::function<void(int id)> *_f;
141  while (this->q.pop(_f)) delete _f; // empty the queue
142  }
143 
144  // pops a functional wrapper to the original function
145  std::function<void(int)> pop() {
146  std::function<void(int id)> *_f = nullptr;
147  this->q.pop(_f);
148  std::unique_ptr<std::function<void(int id)>> func(
149  _f); // at return, delete the function even if an exception occurred
150  std::function<void(int)> f;
151  if (_f) {
152  f = *_f;
153  }
154  return f;
155  }
156 
157  // wait for all computing threads to finish and stop all threads
158  // may be called asynchronously to not pause the calling thread while waiting
159  // if isWait == true, all the functions in the queue are run, otherwise the
160  // queue is cleared without running the functions
161  void stop(bool isWait = false) {
162  if (!isWait) {
163  if (this->isStop) {
164  return;
165  }
166  this->isStop = true;
167  for (int i = 0, n = this->size(); i < n; ++i) {
168  *this->flags[i] = true; // command the threads to stop
169  }
170  this->clear_queue(); // empty the queue
171  } else {
172  if (this->isDone || this->isStop) {
173  return;
174  }
175  this->isDone = true; // give the waiting threads a command to finish
176  }
177  {
178  std::unique_lock<std::mutex> lock(this->mutex);
179  this->cv.notify_all(); // stop all waiting threads
180  }
181  for (int i = 0; i < static_cast<int>(this->threads.size());
182  ++i) { // wait for the computing threads to finish
183  if (this->threads[i]->joinable()) {
184  this->threads[i]->join();
185  }
186  }
187  // if there were no threads in the pool but some functors in the queue, the
188  // functors are not deleted by the threads
189  // therefore delete them here
190  this->clear_queue();
191  this->threads.clear();
192  this->flags.clear();
193  }
194 
195  template <typename F, typename... Rest>
196  auto push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
197  auto pck =
198  std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
199  std::bind(std::forward<F>(f), std::placeholders::_1,
200  std::forward<Rest>(rest)...));
201  auto _f = new std::function<void(int id)>([pck](int id) { (*pck)(id); });
202  this->q.push(_f);
203  std::unique_lock<std::mutex> lock(this->mutex);
204  this->cv.notify_one();
205  return pck->get_future();
206  }
207 
208  // run the user's function that excepts argument int - id of the running
209  // thread. returned value is templatized
210  // operator returns std::future, where the user can get the result and rethrow
211  // the catched exceptins
212  template <typename F>
213  auto push(F &&f) -> std::future<decltype(f(0))> {
214  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
215  std::forward<F>(f));
216  auto _f = new std::function<void(int id)>([pck](int id) { (*pck)(id); });
217  this->q.push(_f);
218  std::unique_lock<std::mutex> lock(this->mutex);
219  this->cv.notify_one();
220  return pck->get_future();
221  }
222 
223  private:
224  // deleted
225  thread_pool(const thread_pool &); // = delete;
226  thread_pool(thread_pool &&); // = delete;
227  thread_pool &operator=(const thread_pool &); // = delete;
228  thread_pool &operator=(thread_pool &&); // = delete;
229 
230  void set_thread(int i) {
231  std::shared_ptr<std::atomic<bool>> flag(
232  this->flags[i]); // a copy of the shared ptr to the flag
233  auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
234  std::atomic<bool> &_flag = *flag;
235  std::function<void(int id)> *_f;
236  bool isPop = this->q.pop(_f);
237  while (true) {
238  while (isPop) { // if there is anything in the queue
239  std::unique_ptr<std::function<void(int id)>> func(
240  _f); // at return, delete the function even if an exception
241  // occurred
242  (*_f)(i);
243  if (_flag)
244  return; // the thread is wanted to stop, return even if the queue
245  // is not empty yet
246  else
247  isPop = this->q.pop(_f);
248  }
249  // the queue is empty here, wait for the next command
250  std::unique_lock<std::mutex> lock(this->mutex);
251  ++this->nWaiting;
252  this->cv.wait(lock, [this, &_f, &isPop, &_flag]() {
253  isPop = this->q.pop(_f);
254  return isPop || this->isDone || _flag;
255  });
256  --this->nWaiting;
257  if (!isPop)
258  return; // if the queue is empty and this->isDone == true or *flag
259  // then return
260  }
261  };
262  this->threads[i].reset(
263  new std::thread(f)); // compiler may not support std::make_unique()
264  }
265 
266  void init() {
267  this->nWaiting = 0;
268  this->isStop = false;
269  this->isDone = false;
270  }
271 
272  std::vector<std::unique_ptr<std::thread>> threads;
273  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
275  std::atomic<bool> isDone;
276  std::atomic<bool> isStop;
277  std::atomic<int> nWaiting; // how many threads are waiting
278 
279  std::mutex mutex;
280  std::condition_variable cv;
281 };
282 } // namespace ctpl
283 
284 #endif // __ctpl_stl_thread_pool_H__
bool pop(T &v)
Definition: ctpl_stl.h:66
bool push(T const &value)
Definition: ctpl_stl.h:60
int n_idle()
Definition: ctpl_stl.h:101
thread_pool(int nThreads)
Definition: ctpl_stl.h:89
Definition: ctpl.h:59
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl_stl.h:196
void resize(int nThreads)
Definition: ctpl_stl.h:108
void clear_queue()
Definition: ctpl_stl.h:139
Definition: ctpl_stl.h:58
bool empty()
Definition: ctpl_stl.h:75
std::function< void(int)> pop()
Definition: ctpl_stl.h:145
thread_pool()
Definition: ctpl_stl.h:88
std::thread & get_thread(int i)
Definition: ctpl_stl.h:102
void stop(bool isWait=false)
Definition: ctpl_stl.h:161
int size()
Definition: ctpl_stl.h:98
Definition: ctpl.h:61
auto push(F &&f) -> std::future< decltype(f(0))>
Definition: ctpl_stl.h:213
~thread_pool()
Definition: ctpl_stl.h:95