Determine the trajectory that is blocking progress. (#154)

When processing offline data determining which trajectory needs more
data before processing can continue is surprisingly tricky. We thus
expose this information at the map builder to avoid duplicating this
logic.
master
Wolfgang Hess 2016-12-06 17:04:45 +01:00 committed by GitHub
parent ead4d03b16
commit db45c4ef78
6 changed files with 41 additions and 22 deletions

View File

@ -108,6 +108,10 @@ void MapBuilder::FinishTrajectory(const int trajectory_id) {
sensor_collator_.FinishTrajectory(trajectory_id);
}
int MapBuilder::GetBlockingTrajectoryId() const {
return sensor_collator_.GetBlockingTrajectoryId();
}
int MapBuilder::GetTrajectoryId(const Submaps* const trajectory) const {
const auto trajectory_id = trajectory_ids_.find(trajectory);
CHECK(trajectory_id != trajectory_ids_.end());

View File

@ -67,6 +67,11 @@ class MapBuilder {
// i.e. no further sensor data is expected.
void FinishTrajectory(int trajectory_id);
// Must only be called if at least one unfinished trajectory exists. Returns
// the ID of the trajectory that needs more data before the MapBuilder is
// unblocked.
int GetBlockingTrajectoryId() const;
// Returns the trajectory ID for 'trajectory'.
int GetTrajectoryId(const mapping::Submaps* trajectory) const;

View File

@ -46,5 +46,9 @@ void Collator::AddSensorData(const int trajectory_id, const string& sensor_id,
void Collator::Flush() { queue_.Flush(); }
int Collator::GetBlockingTrajectoryId() const {
return queue_.GetBlocker().trajectory_id;
}
} // namespace sensor
} // namespace cartographer

View File

@ -57,6 +57,11 @@ class Collator {
// AddSensorData may not be called after Flush.
void Flush();
// Must only be called if at least one unfinished trajectory exists. Returns
// the ID of the trajectory that needs more data before the Collator is
// unblocked.
int GetBlockingTrajectoryId() const;
private:
// Queue keys are a pair of trajectory ID and sensor identifier.
OrderedMultiQueue queue_;

View File

@ -32,12 +32,12 @@ namespace {
// for data.
const int kMaxQueueSize = 500;
} // namespace
inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
}
} // namespace
OrderedMultiQueue::OrderedMultiQueue() {}
OrderedMultiQueue::~OrderedMultiQueue() {
@ -84,11 +84,16 @@ void OrderedMultiQueue::Flush() {
}
}
QueueKey OrderedMultiQueue::GetBlocker() const {
CHECK(!queues_.empty());
return blocker_;
}
void OrderedMultiQueue::Dispatch() {
while (true) {
Queue* next_queue = nullptr;
const Data* next_data = nullptr;
int next_trajectory_id = -1;
Queue* next_queue = nullptr;
QueueKey next_queue_key;
for (auto it = queues_.begin(); it != queues_.end();) {
const auto* data = it->second.queue.Peek<Data>();
if (data == nullptr) {
@ -96,13 +101,13 @@ void OrderedMultiQueue::Dispatch() {
queues_.erase(it++);
continue;
}
CannotMakeProgress();
CannotMakeProgress(it->first);
return;
}
if (next_data == nullptr || data->time < next_data->time) {
next_data = data;
next_queue = &it->second;
next_trajectory_id = it->first.trajectory_id;
next_queue_key = it->first;
}
CHECK_LE(last_dispatched_time_, next_data->time)
<< "Non-sorted data added to queue: '" << it->first << "'";
@ -116,7 +121,7 @@ void OrderedMultiQueue::Dispatch() {
// If we haven't dispatched any data for this trajectory yet, fast forward
// all queues of this trajectory until a common start time has been reached.
const common::Time common_start_time =
GetCommonStartTime(next_trajectory_id);
GetCommonStartTime(next_queue_key.trajectory_id);
if (next_data->time >= common_start_time) {
// Happy case, we are beyond the 'common_start_time' already.
@ -125,6 +130,7 @@ void OrderedMultiQueue::Dispatch() {
} else if (next_queue->queue.Size() < 2) {
if (!next_queue->finished) {
// We cannot decide whether to drop or dispatch this yet.
CannotMakeProgress(next_queue_key);
return;
}
last_dispatched_time_ = next_data->time;
@ -142,26 +148,16 @@ void OrderedMultiQueue::Dispatch() {
}
}
void OrderedMultiQueue::CannotMakeProgress() {
void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
blocker_ = queue_key;
for (auto& entry : queues_) {
if (entry.second.queue.Size() > kMaxQueueSize) {
LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: "
<< EmptyQueuesDebugString();
LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
return;
}
}
}
string OrderedMultiQueue::EmptyQueuesDebugString() {
std::ostringstream empty_queues;
for (auto& entry : queues_) {
if (entry.second.queue.Size() == 0) {
empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first;
}
}
return empty_queues.str();
}
common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
auto emplace_result = common_start_time_per_trajectory_.emplace(
trajectory_id, common::Time::min());

View File

@ -69,6 +69,11 @@ class OrderedMultiQueue {
// queues.
void Flush();
// Must only be called if at least one unfinished queue exists. Returns the
// key of a queue that needs more data before the OrderedMultiQueue can
// dispatch data.
QueueKey GetBlocker() const;
private:
struct Queue {
common::BlockingQueue<std::unique_ptr<Data>> queue;
@ -77,8 +82,7 @@ class OrderedMultiQueue {
};
void Dispatch();
void CannotMakeProgress();
string EmptyQueuesDebugString();
void CannotMakeProgress(const QueueKey& queue_key);
common::Time GetCommonStartTime(int trajectory_id);
// Used to verify that values are dispatched in sorted order.
@ -86,6 +90,7 @@ class OrderedMultiQueue {
std::map<int, common::Time> common_start_time_per_trajectory_;
std::map<QueueKey, Queue> queues_;
QueueKey blocker_;
};
} // namespace sensor