From db45c4ef7893b78f4c66d8d7e03a4079f21739ba Mon Sep 17 00:00:00 2001 From: Wolfgang Hess Date: Tue, 6 Dec 2016 17:04:45 +0100 Subject: [PATCH] Determine the trajectory that is blocking progress. (#154) When processing offline data determining which trajectory needs more data before processing can continue is surprisingly tricky. We thus expose this information at the map builder to avoid duplicating this logic. --- cartographer/mapping/map_builder.cc | 4 +++ cartographer/mapping/map_builder.h | 5 +++ cartographer/sensor/collator.cc | 4 +++ cartographer/sensor/collator.h | 5 +++ cartographer/sensor/ordered_multi_queue.cc | 36 ++++++++++------------ cartographer/sensor/ordered_multi_queue.h | 9 ++++-- 6 files changed, 41 insertions(+), 22 deletions(-) diff --git a/cartographer/mapping/map_builder.cc b/cartographer/mapping/map_builder.cc index 91c3e36..a42ad8e 100644 --- a/cartographer/mapping/map_builder.cc +++ b/cartographer/mapping/map_builder.cc @@ -108,6 +108,10 @@ void MapBuilder::FinishTrajectory(const int trajectory_id) { sensor_collator_.FinishTrajectory(trajectory_id); } +int MapBuilder::GetBlockingTrajectoryId() const { + return sensor_collator_.GetBlockingTrajectoryId(); +} + int MapBuilder::GetTrajectoryId(const Submaps* const trajectory) const { const auto trajectory_id = trajectory_ids_.find(trajectory); CHECK(trajectory_id != trajectory_ids_.end()); diff --git a/cartographer/mapping/map_builder.h b/cartographer/mapping/map_builder.h index 2f87ee3..f2c7e1d 100644 --- a/cartographer/mapping/map_builder.h +++ b/cartographer/mapping/map_builder.h @@ -67,6 +67,11 @@ class MapBuilder { // i.e. no further sensor data is expected. void FinishTrajectory(int trajectory_id); + // Must only be called if at least one unfinished trajectory exists. Returns + // the ID of the trajectory that needs more data before the MapBuilder is + // unblocked. + int GetBlockingTrajectoryId() const; + // Returns the trajectory ID for 'trajectory'. int GetTrajectoryId(const mapping::Submaps* trajectory) const; diff --git a/cartographer/sensor/collator.cc b/cartographer/sensor/collator.cc index 14d440d..0d62a65 100644 --- a/cartographer/sensor/collator.cc +++ b/cartographer/sensor/collator.cc @@ -46,5 +46,9 @@ void Collator::AddSensorData(const int trajectory_id, const string& sensor_id, void Collator::Flush() { queue_.Flush(); } +int Collator::GetBlockingTrajectoryId() const { + return queue_.GetBlocker().trajectory_id; +} + } // namespace sensor } // namespace cartographer diff --git a/cartographer/sensor/collator.h b/cartographer/sensor/collator.h index 627aaef..b159a15 100644 --- a/cartographer/sensor/collator.h +++ b/cartographer/sensor/collator.h @@ -57,6 +57,11 @@ class Collator { // AddSensorData may not be called after Flush. void Flush(); + // Must only be called if at least one unfinished trajectory exists. Returns + // the ID of the trajectory that needs more data before the Collator is + // unblocked. + int GetBlockingTrajectoryId() const; + private: // Queue keys are a pair of trajectory ID and sensor identifier. OrderedMultiQueue queue_; diff --git a/cartographer/sensor/ordered_multi_queue.cc b/cartographer/sensor/ordered_multi_queue.cc index ad6ce33..fc0a28b 100644 --- a/cartographer/sensor/ordered_multi_queue.cc +++ b/cartographer/sensor/ordered_multi_queue.cc @@ -32,12 +32,12 @@ namespace { // for data. const int kMaxQueueSize = 500; +} // namespace + inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) { return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')'; } -} // namespace - OrderedMultiQueue::OrderedMultiQueue() {} OrderedMultiQueue::~OrderedMultiQueue() { @@ -84,11 +84,16 @@ void OrderedMultiQueue::Flush() { } } +QueueKey OrderedMultiQueue::GetBlocker() const { + CHECK(!queues_.empty()); + return blocker_; +} + void OrderedMultiQueue::Dispatch() { while (true) { - Queue* next_queue = nullptr; const Data* next_data = nullptr; - int next_trajectory_id = -1; + Queue* next_queue = nullptr; + QueueKey next_queue_key; for (auto it = queues_.begin(); it != queues_.end();) { const auto* data = it->second.queue.Peek(); if (data == nullptr) { @@ -96,13 +101,13 @@ void OrderedMultiQueue::Dispatch() { queues_.erase(it++); continue; } - CannotMakeProgress(); + CannotMakeProgress(it->first); return; } if (next_data == nullptr || data->time < next_data->time) { next_data = data; next_queue = &it->second; - next_trajectory_id = it->first.trajectory_id; + next_queue_key = it->first; } CHECK_LE(last_dispatched_time_, next_data->time) << "Non-sorted data added to queue: '" << it->first << "'"; @@ -116,7 +121,7 @@ void OrderedMultiQueue::Dispatch() { // If we haven't dispatched any data for this trajectory yet, fast forward // all queues of this trajectory until a common start time has been reached. const common::Time common_start_time = - GetCommonStartTime(next_trajectory_id); + GetCommonStartTime(next_queue_key.trajectory_id); if (next_data->time >= common_start_time) { // Happy case, we are beyond the 'common_start_time' already. @@ -125,6 +130,7 @@ void OrderedMultiQueue::Dispatch() { } else if (next_queue->queue.Size() < 2) { if (!next_queue->finished) { // We cannot decide whether to drop or dispatch this yet. + CannotMakeProgress(next_queue_key); return; } last_dispatched_time_ = next_data->time; @@ -142,26 +148,16 @@ void OrderedMultiQueue::Dispatch() { } } -void OrderedMultiQueue::CannotMakeProgress() { +void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) { + blocker_ = queue_key; for (auto& entry : queues_) { if (entry.second.queue.Size() > kMaxQueueSize) { - LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: " - << EmptyQueuesDebugString(); + LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key; return; } } } -string OrderedMultiQueue::EmptyQueuesDebugString() { - std::ostringstream empty_queues; - for (auto& entry : queues_) { - if (entry.second.queue.Size() == 0) { - empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first; - } - } - return empty_queues.str(); -} - common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) { auto emplace_result = common_start_time_per_trajectory_.emplace( trajectory_id, common::Time::min()); diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 57f75d5..2cf83d2 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -69,6 +69,11 @@ class OrderedMultiQueue { // queues. void Flush(); + // Must only be called if at least one unfinished queue exists. Returns the + // key of a queue that needs more data before the OrderedMultiQueue can + // dispatch data. + QueueKey GetBlocker() const; + private: struct Queue { common::BlockingQueue> queue; @@ -77,8 +82,7 @@ class OrderedMultiQueue { }; void Dispatch(); - void CannotMakeProgress(); - string EmptyQueuesDebugString(); + void CannotMakeProgress(const QueueKey& queue_key); common::Time GetCommonStartTime(int trajectory_id); // Used to verify that values are dispatched in sorted order. @@ -86,6 +90,7 @@ class OrderedMultiQueue { std::map common_start_time_per_trajectory_; std::map queues_; + QueueKey blocker_; }; } // namespace sensor