Refactor PoseGraph work queue optimization dispatching (#1252)

Unwrap the logic to dispatch optimization and the deferred
logic to create the work queue (when kicked off in foreground)
to happen in AddWorkItem() and HandleWorkQueue().
The work items will instead return whether they need optimization
to be dispatched or not.
master
danielsievers 2018-07-11 17:33:41 +02:00 committed by Wally B. Feed
parent bb380ae947
commit 9f1039221c
5 changed files with 80 additions and 68 deletions

View File

@ -138,15 +138,20 @@ NodeId PoseGraph2D::AddNode(
// execute the lambda.
const bool newly_finished_submap = insertion_submaps.front()->finished();
AddWorkItem([=]() REQUIRES(mutex_) {
ComputeConstraintsForNode(node_id, insertion_submaps,
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
return node_id;
}
void PoseGraph2D::AddWorkItem(const std::function<void()>& work_item) {
void PoseGraph2D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item) {
if (work_queue_ == nullptr) {
work_item();
if (work_item() == WorkItem::Result::kRunOptimization) {
work_queue_ = common::make_unique<WorkQueue>();
constraint_builder_.WhenDone(std::bind(&PoseGraph2D::HandleWorkQueue,
this, std::placeholders::_1));
}
} else {
const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item});
@ -178,6 +183,7 @@ void PoseGraph2D::AddImuData(const int trajectory_id,
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -187,6 +193,7 @@ void PoseGraph2D::AddOdometryData(const int trajectory_id,
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
optimization_problem_->AddOdometryData(trajectory_id, odometry_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -208,6 +215,7 @@ void PoseGraph2D::AddLandmarkData(int trajectory_id,
observation.landmark_to_tracking_transform,
observation.translation_weight, observation.rotation_weight});
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -258,7 +266,7 @@ void PoseGraph2D::ComputeConstraintsForOldNodes(const SubmapId& submap_id) {
}
}
void PoseGraph2D::ComputeConstraintsForNode(
WorkItem::Result PoseGraph2D::ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps,
const bool newly_finished_submap) {
@ -318,22 +326,13 @@ void PoseGraph2D::ComputeConstraintsForNode(
}
constraint_builder_.NotifyEndOfNode();
++num_nodes_since_last_loop_closure_;
CHECK(!run_loop_closure_);
if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes()) {
DispatchOptimization();
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;
}
void PoseGraph2D::DispatchOptimization() {
run_loop_closure_ = true;
// If there is a 'work_queue_' already, some other thread will take care.
if (work_queue_ == nullptr) {
work_queue_ = common::make_unique<WorkQueue>();
constraint_builder_.WhenDone(
std::bind(&PoseGraph2D::HandleWorkQueue, this, std::placeholders::_1));
}
}
common::Time PoseGraph2D::GetLatestNodeTime(const NodeId& node_id,
const SubmapId& submap_id) const {
common::Time time = data_.trajectory_nodes.at(node_id).constant_data->time;
@ -425,13 +424,14 @@ void PoseGraph2D::HandleWorkQueue(
trimmers_.end());
num_nodes_since_last_loop_closure_ = 0;
run_loop_closure_ = false;
while (!run_loop_closure_) {
bool process_work_queue = true;
while (process_work_queue) {
if (work_queue_->empty()) {
work_queue_.reset();
return;
}
work_queue_->front().task();
process_work_queue =
work_queue_->front().task() == WorkItem::Result::kDoNotRunOptimization;
work_queue_->pop_front();
}
LOG(INFO) << "Remaining work items in queue: " << work_queue_->size();
@ -496,6 +496,7 @@ void PoseGraph2D::DeleteTrajectory(const int trajectory_id) {
InternalTrajectoryState::DeletionState::SCHEDULED_FOR_DELETION);
data_.trajectories_state.at(trajectory_id).deletion_state =
InternalTrajectoryState::DeletionState::WAIT_FOR_DELETION;
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -508,8 +509,7 @@ void PoseGraph2D::FinishTrajectory(const int trajectory_id) {
for (const auto& submap : data_.submap_data.trajectory(trajectory_id)) {
data_.submap_data.at(submap.id).state = SubmapState::kFinished;
}
CHECK(!run_loop_closure_);
DispatchOptimization();
return WorkItem::Result::kRunOptimization;
});
}
@ -525,6 +525,7 @@ void PoseGraph2D::FreezeTrajectory(const int trajectory_id) {
AddWorkItem([this, trajectory_id]() REQUIRES(mutex_) {
CHECK(!IsTrajectoryFrozen(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FROZEN;
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -558,6 +559,7 @@ void PoseGraph2D::AddSubmapFromProto(
AddWorkItem([this, submap_id, global_submap_pose_2d]() REQUIRES(mutex_) {
data_.submap_data.at(submap_id).state = SubmapState::kFinished;
optimization_problem_->InsertSubmap(submap_id, global_submap_pose_2d);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -587,6 +589,7 @@ void PoseGraph2D::AddNodeFromProto(const transform::Rigid3d& global_pose,
gravity_alignment_inverse),
transform::Project2D(global_pose * gravity_alignment_inverse),
constant_data->gravity_alignment});
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -601,6 +604,7 @@ void PoseGraph2D::AddNodeToSubmap(const NodeId& node_id,
if (!CanAddWorkItemModifying(submap_id.trajectory_id)) return;
AddWorkItem([this, node_id, submap_id]() REQUIRES(mutex_) {
data_.submap_data.at(submap_id).node_ids.insert(node_id);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -634,6 +638,7 @@ void PoseGraph2D::AddSerializedConstraints(
constraint.submap_id, constraint.node_id, pose, constraint.tag});
}
LOG(INFO) << "Loaded " << constraints.size() << " constraints.";
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -641,8 +646,10 @@ void PoseGraph2D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer) {
common::MutexLocker locker(&mutex_);
// C++11 does not allow us to move a unique_ptr into a lambda.
PoseGraphTrimmer* const trimmer_ptr = trimmer.release();
AddWorkItem([this, trimmer_ptr]()
REQUIRES(mutex_) { trimmers_.emplace_back(trimmer_ptr); });
AddWorkItem([this, trimmer_ptr]() REQUIRES(mutex_) {
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph2D::RunFinalOptimization() {
@ -651,13 +658,14 @@ void PoseGraph2D::RunFinalOptimization() {
AddWorkItem([this]() REQUIRES(mutex_) {
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
DispatchOptimization();
return WorkItem::Result::kRunOptimization;
});
AddWorkItem([this]() REQUIRES(mutex_) {
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
WaitForAllComputations();
@ -795,6 +803,7 @@ void PoseGraph2D::SetLandmarkPose(const std::string& landmark_id,
common::MutexLocker locker(&mutex_);
AddWorkItem([=]() REQUIRES(mutex_) {
data_.landmark_nodes[landmark_id].global_landmark_pose = global_pose;
return WorkItem::Result::kDoNotRunOptimization;
});
}

View File

@ -158,7 +158,8 @@ class PoseGraph2D : public PoseGraph {
const REQUIRES(mutex_);
// Handles a new work item.
void AddWorkItem(const std::function<void()>& work_item) REQUIRES(mutex_);
void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
REQUIRES(mutex_);
// Adds connectivity and sampler for a trajectory if it does not exist.
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
@ -171,7 +172,7 @@ class PoseGraph2D : public PoseGraph {
REQUIRES(mutex_);
// Adds constraints for a node, and starts scan matching in the background.
void ComputeConstraintsForNode(
WorkItem::Result ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps,
bool newly_finished_submap) REQUIRES(mutex_);
@ -234,12 +235,6 @@ class PoseGraph2D : public PoseGraph {
// Number of nodes added since last loop closure.
int num_nodes_since_last_loop_closure_ GUARDED_BY(mutex_) = 0;
// Whether the optimization has to be run before more data is added.
bool run_loop_closure_ GUARDED_BY(mutex_) = false;
// Schedules optimization (i.e. loop closure) to run.
void DispatchOptimization() REQUIRES(mutex_);
// Current optimization problem.
std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem_;
constraints::ConstraintBuilder2D constraint_builder_ GUARDED_BY(mutex_);

View File

@ -136,20 +136,23 @@ NodeId PoseGraph3D::AddNode(
// execute the lambda.
const bool newly_finished_submap = insertion_submaps.front()->finished();
AddWorkItem([=]() REQUIRES(mutex_) {
ComputeConstraintsForNode(node_id, insertion_submaps,
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
return node_id;
}
void PoseGraph3D::AddWorkItem(const std::function<void()>& work_item) {
if (work_queue_ == nullptr) {
work_item();
} else {
void PoseGraph3D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item) {
if (work_queue_ != nullptr) {
const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item});
kWorkQueueDelayMetric->Set(
common::ToSeconds(now - work_queue_->front().time));
} else if (work_item() == WorkItem::Result::kRunOptimization) {
work_queue_ = common::make_unique<WorkQueue>();
constraint_builder_.WhenDone(
std::bind(&PoseGraph3D::HandleWorkQueue, this, std::placeholders::_1));
}
}
@ -176,6 +179,7 @@ void PoseGraph3D::AddImuData(const int trajectory_id,
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -185,6 +189,7 @@ void PoseGraph3D::AddOdometryData(const int trajectory_id,
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([=]() REQUIRES(mutex_) {
optimization_problem_->AddOdometryData(trajectory_id, odometry_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -196,6 +201,7 @@ void PoseGraph3D::AddFixedFramePoseData(
AddWorkItem([=]() REQUIRES(mutex_) {
optimization_problem_->AddFixedFramePoseData(trajectory_id,
fixed_frame_pose_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -211,6 +217,7 @@ void PoseGraph3D::AddLandmarkData(int trajectory_id,
observation.landmark_to_tracking_transform,
observation.translation_weight, observation.rotation_weight});
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -281,7 +288,7 @@ void PoseGraph3D::ComputeConstraintsForOldNodes(const SubmapId& submap_id) {
}
}
void PoseGraph3D::ComputeConstraintsForNode(
WorkItem::Result PoseGraph3D::ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap3D>> insertion_submaps,
const bool newly_finished_submap) {
@ -334,21 +341,11 @@ void PoseGraph3D::ComputeConstraintsForNode(
}
constraint_builder_.NotifyEndOfNode();
++num_nodes_since_last_loop_closure_;
CHECK(!run_loop_closure_);
if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes()) {
DispatchOptimization();
}
}
void PoseGraph3D::DispatchOptimization() {
run_loop_closure_ = true;
// If there is a 'work_queue_' already, some other thread will take care.
if (work_queue_ == nullptr) {
work_queue_ = common::make_unique<WorkQueue>();
constraint_builder_.WhenDone(
std::bind(&PoseGraph3D::HandleWorkQueue, this, std::placeholders::_1));
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;
}
common::Time PoseGraph3D::GetLatestNodeTime(const NodeId& node_id,
@ -442,13 +439,14 @@ void PoseGraph3D::HandleWorkQueue(
trimmers_.end());
num_nodes_since_last_loop_closure_ = 0;
run_loop_closure_ = false;
while (!run_loop_closure_) {
bool process_work_queue = true;
while (process_work_queue) {
if (work_queue_->empty()) {
work_queue_.reset();
return;
}
work_queue_->front().task();
process_work_queue =
work_queue_->front().task() == WorkItem::Result::kDoNotRunOptimization;
work_queue_->pop_front();
}
LOG(INFO) << "Remaining work items in queue: " << work_queue_->size();
@ -513,6 +511,7 @@ void PoseGraph3D::DeleteTrajectory(const int trajectory_id) {
InternalTrajectoryState::DeletionState::SCHEDULED_FOR_DELETION);
data_.trajectories_state.at(trajectory_id).deletion_state =
InternalTrajectoryState::DeletionState::WAIT_FOR_DELETION;
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -525,8 +524,7 @@ void PoseGraph3D::FinishTrajectory(const int trajectory_id) {
for (const auto& submap : data_.submap_data.trajectory(trajectory_id)) {
data_.submap_data.at(submap.id).state = SubmapState::kFinished;
}
CHECK(!run_loop_closure_);
DispatchOptimization();
return WorkItem::Result::kRunOptimization;
});
}
@ -542,6 +540,7 @@ void PoseGraph3D::FreezeTrajectory(const int trajectory_id) {
AddWorkItem([this, trajectory_id]() REQUIRES(mutex_) {
CHECK(!IsTrajectoryFrozen(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FROZEN;
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -573,6 +572,7 @@ void PoseGraph3D::AddSubmapFromProto(
AddWorkItem([this, submap_id, global_submap_pose]() REQUIRES(mutex_) {
data_.submap_data.at(submap_id).state = SubmapState::kFinished;
optimization_problem_->InsertSubmap(submap_id, global_submap_pose);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -596,6 +596,7 @@ void PoseGraph3D::AddNodeFromProto(const transform::Rigid3d& global_pose,
node_id,
optimization::NodeSpec3D{constant_data->time, constant_data->local_pose,
global_pose});
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -616,6 +617,7 @@ void PoseGraph3D::SetTrajectoryDataFromProto(
if (!CanAddWorkItemModifying(trajectory_id)) return;
AddWorkItem([this, trajectory_id, trajectory_data]() REQUIRES(mutex_) {
optimization_problem_->SetTrajectoryData(trajectory_id, trajectory_data);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -625,6 +627,7 @@ void PoseGraph3D::AddNodeToSubmap(const NodeId& node_id,
if (!CanAddWorkItemModifying(submap_id.trajectory_id)) return;
AddWorkItem([this, node_id, submap_id]() REQUIRES(mutex_) {
data_.submap_data.at(submap_id).node_ids.insert(node_id);
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -651,6 +654,7 @@ void PoseGraph3D::AddSerializedConstraints(
data_.constraints.push_back(constraint);
}
LOG(INFO) << "Loaded " << constraints.size() << " constraints.";
return WorkItem::Result::kDoNotRunOptimization;
});
}
@ -658,8 +662,10 @@ void PoseGraph3D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer) {
common::MutexLocker locker(&mutex_);
// C++11 does not allow us to move a unique_ptr into a lambda.
PoseGraphTrimmer* const trimmer_ptr = trimmer.release();
AddWorkItem([this, trimmer_ptr]()
REQUIRES(mutex_) { trimmers_.emplace_back(trimmer_ptr); });
AddWorkItem([this, trimmer_ptr]() REQUIRES(mutex_) {
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
}
void PoseGraph3D::RunFinalOptimization() {
@ -668,13 +674,14 @@ void PoseGraph3D::RunFinalOptimization() {
AddWorkItem([this]() REQUIRES(mutex_) {
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
DispatchOptimization();
return WorkItem::Result::kRunOptimization;
});
AddWorkItem([this]() REQUIRES(mutex_) {
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
WaitForAllComputations();
@ -839,6 +846,7 @@ void PoseGraph3D::SetLandmarkPose(const std::string& landmark_id,
common::MutexLocker locker(&mutex_);
AddWorkItem([=]() REQUIRES(mutex_) {
data_.landmark_nodes[landmark_id].global_landmark_pose = global_pose;
return WorkItem::Result::kDoNotRunOptimization;
});
}

View File

@ -161,7 +161,8 @@ class PoseGraph3D : public PoseGraph {
MapById<SubmapId, SubmapData> GetSubmapDataUnderLock() const REQUIRES(mutex_);
// Handles a new work item.
void AddWorkItem(const std::function<void()>& work_item) REQUIRES(mutex_);
void AddWorkItem(const std::function<WorkItem::Result()>& work_item)
REQUIRES(mutex_);
// Adds connectivity and sampler for a trajectory if it does not exist.
void AddTrajectoryIfNeeded(int trajectory_id) REQUIRES(mutex_);
@ -174,7 +175,7 @@ class PoseGraph3D : public PoseGraph {
REQUIRES(mutex_);
// Adds constraints for a node, and starts scan matching in the background.
void ComputeConstraintsForNode(
WorkItem::Result ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap3D>> insertion_submaps,
bool newly_finished_submap) REQUIRES(mutex_);
@ -222,9 +223,6 @@ class PoseGraph3D : public PoseGraph {
void UpdateTrajectoryConnectivity(const Constraint& constraint)
REQUIRES(mutex_);
// Schedules optimization (i.e. loop closure) to run.
void DispatchOptimization() REQUIRES(mutex_);
const proto::PoseGraphOptions options_;
GlobalSlamOptimizationCallback global_slam_optimization_callback_;
mutable common::Mutex mutex_;
@ -240,9 +238,6 @@ class PoseGraph3D : public PoseGraph {
// Number of nodes added since last loop closure.
int num_nodes_since_last_loop_closure_ GUARDED_BY(mutex_) = 0;
// Whether the optimization has to be run before more data is added.
bool run_loop_closure_ GUARDED_BY(mutex_) = false;
// Current optimization problem.
std::unique_ptr<optimization::OptimizationProblem3D> optimization_problem_;
constraints::ConstraintBuilder3D constraint_builder_ GUARDED_BY(mutex_);

View File

@ -25,8 +25,13 @@ namespace cartographer {
namespace mapping {
struct WorkItem {
enum class Result {
kDoNotRunOptimization,
kRunOptimization,
};
std::chrono::steady_clock::time_point time;
std::function<void()> task;
std::function<Result()> task;
};
using WorkQueue = std::deque<WorkItem>;