Introduce separate mutex for PoseGraph2D work queue access. (#1333)

This is #1284 for 2D.
master
Wolfgang Hess 2018-07-26 11:15:57 +02:00 committed by Wally B. Feed
parent 9a2df068ed
commit e87100a3ad
3 changed files with 50 additions and 29 deletions

View File

@ -52,7 +52,7 @@ PoseGraph2D::PoseGraph2D(
PoseGraph2D::~PoseGraph2D() { PoseGraph2D::~PoseGraph2D() {
WaitForAllComputations(); WaitForAllComputations();
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&work_queue_mutex_);
CHECK(work_queue_ == nullptr); CHECK(work_queue_ == nullptr);
} }
@ -156,7 +156,7 @@ NodeId PoseGraph2D::AddNode(
void PoseGraph2D::AddWorkItem( void PoseGraph2D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item) { const std::function<WorkItem::Result()>& work_item) {
{ {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&work_queue_mutex_);
if (work_queue_ != nullptr) { if (work_queue_ != nullptr) {
const auto now = std::chrono::steady_clock::now(); const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item}); work_queue_->push_back({now, work_item});
@ -167,7 +167,7 @@ void PoseGraph2D::AddWorkItem(
} }
if (work_item() == WorkItem::Result::kRunOptimization) { if (work_item() == WorkItem::Result::kRunOptimization) {
{ {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&work_queue_mutex_);
work_queue_ = common::make_unique<WorkQueue>(); work_queue_ = common::make_unique<WorkQueue>();
} }
constraint_builder_.WhenDone( constraint_builder_.WhenDone(
@ -474,7 +474,7 @@ void PoseGraph2D::HandleWorkQueue(
while (process_work_queue) { while (process_work_queue) {
std::function<WorkItem::Result()> work_item; std::function<WorkItem::Result()> work_item;
{ {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&work_queue_mutex_);
if (work_queue_->empty()) { if (work_queue_->empty()) {
work_queue_.reset(); work_queue_.reset();
return; return;
@ -494,40 +494,60 @@ void PoseGraph2D::HandleWorkQueue(
} }
void PoseGraph2D::WaitForAllComputations() { void PoseGraph2D::WaitForAllComputations() {
bool notification = false; int num_trajectory_nodes;
{
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
num_trajectory_nodes = data_.num_trajectory_nodes;
}
const int num_finished_nodes_at_start = const int num_finished_nodes_at_start =
constraint_builder_.GetNumFinishedNodes(); constraint_builder_.GetNumFinishedNodes();
while (!locker.AwaitWithTimeout(
[this]() REQUIRES(mutex_) { auto report_progress = [this, num_trajectory_nodes,
return ((constraint_builder_.GetNumFinishedNodes() == num_finished_nodes_at_start]() {
data_.num_trajectory_nodes) &&
!work_queue_);
},
common::FromSeconds(1.))) {
// Log progress on nodes only when we are actually processing nodes. // Log progress on nodes only when we are actually processing nodes.
if (data_.num_trajectory_nodes != num_finished_nodes_at_start) { if (num_trajectory_nodes != num_finished_nodes_at_start) {
std::ostringstream progress_info; std::ostringstream progress_info;
progress_info << "Optimizing: " << std::fixed << std::setprecision(1) progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
<< 100. * << 100. *
(constraint_builder_.GetNumFinishedNodes() - (constraint_builder_.GetNumFinishedNodes() -
num_finished_nodes_at_start) / num_finished_nodes_at_start) /
(data_.num_trajectory_nodes - (num_trajectory_nodes - num_finished_nodes_at_start)
num_finished_nodes_at_start)
<< "%..."; << "%...";
std::cout << "\r\x1b[K" << progress_info.str() << std::flush; std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
} }
};
// First wait for the work queue to drain so that it's safe to schedule
// a WhenDone() callback.
{
common::MutexLocker locker(&work_queue_mutex_);
while (!locker.AwaitWithTimeout(
[this]() REQUIRES(work_queue_mutex_) { return work_queue_ == nullptr; },
common::FromSeconds(1.))) {
report_progress();
} }
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl; }
// Now wait for any pending constraint computations to finish.
common::MutexLocker locker(&mutex_);
bool notification = false;
constraint_builder_.WhenDone( constraint_builder_.WhenDone(
[this, [this,
&notification](const constraints::ConstraintBuilder2D::Result& result) { &notification](const constraints::ConstraintBuilder2D::Result& result)
EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(), data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end()); result.end());
notification = true; notification = true;
}); });
locker.Await([&notification]() { return notification; }); while (!locker.AwaitWithTimeout([&notification]()
REQUIRES(mutex_) { return notification; },
common::FromSeconds(1.))) {
report_progress();
}
CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes);
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
} }
void PoseGraph2D::DeleteTrajectory(const int trajectory_id) { void PoseGraph2D::DeleteTrajectory(const int trajectory_id) {

View File

@ -160,7 +160,7 @@ class PoseGraph2D : public PoseGraph {
// Handles a new work item. // Handles a new work item.
void AddWorkItem(const std::function<WorkItem::Result()>& work_item) void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
EXCLUDES(mutex_); EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
// Adds connectivity and sampler for a trajectory if it does not exist. // Adds connectivity and sampler for a trajectory if it does not exist.
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_); void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
@ -196,11 +196,11 @@ class PoseGraph2D : public PoseGraph {
// Runs the optimization, executes the trimmers and processes the work queue. // Runs the optimization, executes the trimmers and processes the work queue.
void HandleWorkQueue(const constraints::ConstraintBuilder2D::Result& result) void HandleWorkQueue(const constraints::ConstraintBuilder2D::Result& result)
EXCLUDES(mutex_); EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
// Waits until we caught up (i.e. nothing is waiting to be scheduled), and // Waits until we caught up (i.e. nothing is waiting to be scheduled), and
// all computations have finished. // all computations have finished.
void WaitForAllComputations() EXCLUDES(mutex_); void WaitForAllComputations() EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
// Runs the optimization. Callers have to make sure, that there is only one // Runs the optimization. Callers have to make sure, that there is only one
// optimization being run at a time. // optimization being run at a time.
@ -228,10 +228,11 @@ class PoseGraph2D : public PoseGraph {
const proto::PoseGraphOptions options_; const proto::PoseGraphOptions options_;
GlobalSlamOptimizationCallback global_slam_optimization_callback_; GlobalSlamOptimizationCallback global_slam_optimization_callback_;
mutable common::Mutex mutex_; mutable common::Mutex mutex_;
common::Mutex work_queue_mutex_;
// If it exists, further work items must be added to this queue, and will be // If it exists, further work items must be added to this queue, and will be
// considered later. // considered later.
std::unique_ptr<WorkQueue> work_queue_ GUARDED_BY(mutex_); std::unique_ptr<WorkQueue> work_queue_ GUARDED_BY(work_queue_mutex_);
// We globally localize a fraction of the nodes from each trajectory. // We globally localize a fraction of the nodes from each trajectory.
std::unordered_map<int, std::unique_ptr<common::FixedRatioSampler>> std::unordered_map<int, std::unique_ptr<common::FixedRatioSampler>>

View File

@ -155,7 +155,7 @@ class PoseGraph3D : public PoseGraph {
protected: protected:
// Waits until we caught up (i.e. nothing is waiting to be scheduled), and // Waits until we caught up (i.e. nothing is waiting to be scheduled), and
// all computations have finished. // all computations have finished.
void WaitForAllComputations() EXCLUDES(mutex_); void WaitForAllComputations() EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
private: private:
MapById<SubmapId, SubmapData> GetSubmapDataUnderLock() const REQUIRES(mutex_); MapById<SubmapId, SubmapData> GetSubmapDataUnderLock() const REQUIRES(mutex_);