28 namespace perception {
// Fragment of a class template parameterised on Dtype. The declaration line
// itself (original line 32) is missing from this listing; judging by
// `Cache<DataType>` in the CachePtr alias further down, this is presumably
// `Cache` -- confirm against the full header.
// `data` is the shared handle to the payload object. The cross-reference notes
// at the end of the listing place three more members in the same header:
// `bool loaded` (line 34), `bool load_success` (line 35) and
// `std::future<void> status` (line 36).
31 template <
class Dtype>
33 std::shared_ptr<Dtype>
data;
// Header of a class template parameterised on DataType. The class declaration
// line (original line 42) and everything up to line 46 are not part of this
// listing.
41 template <
class DataType>
// set(): reconfigures the loader.
//   cache_size    -> stored in _fixed_cache_size
//   prefetch_size -> stored in _prefetch_data_size
//   thread_num    -> cast to int and passed to _thread_pool->start()
// Both stores happen while holding _mutex.
// NOTE(review): original lines 52-53 and 56-58 are missing here, so the body of
// the `_thread_pool == nullptr` branch and the closing braces cannot be seen.
// The stop(true)/start(...) pair below presumably belongs to the branch taken
// when a pool already exists -- confirm against the full header.
46 void set(std::size_t cache_size, std::size_t prefetch_size,
47 std::size_t thread_num) {
48 std::lock_guard<std::mutex> lock(_mutex);
49 _fixed_cache_size = cache_size;
50 _prefetch_data_size = prefetch_size;
51 if (_thread_pool ==
nullptr) {
54 _thread_pool->stop(
true);
55 _thread_pool->start(static_cast<int>(thread_num));
// Overrides of two virtual query functions from a base class that is not shown
// in this listing. Each takes the caller-side shared_ptr by reference and
// returns bool; the definitions below assign the loaded object to `data`.
59 bool query_next(std::shared_ptr<DataType>&
data)
override;
60 bool query_last(std::shared_ptr<DataType>& data)
override;
// Shared handle to one cache entry.
63 using CachePtr = std::shared_ptr<Cache<DataType>>;
// Map size above which the query functions start evicting loaded entries.
69 std::size_t _fixed_cache_size = 50;
// Maximum number of neighbouring indices queued for background loading per
// query (following indices in query_next, preceding ones in query_last).
70 std::size_t _prefetch_data_size = 5;
// Cache entries keyed by int; the query functions look entries up with _idx
// and with the prefetch loop index. std::map keeps the keys ordered.
73 std::map<int, CachePtr> _cached_data;
// Worker pool that runs the prefetch loads. It is compared against nullptr in
// set() and at the top of both query functions.
74 std::unique_ptr<ctpl::thread_pool> _thread_pool;
// Definition of query_next. The signature line (original line 85) is missing
// from this listing; the name comes from the cross-reference notes at the end,
// which place `bool query_next(std::shared_ptr<DataType>& data) override` at
// line 85.
// Visible flow:
//   1. check _thread_pool against nullptr (branch body not shown);
//   2. take _mutex for the rest of the function;
//   3. range-check _idx against _filenames[0].size() and 0 (bodies not shown);
//   4. look up _idx in _cached_data: on a miss, load synchronously; on a hit
//      that is not yet marked loaded, wait for its future;
//   5. queue background loads for the following indices;
//   6. evict loaded entries while the map exceeds _fixed_cache_size.
// NOTE(review): many original lines (returns, else branches, closing braces,
// the declarations of cache_ptr and load_success, any update of _idx) are
// omitted from this listing, so the exact nesting cannot be confirmed here.
84 template <
class DataType>
86 std::shared_ptr<DataType>&
data) {
87 if (_thread_pool ==
nullptr) {
93 std::lock_guard<std::mutex> lock(_mutex);
95 if (_idx >= static_cast<int>(_filenames[0].size())) {
97 }
else if (_idx < 0) {
103 auto iter = _cached_data.find(_idx);
// Cache miss: no entry exists for _idx, so the load below runs directly in
// this call (not through _thread_pool) while _mutex is held.
105 if (iter == _cached_data.end()) {
106 std::cerr <<
"Fail to prefetch, start loading..." << std::endl;
108 cache_ptr->data.reset(
new DataType);
// Collect the _idx-th name from every list in _filenames.
109 std::vector<std::string> files;
110 for (
auto& names : _filenames) {
111 files.push_back(names[_idx]);
// Marked loaded up front, so a later hit on this entry skips status.wait().
113 cache_ptr->loaded =
true;
114 cache_ptr->load_success = cache_ptr->data->load(files);
115 _cached_data.emplace(_idx, cache_ptr);
116 data = cache_ptr->data;
117 load_success = cache_ptr->load_success;
// Cache hit (presumably the else branch of the miss check; the line that opens
// it is not shown). If the entry was queued by a prefetch and not yet marked
// loaded, block on its std::future until the worker task finishes.
120 if (!iter->second->loaded) {
121 iter->second->status.wait();
122 iter->second->loaded =
true;
124 data = iter->second->data;
125 load_success = iter->second->load_success;
// Prefetch: for each following index up to _idx + _prefetch_data_size, capped
// at the last valid index, queue a load unless an entry already exists.
// NOTE(review): `i` is an int compared against a std::size_t bound, and
// `_filenames[0].size() - 1` would wrap if _filenames[0] were empty. Whether
// the omitted range checks above rule that out cannot be seen from here.
128 for (
int i = _idx + 1;
129 i <= std::min(static_cast<std::size_t>(_idx) + _prefetch_data_size,
130 _filenames[0].size() - 1);
132 auto prefetch_iter = _cached_data.find(i);
133 if (prefetch_iter == _cached_data.end()) {
135 cache_ptr->data.reset(
new DataType);
136 std::vector<std::string> files;
137 for (
auto& names : _filenames) {
138 files.push_back(names[i]);
// The task captures the entry handle and the file list by value, so it does
// not touch the loader object itself. The returned future is kept in `status`
// for the wait in the hit path above.
140 cache_ptr->status = _thread_pool->push([cache_ptr, files](
int id) {
141 cache_ptr->load_success = cache_ptr->data->load(files);
143 _cached_data.emplace(i, cache_ptr);
// Eviction: scan from the smallest key upwards while the map holds more than
// _fixed_cache_size entries. Only entries already marked loaded, and not the
// current _idx, are released and erased; entries still pending are skipped.
// erase(citer++) advances before the erased node is invalidated. The advance
// for the non-erasing case is not visible in this listing.
147 for (
auto citer = _cached_data.begin();
148 citer != _cached_data.end() &&
149 _cached_data.size() > _fixed_cache_size;) {
150 if (citer->second->loaded && _idx != citer->first) {
151 citer->second->data->release();
152 _cached_data.erase(citer++);
// Definition of query_last. The signature line (original line 161) is missing
// from this listing; the name comes from the cross-reference notes at the end,
// which place `bool query_last(std::shared_ptr<DataType>& data) override` at
// line 161.
// Visible flow mirrors query_next in the opposite direction:
//   1. check _thread_pool against nullptr (branch body not shown);
//   2. take _mutex for the rest of the function;
//   3. allocate a DataType if the caller passed a null `data`;
//   4. clamp _idx to the last index when it is past the end;
//   5. look up _idx in _cached_data: on a miss, load synchronously; on a hit
//      that is not yet marked loaded, wait for its future;
//   6. queue background loads for the preceding indices;
//   7. evict loaded entries, scanning from the largest key downwards.
// NOTE(review): many original lines (returns, else branches, closing braces,
// the declarations of cache_ptr and load_success, any update of _idx) are
// omitted from this listing, so the exact nesting cannot be confirmed here.
160 template <
class DataType>
162 std::shared_ptr<DataType>&
data) {
163 if (_thread_pool ==
nullptr) {
169 std::lock_guard<std::mutex> lock(_mutex);
170 if (data ==
nullptr) {
171 data.reset(
new DataType);
// NOTE(review): the `if` that this `else if` pairs with (original lines
// 172-175) is not shown. The visible branch clamps _idx to the last index; if
// _filenames[0] were empty, `size() - 1` would wrap before the cast to int.
176 }
else if (_idx >= static_cast<int>(_filenames[0].size())) {
177 _idx =
static_cast<int>(_filenames[0].size() - 1);
181 auto iter = _cached_data.find(_idx);
// Cache miss: no entry exists for _idx, so the load below runs directly in
// this call (not through _thread_pool) while _mutex is held.
183 if (iter == _cached_data.end()) {
184 std::cerr <<
"Fail to prefetch, start loading..." << std::endl;
186 cache_ptr->data.reset(
new DataType);
// Marked loaded up front, so a later hit on this entry skips status.wait().
187 cache_ptr->loaded =
true;
// Collect the _idx-th name from every list in _filenames.
188 std::vector<std::string> files;
189 for (
auto& names : _filenames) {
190 files.push_back(names[_idx]);
192 cache_ptr->load_success = cache_ptr->data->load(files);
193 _cached_data.emplace(_idx, cache_ptr);
194 data = cache_ptr->data;
195 load_success = cache_ptr->load_success;
// Cache hit (presumably the else branch of the miss check; the line that opens
// it is not shown). If the entry was queued by a prefetch and not yet marked
// loaded, block on its std::future until the worker task finishes.
198 if (!iter->second->loaded) {
199 iter->second->status.wait();
200 iter->second->loaded =
true;
202 data = iter->second->data;
203 load_success = iter->second->load_success;
// Prefetch: walk downwards from _idx - 1 to max(_idx - _prefetch_data_size, 0)
// and queue a load for every index that has no entry yet.
206 for (
int i = _idx - 1;
207 i >= std::max(_idx - static_cast<int>(_prefetch_data_size), 0); --i) {
208 auto prefetch_iter = _cached_data.find(i);
209 if (prefetch_iter == _cached_data.end()) {
211 cache_ptr->data.reset(
new DataType);
212 std::vector<std::string> files;
213 for (
auto& names : _filenames) {
214 files.push_back(names[i]);
// The task captures the entry handle and the file list by value, so it does
// not touch the loader object itself. The returned future is kept in `status`
// for the wait in the hit path above.
216 cache_ptr->status = _thread_pool->push([cache_ptr, files](
int id) {
217 cache_ptr->load_success = cache_ptr->data->load(files);
219 _cached_data.emplace(i, cache_ptr);
// Eviction: scan from the largest key downwards while the map holds more than
// _fixed_cache_size entries. Only entries already marked loaded, and not the
// current _idx, are released and erased; entries still pending are skipped.
// NOTE(review): (++citer).base() is the forward iterator of the element citer
// referred to before the increment, and after the increment citer stores that
// same iterator as its base. std::map::erase invalidates iterators to the
// erased element, so using citer afterwards looks like undefined behaviour.
// The usual form is
//   citer = decltype(citer)(_cached_data.erase(std::next(citer).base()));
// Confirm against the full source before changing it.
223 for (
auto citer = _cached_data.rbegin();
224 citer != _cached_data.rend() &&
225 _cached_data.size() > _fixed_cache_size;) {
226 if (citer->second->loaded && citer->first != _idx) {
227 citer->second->data->release();
228 _cached_data.erase((++citer).base());
bool load_success
Definition: async_sequence_data_loader.h:35
bool loaded
Definition: async_sequence_data_loader.h:34
bool query_last(std::shared_ptr< DataType > &data) override
Definition: async_sequence_data_loader.h:161
std::future< void > status
Definition: async_sequence_data_loader.h:36
std::shared_ptr< Cache< apollo::perception::benchmark::FrameStatistics > > CachePtr
Definition: async_sequence_data_loader.h:63
bool query_next(std::shared_ptr< DataType > &data) override
Definition: async_sequence_data_loader.h:85
std::shared_ptr< Dtype > data
Definition: async_sequence_data_loader.h:33
Definition: sequence_data_loader.h:37
Definition: async_sequence_data_loader.h:42
Definition: async_sequence_data_loader.h:32