Take PoseGraph3D mutex inside instead of outside work item (#1264)

Take PoseGraph3D mutex inside instead of outside work item
    
This refactors the code but does not alter behavior except
that the shared mutex is separately taken for work_queue_ access,
released, and then re-acquired inside each work item.
    
As a consequence AddWorkItem() also needs to EXCLUDE() the lock and
acquire it internally, because for the case where we run the task
in the foreground we cannot hold the lock and recursively acquire it again
inside the task being run.
    
This prepares for making locking more fine-grained, see issue #1250.
master
danielsievers 2018-07-13 11:53:25 +02:00 committed by Wally B. Feed
parent 61a89d8ab8
commit 8c9104568a
2 changed files with 161 additions and 109 deletions

View File

@ -104,13 +104,11 @@ std::vector<SubmapId> PoseGraph3D::InitializeGlobalSubmapPoses(
return {front_submap_id, last_submap_id};
}
NodeId PoseGraph3D::AddNode(
NodeId PoseGraph3D::AppendNode(
std::shared_ptr<const TrajectoryNode::Data> constant_data,
const int trajectory_id,
const std::vector<std::shared_ptr<const Submap3D>>& insertion_submaps) {
const transform::Rigid3d optimized_pose(
GetLocalToGlobalTransform(trajectory_id) * constant_data->local_pose);
const std::vector<std::shared_ptr<const Submap3D>>& insertion_submaps,
const transform::Rigid3d& optimized_pose) {
common::MutexLocker locker(&mutex_);
AddTrajectoryIfNeeded(trajectory_id);
if (!CanAddWorkItemModifying(trajectory_id)) {
@ -119,7 +117,6 @@ NodeId PoseGraph3D::AddNode(
const NodeId node_id = data_.trajectory_nodes.Append(
trajectory_id, TrajectoryNode{constant_data, optimized_pose});
++data_.num_trajectory_nodes;
// Test if the 'insertion_submap.back()' is one we never saw before.
if (data_.submap_data.SizeOfTrajectoryOrZero(trajectory_id) == 0 ||
std::prev(data_.submap_data.EndOfTrajectory(trajectory_id))
@ -131,11 +128,22 @@ NodeId PoseGraph3D::AddNode(
data_.submap_data.at(submap_id).submap = insertion_submaps.back();
LOG(INFO) << "Inserted submap " << submap_id << ".";
}
return node_id;
}
NodeId PoseGraph3D::AddNode(
std::shared_ptr<const TrajectoryNode::Data> constant_data,
const int trajectory_id,
const std::vector<std::shared_ptr<const Submap3D>>& insertion_submaps) {
const transform::Rigid3d optimized_pose(
GetLocalToGlobalTransform(trajectory_id) * constant_data->local_pose);
const NodeId node_id = AppendNode(constant_data, trajectory_id,
insertion_submaps, optimized_pose);
// We have to check this here, because it might have changed by the time we
// execute the lambda.
const bool newly_finished_submap = insertion_submaps.front()->finished();
AddWorkItem([=]() REQUIRES(mutex_) {
AddWorkItem([=]() EXCLUDES(mutex_) {
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
@ -144,13 +152,21 @@ NodeId PoseGraph3D::AddNode(
void PoseGraph3D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item) {
{
common::MutexLocker locker(&mutex_);
if (work_queue_ != nullptr) {
const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item});
kWorkQueueDelayMetric->Set(
common::ToSeconds(now - work_queue_->front().time));
} else if (work_item() == WorkItem::Result::kRunOptimization) {
return;
}
}
if (work_item() == WorkItem::Result::kRunOptimization) {
{
common::MutexLocker locker(&mutex_);
work_queue_ = common::make_unique<WorkQueue>();
}
constraint_builder_.WhenDone(
[this](const constraints::ConstraintBuilder3D::Result& result) {
HandleWorkQueue(result);
@ -177,20 +193,22 @@ void PoseGraph3D::AddTrajectoryIfNeeded(const int trajectory_id) {
void PoseGraph3D::AddImuData(const int trajectory_id,
const sensor::ImuData& imu_data) {
AddWorkItem([=]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph3D::AddOdometryData(const int trajectory_id,
const sensor::OdometryData& odometry_data) {
AddWorkItem([=]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddOdometryData(trajectory_id, odometry_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -198,20 +216,21 @@ void PoseGraph3D::AddOdometryData(const int trajectory_id,
void PoseGraph3D::AddFixedFramePoseData(
const int trajectory_id,
const sensor::FixedFramePoseData& fixed_frame_pose_data) {
AddWorkItem([=]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddFixedFramePoseData(trajectory_id,
fixed_frame_pose_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph3D::AddLandmarkData(int trajectory_id,
const sensor::LandmarkData& landmark_data) {
AddWorkItem([=]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(trajectory_id)) {
for (const auto& observation : landmark_data.landmark_observations) {
data_.landmark_nodes[observation.id].landmark_observations.emplace_back(
PoseGraphInterface::LandmarkNode::LandmarkObservation{
@ -219,6 +238,7 @@ void PoseGraph3D::AddLandmarkData(int trajectory_id,
observation.landmark_to_tracking_transform,
observation.translation_weight, observation.rotation_weight});
}
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -294,6 +314,7 @@ WorkItem::Result PoseGraph3D::ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap3D>> insertion_submaps,
const bool newly_finished_submap) {
common::MutexLocker locker(&mutex_);
const auto& constant_data = data_.trajectory_nodes.at(node_id).constant_data;
const std::vector<SubmapId> submap_ids = InitializeGlobalSubmapPoses(
node_id.trajectory_id, constant_data->time, insertion_submaps);
@ -424,6 +445,7 @@ void PoseGraph3D::HandleWorkQueue(
trajectory_id_to_last_optimized_node_id);
}
{
common::MutexLocker locker(&mutex_);
for (const Constraint& constraint : result) {
UpdateTrajectoryConnectivity(constraint);
@ -441,17 +463,25 @@ void PoseGraph3D::HandleWorkQueue(
trimmers_.end());
num_nodes_since_last_loop_closure_ = 0;
}
size_t work_queue_size;
bool process_work_queue = true;
while (process_work_queue) {
std::function<WorkItem::Result()> work_item;
{
common::MutexLocker locker(&mutex_);
if (work_queue_->empty()) {
work_queue_.reset();
return;
}
process_work_queue =
work_queue_->front().task() == WorkItem::Result::kDoNotRunOptimization;
work_item = work_queue_->front().task;
work_queue_->pop_front();
work_queue_size = work_queue_->size();
}
LOG(INFO) << "Remaining work items in queue: " << work_queue_->size();
process_work_queue = work_item() == WorkItem::Result::kDoNotRunOptimization;
}
LOG(INFO) << "Remaining work items in queue: " << work_queue_size;
// We have to optimize again.
constraint_builder_.WhenDone(
[this](const constraints::ConstraintBuilder3D::Result& result) {
@ -497,6 +527,7 @@ void PoseGraph3D::WaitForAllComputations() {
}
void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
{
common::MutexLocker locker(&mutex_);
auto it = data_.trajectories_state.find(trajectory_id);
if (it == data_.trajectories_state.end()) {
@ -506,7 +537,9 @@ void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
}
it->second.deletion_state =
InternalTrajectoryState::DeletionState::SCHEDULED_FOR_DELETION;
AddWorkItem([this, trajectory_id]() REQUIRES(mutex_) {
}
AddWorkItem([this, trajectory_id]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
TrajectoryState::ACTIVE);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
@ -520,8 +553,8 @@ void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
}
void PoseGraph3D::FinishTrajectory(const int trajectory_id) {
AddWorkItem([this, trajectory_id]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
AddWorkItem([this, trajectory_id]() REQUIRES(mutex_) {
CHECK(!IsTrajectoryFinished(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FINISHED;
@ -539,9 +572,12 @@ bool PoseGraph3D::IsTrajectoryFinished(const int trajectory_id) const {
}
void PoseGraph3D::FreezeTrajectory(const int trajectory_id) {
{
common::MutexLocker locker(&mutex_);
data_.trajectory_connectivity_state.Add(trajectory_id);
AddWorkItem([this, trajectory_id]() REQUIRES(mutex_) {
}
AddWorkItem([this, trajectory_id]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
CHECK(!IsTrajectoryFrozen(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FROZEN;
return WorkItem::Result::kDoNotRunOptimization;
@ -565,6 +601,7 @@ void PoseGraph3D::AddSubmapFromProto(
std::shared_ptr<const Submap3D> submap_ptr =
std::make_shared<const Submap3D>(submap.submap_3d());
{
common::MutexLocker locker(&mutex_);
AddTrajectoryIfNeeded(submap_id.trajectory_id);
if (!CanAddWorkItemModifying(submap_id.trajectory_id)) return;
@ -573,7 +610,9 @@ void PoseGraph3D::AddSubmapFromProto(
// Immediately show the submap at the 'global_submap_pose'.
data_.global_submap_poses_3d.Insert(
submap_id, optimization::SubmapSpec3D{global_submap_pose});
AddWorkItem([this, submap_id, global_submap_pose]() REQUIRES(mutex_) {
}
AddWorkItem([this, submap_id, global_submap_pose]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
data_.submap_data.at(submap_id).state = SubmapState::kFinished;
optimization_problem_->InsertSubmap(submap_id, global_submap_pose);
return WorkItem::Result::kDoNotRunOptimization;
@ -587,13 +626,16 @@ void PoseGraph3D::AddNodeFromProto(const transform::Rigid3d& global_pose,
std::shared_ptr<const TrajectoryNode::Data> constant_data =
std::make_shared<const TrajectoryNode::Data>(FromProto(node.node_data()));
{
common::MutexLocker locker(&mutex_);
AddTrajectoryIfNeeded(node_id.trajectory_id);
if (!CanAddWorkItemModifying(node_id.trajectory_id)) return;
data_.trajectory_nodes.Insert(node_id,
TrajectoryNode{constant_data, global_pose});
}
AddWorkItem([this, node_id, global_pose]() REQUIRES(mutex_) {
AddWorkItem([this, node_id, global_pose]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
const auto& constant_data =
data_.trajectory_nodes.at(node_id).constant_data;
optimization_problem_->InsertTrajectoryNode(
@ -617,28 +659,30 @@ void PoseGraph3D::SetTrajectoryDataFromProto(
}
const int trajectory_id = data.trajectory_id();
AddWorkItem([this, trajectory_id, trajectory_data]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([this, trajectory_id, trajectory_data]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->SetTrajectoryData(trajectory_id, trajectory_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph3D::AddNodeToSubmap(const NodeId& node_id,
const SubmapId& submap_id) {
AddWorkItem([this, node_id, submap_id]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
if (!CanAddWorkItemModifying(submap_id.trajectory_id)) return;
AddWorkItem([this, node_id, submap_id]() REQUIRES(mutex_) {
if (CanAddWorkItemModifying(submap_id.trajectory_id)) {
data_.submap_data.at(submap_id).node_ids.insert(node_id);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph3D::AddSerializedConstraints(
const std::vector<Constraint>& constraints) {
AddWorkItem([this, constraints]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
AddWorkItem([this, constraints]() REQUIRES(mutex_) {
for (const auto& constraint : constraints) {
CHECK(data_.trajectory_nodes.Contains(constraint.node_id));
CHECK(data_.submap_data.Contains(constraint.submap_id));
@ -663,10 +707,10 @@ void PoseGraph3D::AddSerializedConstraints(
}
void PoseGraph3D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer) {
common::MutexLocker locker(&mutex_);
// C++11 does not allow us to move a unique_ptr into a lambda.
PoseGraphTrimmer* const trimmer_ptr = trimmer.release();
AddWorkItem([this, trimmer_ptr]() REQUIRES(mutex_) {
AddWorkItem([this, trimmer_ptr]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
@ -674,13 +718,14 @@ void PoseGraph3D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer) {
void PoseGraph3D::RunFinalOptimization() {
{
AddWorkItem([this]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
AddWorkItem([this]() REQUIRES(mutex_) {
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
AddWorkItem([this]() REQUIRES(mutex_) {
AddWorkItem([this]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
@ -847,8 +892,8 @@ std::map<std::string, transform::Rigid3d> PoseGraph3D::GetLandmarkPoses()
void PoseGraph3D::SetLandmarkPose(const std::string& landmark_id,
const transform::Rigid3d& global_pose) {
AddWorkItem([=]() EXCLUDES(mutex_) {
common::MutexLocker locker(&mutex_);
AddWorkItem([=]() REQUIRES(mutex_) {
data_.landmark_nodes[landmark_id].global_landmark_pose = global_pose;
return WorkItem::Result::kDoNotRunOptimization;
});

View File

@ -162,11 +162,18 @@ class PoseGraph3D : public PoseGraph {
// Handles a new work item.
void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
REQUIRES(mutex_);
EXCLUDES(mutex_);
// Adds connectivity and sampler for a trajectory if it does not exist.
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
// Appends the new node and submap (if needed) to the internal data stuctures.
NodeId AppendNode(
std::shared_ptr<const TrajectoryNode::Data> constant_data,
int trajectory_id,
const std::vector<std::shared_ptr<const Submap3D>>& insertion_submaps,
const transform::Rigid3d& optimized_pose) EXCLUDES(mutex_);
// Grows the optimization problem to have an entry for every element of
// 'insertion_submaps'. Returns the IDs for the 'insertion_submaps'.
std::vector<SubmapId> InitializeGlobalSubmapPoses(
@ -178,7 +185,7 @@ class PoseGraph3D : public PoseGraph {
WorkItem::Result ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap3D>> insertion_submaps,
bool newly_finished_submap) REQUIRES(mutex_);
bool newly_finished_submap) EXCLUDES(mutex_);
// Computes constraints for a node and submap pair.
void ComputeConstraint(const NodeId& node_id, const SubmapId& submap_id)
@ -194,7 +201,7 @@ class PoseGraph3D : public PoseGraph {
// Runs the optimization, executes the trimmers and processes the work queue.
void HandleWorkQueue(const constraints::ConstraintBuilder3D::Result& result)
REQUIRES(mutex_);
EXCLUDES(mutex_);
// Runs the optimization. Callers have to make sure, that there is only one
// optimization being run at a time.
@ -240,7 +247,7 @@ class PoseGraph3D : public PoseGraph {
// Current optimization problem.
std::unique_ptr<optimization::OptimizationProblem3D> optimization_problem_;
constraints::ConstraintBuilder3D constraint_builder_ GUARDED_BY(mutex_);
constraints::ConstraintBuilder3D constraint_builder_;
// List of all trimmers to consult when optimizations finish.
std::vector<std::unique_ptr<PoseGraphTrimmer>> trimmers_ GUARDED_BY(mutex_);