Apollo  v5.5.0
Open source self driving car software
msg_buffer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 #pragma once
17 
#include <cfloat>
#include <cmath>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "boost/circular_buffer.hpp"
#include "cyber/cyber.h"
#include "gflags/gflags.h"
27 
28 namespace apollo {
29 namespace perception {
30 namespace onboard {
31 
32 DECLARE_int32(obs_msg_buffer_size);
33 DECLARE_double(obs_buffer_match_precision);
34 
35 template <class T>
36 class MsgBuffer {
37  public:
38  typedef std::shared_ptr<T const> ConstPtr;
39  typedef std::pair<double, ConstPtr> ObjectPair;
40 
41  public:
42  MsgBuffer() : buffer_queue_(FLAGS_obs_msg_buffer_size) {}
43  ~MsgBuffer() = default;
44 
45  MsgBuffer(const MsgBuffer&) = delete;
46  MsgBuffer operator=(const MsgBuffer&) = delete;
47 
48  void Init(const std::string& channel, const std::string& name);
49 
50  // get nearest message
51  int LookupNearest(double timestamp, ConstPtr* msg);
52  // get latest message
53  int LookupLatest(ConstPtr* msg);
54  // get messages in (timestamp-period, timestamp+period)
55  int LookupPeriod(double timestamp, double period,
56  std::vector<ObjectPair>* msgs);
57 
58  private:
59  void MsgCallback(const ConstPtr& msg);
60 
61  private:
62  std::string node_name_;
63  std::unique_ptr<cyber::Node> node_;
64  std::shared_ptr<cyber::Reader<T>> msg_subscriber_;
65  std::mutex buffer_mutex_;
66 
67  bool init_ = false;
68  boost::circular_buffer<ObjectPair> buffer_queue_;
69 };
70 
71 template <class T>
72 void MsgBuffer<T>::Init(const std::string& channel, const std::string& name) {
73  int index = static_cast<int>(name.find_last_of('/'));
74  if (index != -1) {
75  node_name_ = name.substr(index + 1) + "_subscriber";
76  } else {
77  node_name_ = name + "_subscriber";
78  }
79  node_.reset(apollo::cyber::CreateNode(node_name_).release());
80 
81  std::function<void(const ConstPtr&)> register_call =
82  std::bind(&MsgBuffer<T>::MsgCallback, this, std::placeholders::_1);
83  msg_subscriber_ = node_->CreateReader<T>(channel, register_call);
84 
85  std::lock_guard<std::mutex> lock(buffer_mutex_);
86  buffer_queue_.set_capacity(FLAGS_obs_msg_buffer_size);
87  init_ = true;
88 }
89 
90 template <class T>
91 void MsgBuffer<T>::MsgCallback(const ConstPtr& msg) {
92  std::lock_guard<std::mutex> lock(buffer_mutex_);
93  double timestamp = msg->measurement_time();
94  buffer_queue_.push_back(std::make_pair(timestamp, msg));
95 }
96 
97 template <class T>
98 int MsgBuffer<T>::LookupNearest(double timestamp, ConstPtr* msg) {
99  std::lock_guard<std::mutex> lock(buffer_mutex_);
100  if (!init_) {
101  AERROR << "msg buffer is uninitialized.";
102  return false;
103  }
104  if (buffer_queue_.empty()) {
105  AERROR << "msg buffer is empty.";
106  return false;
107  }
108  if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
109  timestamp) {
110  AERROR << "Your timestamp (" << timestamp
111  << ") is earlier than the oldest timestamp ("
112  << buffer_queue_.front().first << ").";
113  return false;
114  }
115  if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
116  timestamp) {
117  AERROR << "Your timestamp (" << timestamp
118  << ") is newer than the latest timestamp ("
119  << buffer_queue_.back().first << ").";
120  return false;
121  }
122 
123  // loop to find nearest
124  double distance = DBL_MAX;
125  int idx = static_cast<int>(buffer_queue_.size()) - 1;
126  for (; idx >= 0; --idx) {
127  double temp_distance = fabs(timestamp - buffer_queue_[idx].first);
128  if (temp_distance >= distance) {
129  break;
130  }
131  distance = temp_distance;
132  }
133  *msg = buffer_queue_[idx + 1].second;
134 
135  return true;
136 }
137 
138 template <class T>
140  std::lock_guard<std::mutex> lock(buffer_mutex_);
141  if (!init_) {
142  AERROR << "Message buffer is uninitialized.";
143  return false;
144  }
145  if (buffer_queue_.empty()) {
146  AERROR << "Message buffer is empty.";
147  return false;
148  }
149  *msg = buffer_queue_.back().second;
150  return true;
151 }
152 
153 template <class T>
154 int MsgBuffer<T>::LookupPeriod(const double timestamp, const double period,
155  std::vector<ObjectPair>* msgs) {
156  std::lock_guard<std::mutex> lock(buffer_mutex_);
157  if (!init_) {
158  AERROR << "Message buffer is uninitialized.";
159  return false;
160  }
161  if (buffer_queue_.empty()) {
162  AERROR << "Message buffer is empty.";
163  return false;
164  }
165  if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
166  timestamp) {
167  AERROR << "Your timestamp (" << timestamp << ") is earlier than the oldest "
168  << "timestamp (" << buffer_queue_.front().first << ").";
169  return false;
170  }
171  if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
172  timestamp) {
173  AERROR << "Your timestamp (" << timestamp << ") is newer than the latest "
174  << "timestamp (" << buffer_queue_.back().first << ").";
175  return false;
176  }
177 
178  const double lower_timestamp = timestamp - period;
179  const double upper_timestamp = timestamp + period;
180  for (const auto& obj_pair : buffer_queue_) {
181  if (obj_pair.first < lower_timestamp) {
182  continue;
183  }
184  if (obj_pair.first > upper_timestamp) {
185  break;
186  }
187  msgs->push_back(obj_pair);
188  }
189 
190  return true;
191 }
192 
193 } // namespace onboard
194 } // namespace perception
195 } // namespace apollo
Definition: blob.h:72
Definition: msg_buffer.h:36
int LookupNearest(double timestamp, ConstPtr *msg)
Definition: msg_buffer.h:98
DECLARE_double(obs_buffer_match_precision)
std::shared_ptr< T const > ConstPtr
Definition: msg_buffer.h:38
void Init(const std::string &channel, const std::string &name)
Definition: msg_buffer.h:72
MsgBuffer operator=(const MsgBuffer &)=delete
std::pair< double, ConstPtr > ObjectPair
Definition: msg_buffer.h:39
int LookupPeriod(double timestamp, double period, std::vector< ObjectPair > *msgs)
Definition: msg_buffer.h:154
int LookupLatest(ConstPtr *msg)
Definition: msg_buffer.h:139
MsgBuffer()
Definition: msg_buffer.h:42
DECLARE_int32(obs_msg_buffer_size)