From e87100a3ad82805be24ba8e1ee7a2c5aa7213c59 Mon Sep 17 00:00:00 2001 From: Wolfgang Hess Date: Thu, 26 Jul 2018 11:15:57 +0200 Subject: [PATCH] Introduce separate mutex for PoseGraph2D work queue access. (#1333) This is #1284 for 2D. --- .../mapping/internal/2d/pose_graph_2d.cc | 68 ++++++++++++------- .../mapping/internal/2d/pose_graph_2d.h | 9 +-- .../mapping/internal/3d/pose_graph_3d.h | 2 +- 3 files changed, 50 insertions(+), 29 deletions(-) diff --git a/cartographer/mapping/internal/2d/pose_graph_2d.cc b/cartographer/mapping/internal/2d/pose_graph_2d.cc index 0c6055e..dbcad28 100644 --- a/cartographer/mapping/internal/2d/pose_graph_2d.cc +++ b/cartographer/mapping/internal/2d/pose_graph_2d.cc @@ -52,7 +52,7 @@ PoseGraph2D::PoseGraph2D( PoseGraph2D::~PoseGraph2D() { WaitForAllComputations(); - common::MutexLocker locker(&mutex_); + common::MutexLocker locker(&work_queue_mutex_); CHECK(work_queue_ == nullptr); } @@ -156,7 +156,7 @@ NodeId PoseGraph2D::AddNode( void PoseGraph2D::AddWorkItem( const std::function& work_item) { { - common::MutexLocker locker(&mutex_); + common::MutexLocker locker(&work_queue_mutex_); if (work_queue_ != nullptr) { const auto now = std::chrono::steady_clock::now(); work_queue_->push_back({now, work_item}); @@ -167,7 +167,7 @@ void PoseGraph2D::AddWorkItem( } if (work_item() == WorkItem::Result::kRunOptimization) { { - common::MutexLocker locker(&mutex_); + common::MutexLocker locker(&work_queue_mutex_); work_queue_ = common::make_unique(); } constraint_builder_.WhenDone( @@ -474,7 +474,7 @@ void PoseGraph2D::HandleWorkQueue( while (process_work_queue) { std::function work_item; { - common::MutexLocker locker(&mutex_); + common::MutexLocker locker(&work_queue_mutex_); if (work_queue_->empty()) { work_queue_.reset(); return; @@ -494,40 +494,60 @@ void PoseGraph2D::HandleWorkQueue( } void PoseGraph2D::WaitForAllComputations() { - bool notification = false; - common::MutexLocker locker(&mutex_); + int num_trajectory_nodes; + { + common::MutexLocker locker(&mutex_); + num_trajectory_nodes = data_.num_trajectory_nodes; + } + const int num_finished_nodes_at_start = constraint_builder_.GetNumFinishedNodes(); - while (!locker.AwaitWithTimeout( - [this]() REQUIRES(mutex_) { - return ((constraint_builder_.GetNumFinishedNodes() == - data_.num_trajectory_nodes) && - !work_queue_); - }, - common::FromSeconds(1.))) { + + auto report_progress = [this, num_trajectory_nodes, + num_finished_nodes_at_start]() { // Log progress on nodes only when we are actually processing nodes. - if (data_.num_trajectory_nodes != num_finished_nodes_at_start) { + if (num_trajectory_nodes != num_finished_nodes_at_start) { std::ostringstream progress_info; progress_info << "Optimizing: " << std::fixed << std::setprecision(1) << 100. * (constraint_builder_.GetNumFinishedNodes() - num_finished_nodes_at_start) / - (data_.num_trajectory_nodes - - num_finished_nodes_at_start) + (num_trajectory_nodes - num_finished_nodes_at_start) << "%..."; std::cout << "\r\x1b[K" << progress_info.str() << std::flush; } + }; + + // First wait for the work queue to drain so that it's safe to schedule + // a WhenDone() callback. + { + common::MutexLocker locker(&work_queue_mutex_); + while (!locker.AwaitWithTimeout( + [this]() REQUIRES(work_queue_mutex_) { return work_queue_ == nullptr; }, + common::FromSeconds(1.))) { + report_progress(); + } } - std::cout << "\r\x1b[KOptimizing: Done. " << std::endl; + + // Now wait for any pending constraint computations to finish. + common::MutexLocker locker(&mutex_); + bool notification = false; constraint_builder_.WhenDone( [this, - ¬ification](const constraints::ConstraintBuilder2D::Result& result) { - common::MutexLocker locker(&mutex_); - data_.constraints.insert(data_.constraints.end(), result.begin(), - result.end()); - notification = true; - }); - locker.Await([¬ification]() { return notification; }); + ¬ification](const constraints::ConstraintBuilder2D::Result& result) + EXCLUDES(mutex_) { + common::MutexLocker locker(&mutex_); + data_.constraints.insert(data_.constraints.end(), result.begin(), + result.end()); + notification = true; + }); + while (!locker.AwaitWithTimeout([¬ification]() + REQUIRES(mutex_) { return notification; }, + common::FromSeconds(1.))) { + report_progress(); + } + CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes); + std::cout << "\r\x1b[KOptimizing: Done. " << std::endl; } void PoseGraph2D::DeleteTrajectory(const int trajectory_id) { diff --git a/cartographer/mapping/internal/2d/pose_graph_2d.h b/cartographer/mapping/internal/2d/pose_graph_2d.h index 3b71847..a71a928 100644 --- a/cartographer/mapping/internal/2d/pose_graph_2d.h +++ b/cartographer/mapping/internal/2d/pose_graph_2d.h @@ -160,7 +160,7 @@ class PoseGraph2D : public PoseGraph { // Handles a new work item. void AddWorkItem(const std::function& work_item) - EXCLUDES(mutex_); + EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_); // Adds connectivity and sampler for a trajectory if it does not exist. void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_); @@ -196,11 +196,11 @@ class PoseGraph2D : public PoseGraph { // Runs the optimization, executes the trimmers and processes the work queue. void HandleWorkQueue(const constraints::ConstraintBuilder2D::Result& result) - EXCLUDES(mutex_); + EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_); // Waits until we caught up (i.e. nothing is waiting to be scheduled), and // all computations have finished. - void WaitForAllComputations() EXCLUDES(mutex_); + void WaitForAllComputations() EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_); // Runs the optimization. Callers have to make sure, that there is only one // optimization being run at a time. @@ -228,10 +228,11 @@ class PoseGraph2D : public PoseGraph { const proto::PoseGraphOptions options_; GlobalSlamOptimizationCallback global_slam_optimization_callback_; mutable common::Mutex mutex_; + common::Mutex work_queue_mutex_; // If it exists, further work items must be added to this queue, and will be // considered later. - std::unique_ptr work_queue_ GUARDED_BY(mutex_); + std::unique_ptr work_queue_ GUARDED_BY(work_queue_mutex_); // We globally localize a fraction of the nodes from each trajectory. std::unordered_map> diff --git a/cartographer/mapping/internal/3d/pose_graph_3d.h b/cartographer/mapping/internal/3d/pose_graph_3d.h index 0608e45..7814a86 100644 --- a/cartographer/mapping/internal/3d/pose_graph_3d.h +++ b/cartographer/mapping/internal/3d/pose_graph_3d.h @@ -155,7 +155,7 @@ class PoseGraph3D : public PoseGraph { protected: // Waits until we caught up (i.e. nothing is waiting to be scheduled), and // all computations have finished. - void WaitForAllComputations() EXCLUDES(mutex_); + void WaitForAllComputations() EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_); private: MapById GetSubmapDataUnderLock() const REQUIRES(mutex_);