Apollo  v5.5.0
Open source self-driving car software
concurrent_object_pool.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 #pragma once
17 
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

#include "modules/perception/base/object_pool.h"

27 #define PERCEPTION_BASE_DISABLE_POOL
28 namespace apollo {
29 namespace perception {
30 namespace base {
31 
// Number of extra objects allocated each time the pool runs dry.
constexpr size_t kPoolDefaultExtendNum = 10;
// Default number of objects pre-allocated by a pool singleton instance.
constexpr size_t kPoolDefaultSize = 100;
34 
// @brief default initializer used in concurrent object pool
// NOTE: intentionally a no-op — the pooled object is handed out in whatever
// state its previous user left it; supply a custom Initializer to reset state.
template <class T>
struct ObjectPoolDefaultInitializer {
  void operator()(T* t) const {}
};
40 // @brief concurrent object pool with dynamic size
41 template <class ObjectType, size_t N = kPoolDefaultSize,
42  class Initializer = ObjectPoolDefaultInitializer<ObjectType>>
43 class ConcurrentObjectPool : public BaseObjectPool<ObjectType> {
44  public:
45  // using ObjectTypePtr = typename BaseObjectPool<ObjectType>::ObjectTypePtr;
47  // @brief Only allow accessing from global instance
49  static ConcurrentObjectPool pool(N);
50  return pool;
51  }
52  // @brief overrided function to get object smart pointer
53  std::shared_ptr<ObjectType> Get() override {
54 // TODO(All): remove conditional build
55 #ifndef PERCEPTION_BASE_DISABLE_POOL
56  ObjectType* ptr = nullptr;
57  {
58  std::lock_guard<std::mutex> lock(mutex_);
59  if (queue_.empty()) {
60  Add(1 + kPoolDefaultExtendNum);
61  }
62  ptr = queue_.front();
63  queue_.pop();
64  }
65  // For efficiency consideration, initialization should be invoked
66  // after releasing the mutex
67  kInitializer(ptr);
68  return std::shared_ptr<ObjectType>(ptr, [&](ObjectType* obj_ptr) {
69  std::lock_guard<std::mutex> lock(mutex_);
70  queue_.push(obj_ptr);
71  });
72 #else
73  return std::shared_ptr<ObjectType>(new ObjectType);
74 #endif
75  }
76  // @brief overrided function to get batch of smart pointers
77  // @params[IN] num: batch number
78  // @params[OUT] data: vector container to store the pointers
79  void BatchGet(size_t num,
80  std::vector<std::shared_ptr<ObjectType>>* data) override {
81 #ifndef PERCEPTION_BASE_DISABLE_POOL
82  std::vector<ObjectType*> buffer(num, nullptr);
83  {
84  std::lock_guard<std::mutex> lock(mutex_);
85  if (queue_.size() < num) {
86  Add(num - queue_.size() + kPoolDefaultExtendNum);
87  }
88  for (size_t i = 0; i < num; ++i) {
89  buffer[i] = queue_.front();
90  queue_.pop();
91  }
92  }
93  // For efficiency consideration, initialization should be invoked
94  // after releasing the mutex
95  for (size_t i = 0; i < num; ++i) {
96  kInitializer(buffer[i]);
97  data->emplace_back(
98  std::shared_ptr<ObjectType>(buffer[i], [&](ObjectType* obj_ptr) {
99  std::lock_guard<std::mutex> lock(mutex_);
100  queue_.push(obj_ptr);
101  }));
102  }
103 #else
104  for (size_t i = 0; i < num; ++i) {
105  data->emplace_back(std::shared_ptr<ObjectType>(new ObjectType));
106  }
107 #endif
108  }
109  // @brief overrided function to get batch of smart pointers
110  // @params[IN] num: batch number
111  // @params[IN] is_front: indicating insert to front or back of the list
112  // @params[OUT] data: list container to store the pointers
113  void BatchGet(size_t num, bool is_front,
114  std::list<std::shared_ptr<ObjectType>>* data) override {
115 #ifndef PERCEPTION_BASE_DISABLE_POOL
116  std::vector<ObjectType*> buffer(num, nullptr);
117  {
118  std::lock_guard<std::mutex> lock(mutex_);
119  if (queue_.size() < num) {
120  Add(num - queue_.size() + kPoolDefaultExtendNum);
121  }
122  for (size_t i = 0; i < num; ++i) {
123  buffer[i] = queue_.front();
124  queue_.pop();
125  }
126  }
127  // For efficiency consideration, initialization should be invoked
128  // after releasing the mutex
129  for (size_t i = 0; i < num; ++i) {
130  kInitializer(buffer[i]);
131  is_front ? data->emplace_front(std::shared_ptr<ObjectType>(
132  buffer[i],
133  [&](ObjectType* obj_ptr) {
134  std::lock_guard<std::mutex> lock(mutex_);
135  queue_.push(obj_ptr);
136  }))
137  : data->emplace_back(std::shared_ptr<ObjectType>(
138  buffer[i], [&](ObjectType* obj_ptr) {
139  std::lock_guard<std::mutex> lock(mutex_);
140  queue_.push(obj_ptr);
141  }));
142  }
143 #else
144  for (size_t i = 0; i < num; ++i) {
145  is_front
146  ? data->emplace_front(std::shared_ptr<ObjectType>(new ObjectType))
147  : data->emplace_back(std::shared_ptr<ObjectType>(new ObjectType));
148  }
149 #endif
150  }
151  // @brief overrided function to get batch of smart pointers
152  // @params[IN] num: batch number
153  // @params[IN] is_front: indicating insert to front or back of the deque
154  // @params[OUT] data: deque container to store the pointers
155  void BatchGet(size_t num, bool is_front,
156  std::deque<std::shared_ptr<ObjectType>>* data) override {
157 #ifndef PERCEPTION_BASE_DISABLE_POOL
158  std::vector<ObjectType*> buffer(num, nullptr);
159  {
160  std::lock_guard<std::mutex> lock(mutex_);
161  if (queue_.size() < num) {
162  Add(num - queue_.size() + kPoolDefaultExtendNum);
163  }
164  for (size_t i = 0; i < num; ++i) {
165  buffer[i] = queue_.front();
166  queue_.pop();
167  }
168  }
169  for (size_t i = 0; i < num; ++i) {
170  kInitializer(buffer[i]);
171  is_front ? data->emplace_front(std::shared_ptr<ObjectType>(
172  buffer[i],
173  [&](ObjectType* obj_ptr) {
174  std::lock_guard<std::mutex> lock(mutex_);
175  queue_.push(obj_ptr);
176  }))
177  : data->emplace_back(std::shared_ptr<ObjectType>(
178  buffer[i], [&](ObjectType* obj_ptr) {
179  std::lock_guard<std::mutex> lock(mutex_);
180  queue_.push(obj_ptr);
181  }));
182  }
183 #else
184  for (size_t i = 0; i < num; ++i) {
185  is_front
186  ? data->emplace_front(std::shared_ptr<ObjectType>(new ObjectType))
187  : data->emplace_back(std::shared_ptr<ObjectType>(new ObjectType));
188  }
189 #endif
190  }
191 #ifndef PERCEPTION_BASE_DISABLE_POOL
192  // @brief overrided function to set capacity
193  void set_capacity(size_t capacity) override {
194  std::lock_guard<std::mutex> lock(mutex_);
195  if (capacity_ < capacity) {
196  Add(capacity - capacity_);
197  }
198  }
199  // @brief get remained object number
200  size_t RemainedNum() override { return queue_.size(); }
201 #endif
202  // @brief destructor to release the cached memory
204  if (cache_) {
205  delete[] cache_;
206  cache_ = nullptr;
207  }
208  for (auto& ptr : extended_cache_) {
209  delete ptr;
210  }
211  extended_cache_.clear();
212  }
213 
214  protected:
215 // @brief add num objects, should add lock before invoke this function
216 #ifndef PERCEPTION_BASE_DISABLE_POOL
217  void Add(size_t num) {
218  for (size_t i = 0; i < num; ++i) {
219  ObjectType* ptr = new ObjectType;
220  extended_cache_.push_back(ptr);
221  queue_.push(ptr);
222  }
223  capacity_ = kDefaultCacheSize + extended_cache_.size();
224  }
225 #endif
226  // @brief default constructor
227  explicit ConcurrentObjectPool(const size_t default_size)
228  : kDefaultCacheSize(default_size) {
229 #ifndef PERCEPTION_BASE_DISABLE_POOL
230  cache_ = new ObjectType[kDefaultCacheSize];
231  for (size_t i = 0; i < kDefaultCacheSize; ++i) {
232  queue_.push(&cache_[i]);
233  }
234  capacity_ = kDefaultCacheSize;
235 #endif
236  }
237  std::mutex mutex_;
238  std::queue<ObjectType*> queue_;
239  // @brief point to a continuous memory of default pool size
240  ObjectType* cache_ = nullptr;
241  const size_t kDefaultCacheSize;
242  // @brief list to store extended memory, not as efficient
243  std::list<ObjectType*> extended_cache_;
244  static const Initializer kInitializer;
245 };
246 
247 } // namespace base
248 } // namespace perception
249 } // namespace apollo
~ConcurrentObjectPool() override
Definition: concurrent_object_pool.h:203
Definition: blob.h:72
ConcurrentObjectPool(const size_t default_size)
Definition: concurrent_object_pool.h:227
Definition: concurrent_object_pool.h:37
static ConcurrentObjectPool & Instance()
Definition: concurrent_object_pool.h:48
void BatchGet(size_t num, bool is_front, std::deque< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:155
static const Initializer kInitializer
Definition: concurrent_object_pool.h:244
std::queue< ObjectType * > queue_
Definition: concurrent_object_pool.h:238
std::mutex mutex_
Definition: concurrent_object_pool.h:237
const size_t kDefaultCacheSize
Definition: concurrent_object_pool.h:241
Definition: concurrent_object_pool.h:43
void BatchGet(size_t num, bool is_front, std::list< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:113
Definition: object_pool.h:28
std::list< ObjectType * > extended_cache_
Definition: concurrent_object_pool.h:243
void operator()(T *t) const
Definition: concurrent_object_pool.h:38
std::shared_ptr< ObjectType > Get() override
Definition: concurrent_object_pool.h:53
ObjectType
Definition: object_types.h:26
void BatchGet(size_t num, std::vector< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:79