diff --git a/cartographer/sensor/ordered_multi_queue.cc b/cartographer/sensor/ordered_multi_queue.cc index 3b4f70f..ad6ce33 100644 --- a/cartographer/sensor/ordered_multi_queue.cc +++ b/cartographer/sensor/ordered_multi_queue.cc @@ -88,9 +88,9 @@ void OrderedMultiQueue::Dispatch() { while (true) { Queue* next_queue = nullptr; const Data* next_data = nullptr; + int next_trajectory_id = -1; for (auto it = queues_.begin(); it != queues_.end();) { - auto& queue = it->second.queue; - const auto* data = queue.Peek(); + const auto* data = it->second.queue.Peek(); if (data == nullptr) { if (it->second.finished) { queues_.erase(it++); @@ -99,11 +99,10 @@ void OrderedMultiQueue::Dispatch() { CannotMakeProgress(); return; } - if (next_data == nullptr || - std::forward_as_tuple(data->time, it->first) < - std::forward_as_tuple(next_data->time, it->first)) { + if (next_data == nullptr || data->time < next_data->time) { next_data = data; next_queue = &it->second; + next_trajectory_id = it->first.trajectory_id; } CHECK_LE(last_dispatched_time_, next_data->time) << "Non-sorted data added to queue: '" << it->first << "'"; @@ -114,19 +113,13 @@ void OrderedMultiQueue::Dispatch() { return; } - // If we haven't dispatched any data yet, fast forward all queues until a - // common start time has been reached. - if (common_start_time_ == common::Time::min()) { - for (auto& entry : queues_) { - common_start_time_ = - std::max(common_start_time_, entry.second.queue.Peek()->time); - } - LOG(INFO) << "All sensor data is available starting at '" - << common_start_time_ << "'."; - } + // If we haven't dispatched any data for this trajectory yet, fast forward + // all queues of this trajectory until a common start time has been reached. + const common::Time common_start_time = + GetCommonStartTime(next_trajectory_id); - if (next_data->time >= common_start_time_) { - // Happy case, we are beyond the 'common_start_time_' already. + if (next_data->time >= common_start_time) { + // Happy case, we are beyond the 'common_start_time' already. last_dispatched_time_ = next_data->time; next_queue->callback(next_queue->queue.Pop()); } else if (next_queue->queue.Size() < 2) { @@ -138,10 +131,10 @@ void OrderedMultiQueue::Dispatch() { next_queue->callback(next_queue->queue.Pop()); } else { // We take a peek at the time after next data. If it also is not beyond - // 'common_start_time_' we drop 'next_data', otherwise we just found the + // 'common_start_time' we drop 'next_data', otherwise we just found the // first packet to dispatch from this queue. std::unique_ptr next_data_owner = next_queue->queue.Pop(); - if (next_queue->queue.Peek()->time > common_start_time_) { + if (next_queue->queue.Peek()->time > common_start_time) { last_dispatched_time_ = next_data->time; next_queue->callback(std::move(next_data_owner)); } @@ -169,5 +162,22 @@ string OrderedMultiQueue::EmptyQueuesDebugString() { return empty_queues.str(); } +common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) { + auto emplace_result = common_start_time_per_trajectory_.emplace( + trajectory_id, common::Time::min()); + common::Time& common_start_time = emplace_result.first->second; + if (emplace_result.second) { + for (auto& entry : queues_) { + if (entry.first.trajectory_id == trajectory_id) { + common_start_time = + std::max(common_start_time, entry.second.queue.Peek()->time); + } + } + LOG(INFO) << "All sensor data for trajectory " << trajectory_id + << " is available starting at '" << common_start_time << "'."; + } + return common_start_time; +} + } // namespace sensor } // namespace cartographer diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 55b7481..57f75d5 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -79,11 +79,12 @@ class OrderedMultiQueue { void Dispatch(); void CannotMakeProgress(); string EmptyQueuesDebugString(); + common::Time GetCommonStartTime(int trajectory_id); // Used to verify that values are dispatched in sorted order. common::Time last_dispatched_time_ = common::Time::min(); - common::Time common_start_time_ = common::Time::min(); + std::map common_start_time_per_trajectory_; std::map queues_; }; diff --git a/cartographer/sensor/ordered_multi_queue_test.cc b/cartographer/sensor/ordered_multi_queue_test.cc index 874d2a6..b906fb5 100644 --- a/cartographer/sensor/ordered_multi_queue_test.cc +++ b/cartographer/sensor/ordered_multi_queue_test.cc @@ -94,6 +94,23 @@ TEST_F(OrderedMultiQueueTest, MarkQueueAsFinished) { } } +TEST_F(OrderedMultiQueueTest, CommonStartTimePerTrajectory) { + queue_.Add(kFirst, MakeImu(0)); + queue_.Add(kFirst, MakeImu(1)); + queue_.Add(kFirst, MakeImu(2)); + queue_.Add(kFirst, MakeImu(3)); + queue_.Add(kSecond, MakeImu(2)); + EXPECT_TRUE(values_.empty()); + queue_.Add(kThird, MakeImu(4)); + EXPECT_EQ(values_.size(), 2); + queue_.MarkQueueAsFinished(kFirst); + EXPECT_EQ(values_.size(), 2); + queue_.MarkQueueAsFinished(kSecond); + EXPECT_EQ(values_.size(), 4); + queue_.MarkQueueAsFinished(kThird); + EXPECT_EQ(values_.size(), 4); +} + } // namespace } // namespace sensor } // namespace cartographer