Use Task in ConstraintBuilder2D (#1118)

master
gaschler 2018-04-26 12:33:36 +02:00 committed by Wally B. Feed
parent 666095cb41
commit c46fe073b4
2 changed files with 123 additions and 132 deletions

View File

@ -58,14 +58,17 @@ ConstraintBuilder2D::ConstraintBuilder2D(
common::ThreadPoolInterface* const thread_pool) common::ThreadPoolInterface* const thread_pool)
: options_(options), : options_(options),
thread_pool_(thread_pool), thread_pool_(thread_pool),
finish_node_task_(common::make_unique<common::Task>()),
when_done_task_(common::make_unique<common::Task>()),
sampler_(options.sampling_ratio()), sampler_(options.sampling_ratio()),
ceres_scan_matcher_(options.ceres_scan_matcher_options()) {} ceres_scan_matcher_(options.ceres_scan_matcher_options()) {}
ConstraintBuilder2D::~ConstraintBuilder2D() { ConstraintBuilder2D::~ConstraintBuilder2D() {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
CHECK_EQ(finish_node_task_->GetState(), common::Task::NEW);
CHECK_EQ(when_done_task_->GetState(), common::Task::NEW);
CHECK_EQ(constraints_.size(), 0) << "WhenDone() was not called"; CHECK_EQ(constraints_.size(), 0) << "WhenDone() was not called";
CHECK_EQ(pending_computations_.size(), 0); CHECK_EQ(num_started_nodes_, num_finished_nodes_);
CHECK_EQ(submap_queued_work_items_.size(), 0);
CHECK(when_done_ == nullptr); CHECK(when_done_ == nullptr);
} }
@ -77,94 +80,101 @@ void ConstraintBuilder2D::MaybeAddConstraint(
options_.max_constraint_distance()) { options_.max_constraint_distance()) {
return; return;
} }
if (sampler_.Pulse()) { if (!sampler_.Pulse()) return;
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
if (when_done_) {
LOG(WARNING)
<< "MaybeAddConstraint was called while WhenDone was scheduled.";
}
constraints_.emplace_back(); constraints_.emplace_back();
kQueueLengthMetric->Set(constraints_.size()); kQueueLengthMetric->Set(constraints_.size());
auto* const constraint = &constraints_.back(); auto* const constraint = &constraints_.back();
++pending_computations_[current_computation_]; const auto* scan_matcher =
const int current_computation = current_computation_; DispatchScanMatcherConstruction(submap_id, submap->grid());
ScheduleSubmapScanMatcherConstructionAndQueueWorkItem( auto constraint_task = common::make_unique<common::Task>();
submap_id, submap->grid(), [=]() EXCLUDES(mutex_) { constraint_task->SetWorkItem([=]() EXCLUDES(mutex_) {
ComputeConstraint(submap_id, submap, node_id, ComputeConstraint(submap_id, submap, node_id, false, /* match_full_submap */
false, /* match_full_submap */ constant_data, initial_relative_pose, *scan_matcher,
constant_data, initial_relative_pose, constraint); constraint);
FinishComputation(current_computation);
}); });
} constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
finish_node_task_->AddDependency(constraint_task_handle);
} }
void ConstraintBuilder2D::MaybeAddGlobalConstraint( void ConstraintBuilder2D::MaybeAddGlobalConstraint(
const SubmapId& submap_id, const Submap2D* const submap, const SubmapId& submap_id, const Submap2D* const submap,
const NodeId& node_id, const TrajectoryNode::Data* const constant_data) { const NodeId& node_id, const TrajectoryNode::Data* const constant_data) {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
if (when_done_) {
LOG(WARNING)
<< "MaybeAddGlobalConstraint was called while WhenDone was scheduled.";
}
constraints_.emplace_back(); constraints_.emplace_back();
kQueueLengthMetric->Set(constraints_.size()); kQueueLengthMetric->Set(constraints_.size());
auto* const constraint = &constraints_.back(); auto* const constraint = &constraints_.back();
++pending_computations_[current_computation_]; const auto* scan_matcher =
const int current_computation = current_computation_; DispatchScanMatcherConstruction(submap_id, submap->grid());
ScheduleSubmapScanMatcherConstructionAndQueueWorkItem( auto constraint_task = common::make_unique<common::Task>();
submap_id, submap->grid(), [=]() EXCLUDES(mutex_) { constraint_task->SetWorkItem([=]() EXCLUDES(mutex_) {
ComputeConstraint( ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
submap_id, submap, node_id, true, /* match_full_submap */ constant_data, transform::Rigid2d::Identity(),
constant_data, transform::Rigid2d::Identity(), constraint); *scan_matcher, constraint);
FinishComputation(current_computation);
}); });
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
finish_node_task_->AddDependency(constraint_task_handle);
} }
void ConstraintBuilder2D::NotifyEndOfNode() { void ConstraintBuilder2D::NotifyEndOfNode() {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
++current_computation_; CHECK(finish_node_task_ != nullptr);
finish_node_task_->SetWorkItem([this] {
common::MutexLocker locker(&mutex_);
++num_finished_nodes_;
});
auto finish_node_task_handle =
thread_pool_->Schedule(std::move(finish_node_task_));
finish_node_task_ = common::make_unique<common::Task>();
when_done_task_->AddDependency(finish_node_task_handle);
++num_started_nodes_;
} }
void ConstraintBuilder2D::WhenDone( void ConstraintBuilder2D::WhenDone(
const std::function<void(const ConstraintBuilder2D::Result&)>& callback) { const std::function<void(const ConstraintBuilder2D::Result&)>& callback) {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
CHECK(when_done_ == nullptr); CHECK(when_done_ == nullptr);
// TODO(gaschler): Consider using just std::function, it can also be empty.
when_done_ = when_done_ =
common::make_unique<std::function<void(const Result&)>>(callback); common::make_unique<std::function<void(const Result&)>>(callback);
++pending_computations_[current_computation_]; CHECK(when_done_task_ != nullptr);
const int current_computation = current_computation_; when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule( thread_pool_->Schedule(std::move(when_done_task_));
[this, current_computation] { FinishComputation(current_computation); }); when_done_task_ = common::make_unique<common::Task>();
}
void ConstraintBuilder2D::ScheduleSubmapScanMatcherConstructionAndQueueWorkItem(
const SubmapId& submap_id, const Grid2D* const grid,
const std::function<void()>& work_item) {
if (submap_scan_matchers_[submap_id].fast_correlative_scan_matcher !=
nullptr) {
thread_pool_->Schedule(work_item);
} else {
submap_queued_work_items_[submap_id].push_back(work_item);
if (submap_queued_work_items_[submap_id].size() == 1) {
thread_pool_->Schedule(
[=]() { ConstructSubmapScanMatcher(submap_id, grid); });
}
}
}
void ConstraintBuilder2D::ConstructSubmapScanMatcher(const SubmapId& submap_id,
const Grid2D* const grid) {
auto submap_scan_matcher =
common::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*grid, options_.fast_correlative_scan_matcher_options());
common::MutexLocker locker(&mutex_);
submap_scan_matchers_[submap_id] = {grid, std::move(submap_scan_matcher)};
for (const std::function<void()>& work_item :
submap_queued_work_items_[submap_id]) {
thread_pool_->Schedule(work_item);
}
submap_queued_work_items_.erase(submap_id);
} }
const ConstraintBuilder2D::SubmapScanMatcher* const ConstraintBuilder2D::SubmapScanMatcher*
ConstraintBuilder2D::GetSubmapScanMatcher(const SubmapId& submap_id) { ConstraintBuilder2D::DispatchScanMatcherConstruction(const SubmapId& submap_id,
common::MutexLocker locker(&mutex_); const Grid2D* const grid) {
const SubmapScanMatcher* submap_scan_matcher = if (submap_scan_matchers_.count(submap_id) != 0) {
&submap_scan_matchers_[submap_id]; return &submap_scan_matchers_.at(submap_id);
CHECK(submap_scan_matcher->fast_correlative_scan_matcher != nullptr); }
return submap_scan_matcher; auto& submap_scan_matcher = submap_scan_matchers_[submap_id];
submap_scan_matcher.grid = grid;
auto& scan_matcher_options = options_.fast_correlative_scan_matcher_options();
auto scan_matcher_task = common::make_unique<common::Task>();
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]() {
submap_scan_matcher.fast_correlative_scan_matcher =
common::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
});
submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
return &submap_scan_matchers_.at(submap_id);
} }
void ConstraintBuilder2D::ComputeConstraint( void ConstraintBuilder2D::ComputeConstraint(
@ -172,11 +182,10 @@ void ConstraintBuilder2D::ComputeConstraint(
const NodeId& node_id, bool match_full_submap, const NodeId& node_id, bool match_full_submap,
const TrajectoryNode::Data* const constant_data, const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose, const transform::Rigid2d& initial_relative_pose,
const SubmapScanMatcher& submap_scan_matcher,
std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint) { std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint) {
const transform::Rigid2d initial_pose = const transform::Rigid2d initial_pose =
ComputeSubmapPose(*submap) * initial_relative_pose; ComputeSubmapPose(*submap) * initial_relative_pose;
const SubmapScanMatcher* const submap_scan_matcher =
GetSubmapScanMatcher(submap_id);
// The 'constraint_transform' (submap i <- node j) is computed from: // The 'constraint_transform' (submap i <- node j) is computed from:
// - a 'filtered_gravity_aligned_point_cloud' in node j, // - a 'filtered_gravity_aligned_point_cloud' in node j,
@ -192,7 +201,7 @@ void ConstraintBuilder2D::ComputeConstraint(
// 3. Refine. // 3. Refine.
if (match_full_submap) { if (match_full_submap) {
kGlobalConstraintsSearchedMetric->Increment(); kGlobalConstraintsSearchedMetric->Increment();
if (submap_scan_matcher->fast_correlative_scan_matcher->MatchFullSubmap( if (submap_scan_matcher.fast_correlative_scan_matcher->MatchFullSubmap(
constant_data->filtered_gravity_aligned_point_cloud, constant_data->filtered_gravity_aligned_point_cloud,
options_.global_localization_min_score(), &score, &pose_estimate)) { options_.global_localization_min_score(), &score, &pose_estimate)) {
CHECK_GT(score, options_.global_localization_min_score()); CHECK_GT(score, options_.global_localization_min_score());
@ -205,7 +214,7 @@ void ConstraintBuilder2D::ComputeConstraint(
} }
} else { } else {
kConstraintsSearchedMetric->Increment(); kConstraintsSearchedMetric->Increment();
if (submap_scan_matcher->fast_correlative_scan_matcher->Match( if (submap_scan_matcher.fast_correlative_scan_matcher->Match(
initial_pose, constant_data->filtered_gravity_aligned_point_cloud, initial_pose, constant_data->filtered_gravity_aligned_point_cloud,
options_.min_score(), &score, &pose_estimate)) { options_.min_score(), &score, &pose_estimate)) {
// We've reported a successful local match. // We've reported a successful local match.
@ -227,7 +236,7 @@ void ConstraintBuilder2D::ComputeConstraint(
ceres::Solver::Summary unused_summary; ceres::Solver::Summary unused_summary;
ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate, ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate,
constant_data->filtered_gravity_aligned_point_cloud, constant_data->filtered_gravity_aligned_point_cloud,
*submap_scan_matcher->grid, &pose_estimate, *submap_scan_matcher.grid, &pose_estimate,
&unused_summary); &unused_summary);
const transform::Rigid2d constraint_transform = const transform::Rigid2d constraint_transform =
@ -258,22 +267,16 @@ void ConstraintBuilder2D::ComputeConstraint(
} }
} }
void ConstraintBuilder2D::FinishComputation(const int computation_index) { void ConstraintBuilder2D::RunWhenDoneCallback() {
Result result; Result result;
std::unique_ptr<std::function<void(const Result&)>> callback; std::unique_ptr<std::function<void(const Result&)>> callback;
{ {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
if (--pending_computations_[computation_index] == 0) { CHECK(when_done_ != nullptr);
pending_computations_.erase(computation_index);
}
if (pending_computations_.empty()) {
CHECK_EQ(submap_queued_work_items_.size(), 0);
if (when_done_ != nullptr) {
for (const std::unique_ptr<Constraint>& constraint : constraints_) { for (const std::unique_ptr<Constraint>& constraint : constraints_) {
if (constraint != nullptr) { if (constraint == nullptr) continue;
result.push_back(*constraint); result.push_back(*constraint);
} }
}
if (options_.log_matches()) { if (options_.log_matches()) {
LOG(INFO) << constraints_.size() << " computations resulted in " LOG(INFO) << constraints_.size() << " computations resulted in "
<< result.size() << " additional constraints."; << result.size() << " additional constraints.";
@ -282,26 +285,22 @@ void ConstraintBuilder2D::FinishComputation(const int computation_index) {
constraints_.clear(); constraints_.clear();
callback = std::move(when_done_); callback = std::move(when_done_);
when_done_.reset(); when_done_.reset();
}
}
kQueueLengthMetric->Set(constraints_.size()); kQueueLengthMetric->Set(constraints_.size());
} }
if (callback != nullptr) {
(*callback)(result); (*callback)(result);
} }
}
int ConstraintBuilder2D::GetNumFinishedNodes() { int ConstraintBuilder2D::GetNumFinishedNodes() {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
if (pending_computations_.empty()) { return num_finished_nodes_;
return current_computation_;
}
return pending_computations_.begin()->first;
} }
void ConstraintBuilder2D::DeleteScanMatcher(const SubmapId& submap_id) { void ConstraintBuilder2D::DeleteScanMatcher(const SubmapId& submap_id) {
common::MutexLocker locker(&mutex_); common::MutexLocker locker(&mutex_);
CHECK(pending_computations_.empty()); if (when_done_) {
LOG(WARNING)
<< "DeleteScanMatcher was called while WhenDone was scheduled.";
}
submap_scan_matchers_.erase(submap_id); submap_scan_matchers_.erase(submap_id);
} }

View File

@ -29,6 +29,7 @@
#include "cartographer/common/histogram.h" #include "cartographer/common/histogram.h"
#include "cartographer/common/math.h" #include "cartographer/common/math.h"
#include "cartographer/common/mutex.h" #include "cartographer/common/mutex.h"
#include "cartographer/common/task.h"
#include "cartographer/common/thread_pool.h" #include "cartographer/common/thread_pool.h"
#include "cartographer/mapping/2d/submap_2d.h" #include "cartographer/mapping/2d/submap_2d.h"
#include "cartographer/mapping/internal/2d/scan_matching/ceres_scan_matcher_2d.h" #include "cartographer/mapping/internal/2d/scan_matching/ceres_scan_matcher_2d.h"
@ -49,10 +50,10 @@ transform::Rigid2d ComputeSubmapPose(const Submap2D& submap);
// Asynchronously computes constraints. // Asynchronously computes constraints.
// //
// Intermingle an arbitrary number of calls to MaybeAddConstraint() or // Intermingle an arbitrary number of calls to 'MaybeAddConstraint',
// MaybeAddGlobalConstraint, then call WhenDone(). After all computations are // 'MaybeAddGlobalConstraint', and 'NotifyEndOfNode', then call 'WhenDone' once.
// done the 'callback' will be called with the result and another // After all computations are done the 'callback' will be called with the result
// MaybeAdd(Global)Constraint()/WhenDone() cycle can follow. // and another MaybeAdd(Global)Constraint()/WhenDone() cycle can follow.
// //
// This class is thread-safe. // This class is thread-safe.
class ConstraintBuilder2D { class ConstraintBuilder2D {
@ -92,7 +93,8 @@ class ConstraintBuilder2D {
void NotifyEndOfNode(); void NotifyEndOfNode();
// Registers the 'callback' to be called with the results, after all // Registers the 'callback' to be called with the results, after all
// computations triggered by MaybeAddConstraint() have finished. // computations triggered by 'MaybeAdd*Constraint' have finished.
// 'callback' is executed in the 'ThreadPool'.
void WhenDone(const std::function<void(const Result&)>& callback); void WhenDone(const std::function<void(const Result&)>& callback);
// Returns the number of consecutive finished nodes. // Returns the number of consecutive finished nodes.
@ -108,21 +110,13 @@ class ConstraintBuilder2D {
const Grid2D* grid; const Grid2D* grid;
std::unique_ptr<scan_matching::FastCorrelativeScanMatcher2D> std::unique_ptr<scan_matching::FastCorrelativeScanMatcher2D>
fast_correlative_scan_matcher; fast_correlative_scan_matcher;
std::weak_ptr<common::Task> creation_task_handle;
}; };
// Either schedules the 'work_item', or if needed, schedules the scan matcher // The returned 'grid' and 'fast_correlative_scan_matcher' must only be
// construction and queues the 'work_item'. // accessed after 'creation_task_handle' has completed.
void ScheduleSubmapScanMatcherConstructionAndQueueWorkItem( const SubmapScanMatcher* DispatchScanMatcherConstruction(
const SubmapId& submap_id, const Grid2D* grid, const SubmapId& submap_id, const Grid2D* grid) REQUIRES(mutex_);
const std::function<void()>& work_item) REQUIRES(mutex_);
// Constructs the scan matcher for a 'submap', then schedules its work items.
void ConstructSubmapScanMatcher(const SubmapId& submap_id, const Grid2D* grid)
EXCLUDES(mutex_);
// Returns the scan matcher for a submap, which has to exist.
const SubmapScanMatcher* GetSubmapScanMatcher(const SubmapId& submap_id)
EXCLUDES(mutex_);
// Runs in a background thread and does computations for an additional // Runs in a background thread and does computations for an additional
// constraint, assuming 'submap' and 'compressed_point_cloud' do not change // constraint, assuming 'submap' and 'compressed_point_cloud' do not change
@ -131,12 +125,11 @@ class ConstraintBuilder2D {
const NodeId& node_id, bool match_full_submap, const NodeId& node_id, bool match_full_submap,
const TrajectoryNode::Data* const constant_data, const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose, const transform::Rigid2d& initial_relative_pose,
const SubmapScanMatcher& submap_scan_matcher,
std::unique_ptr<Constraint>* constraint) std::unique_ptr<Constraint>* constraint)
EXCLUDES(mutex_); EXCLUDES(mutex_);
// Decrements the 'pending_computations_' count. If all computations are done, void RunWhenDoneCallback() EXCLUDES(mutex_);
// runs the 'when_done_' callback and resets the state.
void FinishComputation(int computation_index) EXCLUDES(mutex_);
const constraints::proto::ConstraintBuilderOptions options_; const constraints::proto::ConstraintBuilderOptions options_;
common::ThreadPoolInterface* thread_pool_; common::ThreadPoolInterface* thread_pool_;
@ -146,28 +139,27 @@ class ConstraintBuilder2D {
std::unique_ptr<std::function<void(const Result&)>> when_done_ std::unique_ptr<std::function<void(const Result&)>> when_done_
GUARDED_BY(mutex_); GUARDED_BY(mutex_);
// Index of the node in reaction to which computations are currently // TODO(gaschler): Use atomics instead of mutex to access these counters.
// added. This is always the highest node index seen so far, even when older // Number of the node in reaction to which computations are currently
// added. This is always the number of nodes seen so far, even when older
// nodes are matched against a new submap. // nodes are matched against a new submap.
int current_computation_ GUARDED_BY(mutex_) = 0; int num_started_nodes_ GUARDED_BY(mutex_) = 0;
// For each added node, maps to the number of pending computations that were int num_finished_nodes_ GUARDED_BY(mutex_) = 0;
// added for it.
std::map<int, int> pending_computations_ GUARDED_BY(mutex_); std::unique_ptr<common::Task> finish_node_task_ GUARDED_BY(mutex_);
std::unique_ptr<common::Task> when_done_task_ GUARDED_BY(mutex_);
// Constraints currently being computed in the background. A deque is used to // Constraints currently being computed in the background. A deque is used to
// keep pointers valid when adding more entries. // keep pointers valid when adding more entries. Constraint search results
// with below-threshold scores are also 'nullptr'.
std::deque<std::unique_ptr<Constraint>> constraints_ GUARDED_BY(mutex_); std::deque<std::unique_ptr<Constraint>> constraints_ GUARDED_BY(mutex_);
// Map of already constructed scan matchers by 'submap_id'. // Map of dispatched or constructed scan matchers by 'submap_id'.
std::map<SubmapId, SubmapScanMatcher> submap_scan_matchers_ std::map<SubmapId, SubmapScanMatcher> submap_scan_matchers_
GUARDED_BY(mutex_); GUARDED_BY(mutex_);
// Map by 'submap_id' of scan matchers under construction, and the work
// to do once construction is done.
std::map<SubmapId, std::vector<std::function<void()>>>
submap_queued_work_items_ GUARDED_BY(mutex_);
common::FixedRatioSampler sampler_; common::FixedRatioSampler sampler_;
scan_matching::CeresScanMatcher2D ceres_scan_matcher_; scan_matching::CeresScanMatcher2D ceres_scan_matcher_;