24 #include "boost/circular_buffer.hpp" 25 #include "cyber/cyber.h" 26 #include "gflags/gflags.h" 29 namespace perception {
42 MsgBuffer() : buffer_queue_(FLAGS_obs_msg_buffer_size) {}
48 void Init(
const std::string& channel,
const std::string& name);
56 std::vector<ObjectPair>* msgs);
59 void MsgCallback(
const ConstPtr& msg);
62 std::string node_name_;
63 std::unique_ptr<cyber::Node> node_;
64 std::shared_ptr<cyber::Reader<T>> msg_subscriber_;
65 std::mutex buffer_mutex_;
68 boost::circular_buffer<ObjectPair> buffer_queue_;
73 int index =
static_cast<int>(name.find_last_of(
'/'));
75 node_name_ = name.substr(index + 1) +
"_subscriber";
77 node_name_ = name +
"_subscriber";
79 node_.reset(apollo::cyber::CreateNode(node_name_).release());
81 std::function<void(const ConstPtr&)> register_call =
83 msg_subscriber_ = node_->CreateReader<T>(channel, register_call);
85 std::lock_guard<std::mutex> lock(buffer_mutex_);
86 buffer_queue_.set_capacity(FLAGS_obs_msg_buffer_size);
92 std::lock_guard<std::mutex> lock(buffer_mutex_);
93 double timestamp = msg->measurement_time();
94 buffer_queue_.push_back(std::make_pair(timestamp, msg));
99 std::lock_guard<std::mutex> lock(buffer_mutex_);
101 AERROR <<
"msg buffer is uninitialized.";
104 if (buffer_queue_.empty()) {
105 AERROR <<
"msg buffer is empty.";
108 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
110 AERROR <<
"Your timestamp (" << timestamp
111 <<
") is earlier than the oldest timestamp (" 112 << buffer_queue_.front().first <<
").";
115 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
117 AERROR <<
"Your timestamp (" << timestamp
118 <<
") is newer than the latest timestamp (" 119 << buffer_queue_.back().first <<
").";
124 double distance = DBL_MAX;
125 int idx =
static_cast<int>(buffer_queue_.size()) - 1;
126 for (; idx >= 0; --idx) {
127 double temp_distance = fabs(timestamp - buffer_queue_[idx].first);
128 if (temp_distance >= distance) {
131 distance = temp_distance;
133 *msg = buffer_queue_[idx + 1].second;
140 std::lock_guard<std::mutex> lock(buffer_mutex_);
142 AERROR <<
"Message buffer is uninitialized.";
145 if (buffer_queue_.empty()) {
146 AERROR <<
"Message buffer is empty.";
149 *msg = buffer_queue_.back().second;
155 std::vector<ObjectPair>* msgs) {
156 std::lock_guard<std::mutex> lock(buffer_mutex_);
158 AERROR <<
"Message buffer is uninitialized.";
161 if (buffer_queue_.empty()) {
162 AERROR <<
"Message buffer is empty.";
165 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
167 AERROR <<
"Your timestamp (" << timestamp <<
") is earlier than the oldest " 168 <<
"timestamp (" << buffer_queue_.front().first <<
").";
171 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
173 AERROR <<
"Your timestamp (" << timestamp <<
") is newer than the latest " 174 <<
"timestamp (" << buffer_queue_.back().first <<
").";
178 const double lower_timestamp = timestamp - period;
179 const double upper_timestamp = timestamp + period;
180 for (
const auto& obj_pair : buffer_queue_) {
181 if (obj_pair.first < lower_timestamp) {
184 if (obj_pair.first > upper_timestamp) {
187 msgs->push_back(obj_pair);
Definition: msg_buffer.h:36
int LookupNearest(double timestamp, ConstPtr *msg)
Definition: msg_buffer.h:98
DECLARE_double(obs_buffer_match_precision)
std::shared_ptr< T const > ConstPtr
Definition: msg_buffer.h:38
void Init(const std::string &channel, const std::string &name)
Definition: msg_buffer.h:72
MsgBuffer operator=(const MsgBuffer &)=delete
std::pair< double, ConstPtr > ObjectPair
Definition: msg_buffer.h:39
int LookupPeriod(double timestamp, double period, std::vector< ObjectPair > *msgs)
Definition: msg_buffer.h:154
int LookupLatest(ConstPtr *msg)
Definition: msg_buffer.h:139
MsgBuffer()
Definition: msg_buffer.h:42
DECLARE_int32(obs_msg_buffer_size)