Introduce separate mutex for PoseGraph3D work queue access (#1284)
Introduce separate mutex for PoseGraph3D work queue access. This reduces lock contention, see issue #1250. In particular higher frequency call to AddWorkItem don't need to take the main mutex from the froground thread anymore.master
parent
31d0a6acc7
commit
482f912d95
|
@ -51,7 +51,7 @@ PoseGraph3D::PoseGraph3D(
|
||||||
|
|
||||||
PoseGraph3D::~PoseGraph3D() {
|
PoseGraph3D::~PoseGraph3D() {
|
||||||
WaitForAllComputations();
|
WaitForAllComputations();
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&work_queue_mutex_);
|
||||||
CHECK(work_queue_ == nullptr);
|
CHECK(work_queue_ == nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,7 +153,7 @@ NodeId PoseGraph3D::AddNode(
|
||||||
void PoseGraph3D::AddWorkItem(
|
void PoseGraph3D::AddWorkItem(
|
||||||
const std::function<WorkItem::Result()>& work_item) {
|
const std::function<WorkItem::Result()>& work_item) {
|
||||||
{
|
{
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&work_queue_mutex_);
|
||||||
if (work_queue_ != nullptr) {
|
if (work_queue_ != nullptr) {
|
||||||
const auto now = std::chrono::steady_clock::now();
|
const auto now = std::chrono::steady_clock::now();
|
||||||
work_queue_->push_back({now, work_item});
|
work_queue_->push_back({now, work_item});
|
||||||
|
@ -164,7 +164,7 @@ void PoseGraph3D::AddWorkItem(
|
||||||
}
|
}
|
||||||
if (work_item() == WorkItem::Result::kRunOptimization) {
|
if (work_item() == WorkItem::Result::kRunOptimization) {
|
||||||
{
|
{
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&work_queue_mutex_);
|
||||||
work_queue_ = common::make_unique<WorkQueue>();
|
work_queue_ = common::make_unique<WorkQueue>();
|
||||||
}
|
}
|
||||||
constraint_builder_.WhenDone(
|
constraint_builder_.WhenDone(
|
||||||
|
@ -489,7 +489,7 @@ void PoseGraph3D::HandleWorkQueue(
|
||||||
while (process_work_queue) {
|
while (process_work_queue) {
|
||||||
std::function<WorkItem::Result()> work_item;
|
std::function<WorkItem::Result()> work_item;
|
||||||
{
|
{
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&work_queue_mutex_);
|
||||||
if (work_queue_->empty()) {
|
if (work_queue_->empty()) {
|
||||||
work_queue_.reset();
|
work_queue_.reset();
|
||||||
return;
|
return;
|
||||||
|
@ -509,40 +509,60 @@ void PoseGraph3D::HandleWorkQueue(
|
||||||
}
|
}
|
||||||
|
|
||||||
void PoseGraph3D::WaitForAllComputations() {
|
void PoseGraph3D::WaitForAllComputations() {
|
||||||
bool notification = false;
|
int num_trajectory_nodes;
|
||||||
|
{
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&mutex_);
|
||||||
|
num_trajectory_nodes = data_.num_trajectory_nodes;
|
||||||
|
}
|
||||||
|
|
||||||
const int num_finished_nodes_at_start =
|
const int num_finished_nodes_at_start =
|
||||||
constraint_builder_.GetNumFinishedNodes();
|
constraint_builder_.GetNumFinishedNodes();
|
||||||
while (!locker.AwaitWithTimeout(
|
|
||||||
[this]() REQUIRES(mutex_) {
|
auto report_progress = [this, num_trajectory_nodes,
|
||||||
return ((constraint_builder_.GetNumFinishedNodes() ==
|
num_finished_nodes_at_start]() {
|
||||||
data_.num_trajectory_nodes) &&
|
|
||||||
!work_queue_);
|
|
||||||
},
|
|
||||||
common::FromSeconds(1.))) {
|
|
||||||
// Log progress on nodes only when we are actually processing nodes.
|
// Log progress on nodes only when we are actually processing nodes.
|
||||||
if (data_.num_trajectory_nodes != num_finished_nodes_at_start) {
|
if (num_trajectory_nodes != num_finished_nodes_at_start) {
|
||||||
std::ostringstream progress_info;
|
std::ostringstream progress_info;
|
||||||
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
|
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
|
||||||
<< 100. *
|
<< 100. *
|
||||||
(constraint_builder_.GetNumFinishedNodes() -
|
(constraint_builder_.GetNumFinishedNodes() -
|
||||||
num_finished_nodes_at_start) /
|
num_finished_nodes_at_start) /
|
||||||
(data_.num_trajectory_nodes -
|
(num_trajectory_nodes - num_finished_nodes_at_start)
|
||||||
num_finished_nodes_at_start)
|
|
||||||
<< "%...";
|
<< "%...";
|
||||||
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
|
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// First wait for the work queue to drain so that it's safe to schedule
|
||||||
|
// a WhenDone() callback.
|
||||||
|
{
|
||||||
|
common::MutexLocker locker(&work_queue_mutex_);
|
||||||
|
while (!locker.AwaitWithTimeout(
|
||||||
|
[this]() REQUIRES(work_queue_mutex_) { return work_queue_ == nullptr; },
|
||||||
|
common::FromSeconds(1.))) {
|
||||||
|
report_progress();
|
||||||
}
|
}
|
||||||
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
|
}
|
||||||
|
|
||||||
|
// Now wait for any pending constraint computations to finish.
|
||||||
|
common::MutexLocker locker(&mutex_);
|
||||||
|
bool notification GUARDED_BY(mutex_) = false;
|
||||||
constraint_builder_.WhenDone(
|
constraint_builder_.WhenDone(
|
||||||
[this,
|
[this,
|
||||||
¬ification](const constraints::ConstraintBuilder3D::Result& result) {
|
¬ification](const constraints::ConstraintBuilder3D::Result& result)
|
||||||
|
EXCLUDES(mutex_) {
|
||||||
common::MutexLocker locker(&mutex_);
|
common::MutexLocker locker(&mutex_);
|
||||||
data_.constraints.insert(data_.constraints.end(), result.begin(),
|
data_.constraints.insert(data_.constraints.end(), result.begin(),
|
||||||
result.end());
|
result.end());
|
||||||
notification = true;
|
notification = true;
|
||||||
});
|
});
|
||||||
locker.Await([¬ification]() { return notification; });
|
while (!locker.AwaitWithTimeout([¬ification]()
|
||||||
|
REQUIRES(mutex_) { return notification; },
|
||||||
|
common::FromSeconds(1.))) {
|
||||||
|
report_progress();
|
||||||
|
}
|
||||||
|
CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes);
|
||||||
|
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
|
void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
|
||||||
|
|
|
@ -162,7 +162,7 @@ class PoseGraph3D : public PoseGraph {
|
||||||
|
|
||||||
// Handles a new work item.
|
// Handles a new work item.
|
||||||
void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
|
void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
|
||||||
EXCLUDES(mutex_);
|
EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
|
||||||
|
|
||||||
// Adds connectivity and sampler for a trajectory if it does not exist.
|
// Adds connectivity and sampler for a trajectory if it does not exist.
|
||||||
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
|
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
|
||||||
|
@ -197,7 +197,7 @@ class PoseGraph3D : public PoseGraph {
|
||||||
|
|
||||||
// Runs the optimization, executes the trimmers and processes the work queue.
|
// Runs the optimization, executes the trimmers and processes the work queue.
|
||||||
void HandleWorkQueue(const constraints::ConstraintBuilder3D::Result& result)
|
void HandleWorkQueue(const constraints::ConstraintBuilder3D::Result& result)
|
||||||
EXCLUDES(mutex_);
|
EXCLUDES(mutex_) EXCLUDES(work_queue_mutex_);
|
||||||
|
|
||||||
// Runs the optimization. Callers have to make sure, that there is only one
|
// Runs the optimization. Callers have to make sure, that there is only one
|
||||||
// optimization being run at a time.
|
// optimization being run at a time.
|
||||||
|
@ -229,10 +229,11 @@ class PoseGraph3D : public PoseGraph {
|
||||||
const proto::PoseGraphOptions options_;
|
const proto::PoseGraphOptions options_;
|
||||||
GlobalSlamOptimizationCallback global_slam_optimization_callback_;
|
GlobalSlamOptimizationCallback global_slam_optimization_callback_;
|
||||||
mutable common::Mutex mutex_;
|
mutable common::Mutex mutex_;
|
||||||
|
common::Mutex work_queue_mutex_;
|
||||||
|
|
||||||
// If it exists, further work items must be added to this queue, and will be
|
// If it exists, further work items must be added to this queue, and will be
|
||||||
// considered later.
|
// considered later.
|
||||||
std::unique_ptr<WorkQueue> work_queue_ GUARDED_BY(mutex_);
|
std::unique_ptr<WorkQueue> work_queue_ GUARDED_BY(work_queue_mutex_);
|
||||||
|
|
||||||
// We globally localize a fraction of the nodes from each trajectory.
|
// We globally localize a fraction of the nodes from each trajectory.
|
||||||
std::unordered_map<int, std::unique_ptr<common::FixedRatioSampler>>
|
std::unordered_map<int, std::unique_ptr<common::FixedRatioSampler>>
|
||||||
|
|
Loading…
Reference in New Issue