Compute the common start time per trajectory. (#153)
In the multi-trajectory case, other trajectories should not influence which data gets dropped. This is especially the case if all trajectories are added before any of their data. In this case, data before the start of the last trajectory was dropped.master
parent
f3526bd252
commit
ead4d03b16
|
@ -88,9 +88,9 @@ void OrderedMultiQueue::Dispatch() {
|
||||||
while (true) {
|
while (true) {
|
||||||
Queue* next_queue = nullptr;
|
Queue* next_queue = nullptr;
|
||||||
const Data* next_data = nullptr;
|
const Data* next_data = nullptr;
|
||||||
|
int next_trajectory_id = -1;
|
||||||
for (auto it = queues_.begin(); it != queues_.end();) {
|
for (auto it = queues_.begin(); it != queues_.end();) {
|
||||||
auto& queue = it->second.queue;
|
const auto* data = it->second.queue.Peek<Data>();
|
||||||
const auto* data = queue.Peek<Data>();
|
|
||||||
if (data == nullptr) {
|
if (data == nullptr) {
|
||||||
if (it->second.finished) {
|
if (it->second.finished) {
|
||||||
queues_.erase(it++);
|
queues_.erase(it++);
|
||||||
|
@ -99,11 +99,10 @@ void OrderedMultiQueue::Dispatch() {
|
||||||
CannotMakeProgress();
|
CannotMakeProgress();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (next_data == nullptr ||
|
if (next_data == nullptr || data->time < next_data->time) {
|
||||||
std::forward_as_tuple(data->time, it->first) <
|
|
||||||
std::forward_as_tuple(next_data->time, it->first)) {
|
|
||||||
next_data = data;
|
next_data = data;
|
||||||
next_queue = &it->second;
|
next_queue = &it->second;
|
||||||
|
next_trajectory_id = it->first.trajectory_id;
|
||||||
}
|
}
|
||||||
CHECK_LE(last_dispatched_time_, next_data->time)
|
CHECK_LE(last_dispatched_time_, next_data->time)
|
||||||
<< "Non-sorted data added to queue: '" << it->first << "'";
|
<< "Non-sorted data added to queue: '" << it->first << "'";
|
||||||
|
@ -114,19 +113,13 @@ void OrderedMultiQueue::Dispatch() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we haven't dispatched any data yet, fast forward all queues until a
|
// If we haven't dispatched any data for this trajectory yet, fast forward
|
||||||
// common start time has been reached.
|
// all queues of this trajectory until a common start time has been reached.
|
||||||
if (common_start_time_ == common::Time::min()) {
|
const common::Time common_start_time =
|
||||||
for (auto& entry : queues_) {
|
GetCommonStartTime(next_trajectory_id);
|
||||||
common_start_time_ =
|
|
||||||
std::max(common_start_time_, entry.second.queue.Peek<Data>()->time);
|
|
||||||
}
|
|
||||||
LOG(INFO) << "All sensor data is available starting at '"
|
|
||||||
<< common_start_time_ << "'.";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (next_data->time >= common_start_time_) {
|
if (next_data->time >= common_start_time) {
|
||||||
// Happy case, we are beyond the 'common_start_time_' already.
|
// Happy case, we are beyond the 'common_start_time' already.
|
||||||
last_dispatched_time_ = next_data->time;
|
last_dispatched_time_ = next_data->time;
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
} else if (next_queue->queue.Size() < 2) {
|
} else if (next_queue->queue.Size() < 2) {
|
||||||
|
@ -138,10 +131,10 @@ void OrderedMultiQueue::Dispatch() {
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
} else {
|
} else {
|
||||||
// We take a peek at the time after next data. If it also is not beyond
|
// We take a peek at the time after next data. If it also is not beyond
|
||||||
// 'common_start_time_' we drop 'next_data', otherwise we just found the
|
// 'common_start_time' we drop 'next_data', otherwise we just found the
|
||||||
// first packet to dispatch from this queue.
|
// first packet to dispatch from this queue.
|
||||||
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
|
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
|
||||||
if (next_queue->queue.Peek<Data>()->time > common_start_time_) {
|
if (next_queue->queue.Peek<Data>()->time > common_start_time) {
|
||||||
last_dispatched_time_ = next_data->time;
|
last_dispatched_time_ = next_data->time;
|
||||||
next_queue->callback(std::move(next_data_owner));
|
next_queue->callback(std::move(next_data_owner));
|
||||||
}
|
}
|
||||||
|
@ -169,5 +162,22 @@ string OrderedMultiQueue::EmptyQueuesDebugString() {
|
||||||
return empty_queues.str();
|
return empty_queues.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
|
||||||
|
auto emplace_result = common_start_time_per_trajectory_.emplace(
|
||||||
|
trajectory_id, common::Time::min());
|
||||||
|
common::Time& common_start_time = emplace_result.first->second;
|
||||||
|
if (emplace_result.second) {
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
if (entry.first.trajectory_id == trajectory_id) {
|
||||||
|
common_start_time =
|
||||||
|
std::max(common_start_time, entry.second.queue.Peek<Data>()->time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG(INFO) << "All sensor data for trajectory " << trajectory_id
|
||||||
|
<< " is available starting at '" << common_start_time << "'.";
|
||||||
|
}
|
||||||
|
return common_start_time;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace sensor
|
} // namespace sensor
|
||||||
} // namespace cartographer
|
} // namespace cartographer
|
||||||
|
|
|
@ -79,11 +79,12 @@ class OrderedMultiQueue {
|
||||||
void Dispatch();
|
void Dispatch();
|
||||||
void CannotMakeProgress();
|
void CannotMakeProgress();
|
||||||
string EmptyQueuesDebugString();
|
string EmptyQueuesDebugString();
|
||||||
|
common::Time GetCommonStartTime(int trajectory_id);
|
||||||
|
|
||||||
// Used to verify that values are dispatched in sorted order.
|
// Used to verify that values are dispatched in sorted order.
|
||||||
common::Time last_dispatched_time_ = common::Time::min();
|
common::Time last_dispatched_time_ = common::Time::min();
|
||||||
common::Time common_start_time_ = common::Time::min();
|
|
||||||
|
|
||||||
|
std::map<int, common::Time> common_start_time_per_trajectory_;
|
||||||
std::map<QueueKey, Queue> queues_;
|
std::map<QueueKey, Queue> queues_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -94,6 +94,23 @@ TEST_F(OrderedMultiQueueTest, MarkQueueAsFinished) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(OrderedMultiQueueTest, CommonStartTimePerTrajectory) {
|
||||||
|
queue_.Add(kFirst, MakeImu(0));
|
||||||
|
queue_.Add(kFirst, MakeImu(1));
|
||||||
|
queue_.Add(kFirst, MakeImu(2));
|
||||||
|
queue_.Add(kFirst, MakeImu(3));
|
||||||
|
queue_.Add(kSecond, MakeImu(2));
|
||||||
|
EXPECT_TRUE(values_.empty());
|
||||||
|
queue_.Add(kThird, MakeImu(4));
|
||||||
|
EXPECT_EQ(values_.size(), 2);
|
||||||
|
queue_.MarkQueueAsFinished(kFirst);
|
||||||
|
EXPECT_EQ(values_.size(), 2);
|
||||||
|
queue_.MarkQueueAsFinished(kSecond);
|
||||||
|
EXPECT_EQ(values_.size(), 4);
|
||||||
|
queue_.MarkQueueAsFinished(kThird);
|
||||||
|
EXPECT_EQ(values_.size(), 4);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
} // namespace sensor
|
} // namespace sensor
|
||||||
} // namespace cartographer
|
} // namespace cartographer
|
||||||
|
|
Loading…
Reference in New Issue