Implement receive_global_slam_optimizations_handler (#1169)

master
Christoph Schütte 2018-05-29 19:26:33 +02:00 committed by GitHub
parent a9c90da1a8
commit fbb2835525
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 272 additions and 31 deletions

View File

@ -21,6 +21,7 @@
#include "cartographer/cloud/internal/map_builder_server.h"
#include "cartographer/cloud/map_builder_server_options.h"
#include "cartographer/mapping/internal/testing/mock_map_builder.h"
#include "cartographer/mapping/internal/testing/mock_pose_graph.h"
#include "cartographer/mapping/internal/testing/mock_trajectory_builder.h"
#include "cartographer/mapping/internal/testing/test_helpers.h"
#include "cartographer/mapping/local_slam_result_data.h"
@ -33,6 +34,7 @@ using ::cartographer::mapping::MapBuilderInterface;
using ::cartographer::mapping::PoseGraphInterface;
using ::cartographer::mapping::TrajectoryBuilderInterface;
using ::cartographer::mapping::testing::MockMapBuilder;
using ::cartographer::mapping::testing::MockPoseGraph;
using ::cartographer::mapping::testing::MockTrajectoryBuilder;
using ::testing::_;
using SensorId = ::cartographer::mapping::TrajectoryBuilderInterface::SensorId;
@ -126,6 +128,14 @@ class ClientServerTest : public ::testing::Test {
void InitializeServerWithMockMapBuilder() {
auto mock_map_builder = common::make_unique<MockMapBuilder>();
mock_map_builder_ = mock_map_builder.get();
mock_pose_graph_ = common::make_unique<MockPoseGraph>();
EXPECT_CALL(
*mock_map_builder_,
pose_graph())
.WillOnce(::testing::Return(mock_pose_graph_.get()));
EXPECT_CALL(
*mock_pose_graph_,
SetGlobalSlamOptimizationCallback(_));
server_ = common::make_unique<MapBuilderServer>(
map_builder_server_options_, std::move(mock_map_builder));
EXPECT_TRUE(server_ != nullptr);
@ -160,6 +170,7 @@ class ClientServerTest : public ::testing::Test {
proto::MapBuilderServerOptions map_builder_server_options_;
proto::MapBuilderServerOptions uploading_map_builder_server_options_;
MockMapBuilder* mock_map_builder_;
std::unique_ptr<MockPoseGraph> mock_pose_graph_;
std::unique_ptr<MockTrajectoryBuilder> mock_trajectory_builder_;
::cartographer::mapping::proto::TrajectoryBuilderOptions
trajectory_builder_options_;

View File

@ -0,0 +1,81 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/handlers/receive_global_slam_optimizations_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"
#include "cartographer/transform/transform.h"
namespace cartographer {
namespace cloud {
namespace handlers {
namespace {
std::unique_ptr<proto::ReceiveGlobalSlamOptimizationsResponse> GenerateResponse(
const std::map<int, mapping::SubmapId> &last_optimized_submap_ids,
const std::map<int, mapping::NodeId> &last_optimized_node_ids) {
auto response =
common::make_unique<proto::ReceiveGlobalSlamOptimizationsResponse>();
for (const auto &entry : last_optimized_submap_ids) {
entry.second.ToProto(
&(*response->mutable_last_optimized_submap_ids())[entry.first]);
}
for (const auto &entry : last_optimized_node_ids) {
entry.second.ToProto(
&(*response->mutable_last_optimized_node_ids())[entry.first]);
}
return response;
}
} // namespace
void ReceiveGlobalSlamOptimizationsHandler::OnRequest(
const google::protobuf::Empty &request) {
auto writer = GetWriter();
const int subscription_index =
GetUnsynchronizedContext<MapBuilderContextInterface>()
->SubscribeGlobalSlamOptimizations(
[writer](const std::map<int, mapping::SubmapId>
&last_optimized_submap_ids,
const std::map<int, mapping::NodeId>
&last_optimized_node_ids) {
if (!writer.Write(GenerateResponse(last_optimized_submap_ids,
last_optimized_node_ids))) {
// Client closed connection.
LOG(INFO) << "Client closed connection.";
return false;
}
return true;
});
LOG(INFO) << "Added subscription: " << subscription_index;
subscription_index_ = subscription_index;
}
void ReceiveGlobalSlamOptimizationsHandler::OnFinish() {
if (subscription_index_.has_value()) {
LOG(INFO) << "Removing subscription " << subscription_index_.value();
GetUnsynchronizedContext<MapBuilderContextInterface>()
->UnsubscribeGlobalSlamOptimizations(subscription_index_.value());
}
}
} // namespace handlers
} // namespace cloud
} // namespace cartographer

View File

@ -0,0 +1,49 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_RECEIVE_GLOBAL_SLAM_OPTIMIZATIONS_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_RECEIVE_GLOBAL_SLAM_OPTIMIZATIONS_HANDLER_H
#include <memory>
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
ReceiveGlobalSlamOptimizationsSignature, google::protobuf::Empty,
async_grpc::Stream<proto::ReceiveGlobalSlamOptimizationsResponse>,
"/cartographer.cloud.proto.MapBuilderService/"
"ReceiveGlobalSlamOptimizations")
class ReceiveGlobalSlamOptimizationsHandler
: public async_grpc::RpcHandler<ReceiveGlobalSlamOptimizationsSignature> {
public:
void OnRequest(const google::protobuf::Empty &request) override;
void OnFinish() override;
private:
common::optional<int> subscription_index_;
};
} // namespace handlers
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_RECEIVE_GLOBAL_SLAM_OPTIMIZATIONS_HANDLER_H

View File

@ -51,7 +51,7 @@ std::unique_ptr<proto::ReceiveLocalSlamResultsResponse> GenerateResponse(
void ReceiveLocalSlamResultsHandler::OnRequest(
const proto::ReceiveLocalSlamResultsRequest& request) {
auto writer = GetWriter();
MapBuilderContextInterface::SubscriptionId subscription_id =
MapBuilderContextInterface::LocalSlamSubscriptionId subscription_id =
GetUnsynchronizedContext<MapBuilderContextInterface>()
->SubscribeLocalSlamResults(
request.trajectory_id(),
@ -59,16 +59,22 @@ void ReceiveLocalSlamResultsHandler::OnRequest(
std::unique_ptr<MapBuilderContextInterface::LocalSlamResult>
local_slam_result) {
if (local_slam_result) {
writer.Write(GenerateResponse(std::move(local_slam_result)));
if (!writer.Write(
GenerateResponse(std::move(local_slam_result)))) {
// Client closed connection.
LOG(INFO) << "Client closed connection.";
return false;
}
} else {
// Callback with 'nullptr' signals that the trajectory
// finished.
writer.WritesDone();
}
return true;
});
subscription_id_ =
common::make_unique<MapBuilderContextInterface::SubscriptionId>(
common::make_unique<MapBuilderContextInterface::LocalSlamSubscriptionId>(
subscription_id);
}

View File

@ -39,7 +39,8 @@ class ReceiveLocalSlamResultsHandler
void OnFinish() override;
private:
std::unique_ptr<MapBuilderContextInterface::SubscriptionId> subscription_id_;
std::unique_ptr<MapBuilderContextInterface::LocalSlamSubscriptionId>
subscription_id_;
};
} // namespace handlers

View File

@ -61,7 +61,7 @@ void MapBuilderContext<SubmapType>::AddSensorDataToTrajectory(
}
template <class SubmapType>
MapBuilderContextInterface::SubscriptionId
MapBuilderContextInterface::LocalSlamSubscriptionId
MapBuilderContext<SubmapType>::SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback) {
return map_builder_server_->SubscribeLocalSlamResults(trajectory_id,
@ -70,10 +70,22 @@ MapBuilderContext<SubmapType>::SubscribeLocalSlamResults(
template <class SubmapType>
void MapBuilderContext<SubmapType>::UnsubscribeLocalSlamResults(
const SubscriptionId& subscription_id) {
const LocalSlamSubscriptionId& subscription_id) {
map_builder_server_->UnsubscribeLocalSlamResults(subscription_id);
}
template <class SubmapType>
int MapBuilderContext<SubmapType>::SubscribeGlobalSlamOptimizations(
GlobalSlamOptimizationCallback callback) {
return map_builder_server_->SubscribeGlobalSlamOptimizations(callback);
}
template <class SubmapType>
void MapBuilderContext<SubmapType>::UnsubscribeGlobalSlamOptimizations(
int subscription_index) {
map_builder_server_->UnsubscribeGlobalSlamOptimizations(subscription_index);
}
template <class SubmapType>
void MapBuilderContext<SubmapType>::NotifyFinishTrajectory(int trajectory_id) {
map_builder_server_->NotifyFinishTrajectory(trajectory_id);

View File

@ -21,6 +21,7 @@
#include "cartographer/cloud/internal/local_trajectory_uploader.h"
#include "cartographer/common/blocking_queue.h"
#include "cartographer/mapping/map_builder_interface.h"
#include "cartographer/mapping/pose_graph_interface.h"
#include "cartographer/mapping/proto/serialization.pb.h"
#include "cartographer/sensor/data.h"
#include "cartographer/sensor/range_data.h"
@ -40,14 +41,27 @@ class MapBuilderContextInterface : public async_grpc::ExecutionContext {
std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
insertion_result;
};
// Calling with 'nullptr' signals subscribers that the subscription has ended.
// Calling with 'nullptr' signals subscribers that the subscription has ended,
// e.g. this happens when the corresponding trajectory was finished and hence
// no more local SLAM updates will occur.
// The callback can return 'false' to indicate that the client is not
// interested in more local SLAM updates and 'MapBuilderServer' will end the
// subscription.
using LocalSlamSubscriptionCallback =
std::function<void(std::unique_ptr<LocalSlamResult>)>;
std::function<bool(std::unique_ptr<LocalSlamResult>)>;
// The callback can return 'false' to indicate that the client is not
// interested in more global SLAM runs and 'MapBuilderServer' will end the
// subscription.
using GlobalSlamOptimizationCallback = std::function<bool(
const std::map<int /* trajectory_id */, mapping::SubmapId>&,
const std::map<int /* trajectory_id */, mapping::NodeId>&)>;
struct Data {
int trajectory_id;
std::unique_ptr<sensor::Data> data;
};
struct SubscriptionId {
struct LocalSlamSubscriptionId {
const int trajectory_id;
const int subscription_index;
};
@ -64,10 +78,13 @@ class MapBuilderContextInterface : public async_grpc::ExecutionContext {
virtual mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
GetLocalSlamResultCallbackForSubscriptions() = 0;
virtual void AddSensorDataToTrajectory(const Data& sensor_data) = 0;
virtual SubscriptionId SubscribeLocalSlamResults(
virtual LocalSlamSubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback) = 0;
virtual void UnsubscribeLocalSlamResults(
const SubscriptionId& subscription_id) = 0;
const LocalSlamSubscriptionId& subscription_id) = 0;
virtual int SubscribeGlobalSlamOptimizations(
GlobalSlamOptimizationCallback callback) = 0;
virtual void UnsubscribeGlobalSlamOptimizations(int subscription_index) = 0;
virtual void NotifyFinishTrajectory(int trajectory_id) = 0;
virtual LocalTrajectoryUploaderInterface* local_trajectory_uploader() = 0;
virtual void EnqueueSensorData(int trajectory_id,

View File

@ -33,6 +33,7 @@
#include "cartographer/cloud/internal/handlers/is_trajectory_finished_handler.h"
#include "cartographer/cloud/internal/handlers/is_trajectory_frozen_handler.h"
#include "cartographer/cloud/internal/handlers/load_state_handler.h"
#include "cartographer/cloud/internal/handlers/receive_global_slam_optimizations_handler.h"
#include "cartographer/cloud/internal/handlers/receive_local_slam_results_handler.h"
#include "cartographer/cloud/internal/handlers/run_final_optimization_handler.h"
#include "cartographer/cloud/internal/handlers/set_landmark_pose_handler.h"
@ -73,6 +74,8 @@ MapBuilderServer::MapBuilderServer(
server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
server_builder
.RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
server_builder.RegisterHandler<handlers::GetSubmapHandler>();
server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
@ -99,6 +102,9 @@ MapBuilderServer::MapBuilderServer(
LOG(FATAL)
<< "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
}
map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
std::bind(&MapBuilderServer::OnGlobalSlamOptimizations, this,
std::placeholders::_1, std::placeholders::_2));
}
void MapBuilderServer::Start() {
@ -184,7 +190,7 @@ void MapBuilderServer::OnLocalSlamResult(
->EnqueueSensorData(std::move(sensor_data));
}
common::MutexLocker locker(&local_slam_subscriptions_lock_);
common::MutexLocker locker(&subscriptions_lock_);
for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
auto copy_of_insertion_result =
insertion_result
@ -194,34 +200,64 @@ void MapBuilderServer::OnLocalSlamResult(
: nullptr;
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
entry.second;
callback(common::make_unique<MapBuilderContextInterface::LocalSlamResult>(
MapBuilderContextInterface::LocalSlamResult{
trajectory_id, time, local_pose, shared_range_data,
std::move(copy_of_insertion_result)}));
if (!callback(
common::make_unique<MapBuilderContextInterface::LocalSlamResult>(
MapBuilderContextInterface::LocalSlamResult{
trajectory_id, time, local_pose, shared_range_data,
std::move(copy_of_insertion_result)}))) {
LOG(INFO) << "Removing subscription with index: " << entry.first;
CHECK_EQ(local_slam_subscriptions_[trajectory_id].erase(entry.first), 1u);
}
}
}
MapBuilderContextInterface::SubscriptionId
void MapBuilderServer::OnGlobalSlamOptimizations(
const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
common::MutexLocker locker(&subscriptions_lock_);
for (auto& entry : global_slam_subscriptions_) {
if (!entry.second(last_optimized_submap_ids, last_optimized_node_ids)) {
LOG(INFO) << "Removing subscription with index: " << entry.first;
CHECK_EQ(global_slam_subscriptions_.erase(entry.first), 1u);
}
}
}
MapBuilderContextInterface::LocalSlamSubscriptionId
MapBuilderServer::SubscribeLocalSlamResults(
int trajectory_id,
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback) {
common::MutexLocker locker(&local_slam_subscriptions_lock_);
common::MutexLocker locker(&subscriptions_lock_);
local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_,
callback);
return MapBuilderContextInterface::SubscriptionId{
return MapBuilderContextInterface::LocalSlamSubscriptionId{
trajectory_id, current_subscription_index_++};
}
void MapBuilderServer::UnsubscribeLocalSlamResults(
const MapBuilderContextInterface::SubscriptionId& subscription_id) {
common::MutexLocker locker(&local_slam_subscriptions_lock_);
const MapBuilderContextInterface::LocalSlamSubscriptionId&
subscription_id) {
common::MutexLocker locker(&subscriptions_lock_);
CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase(
subscription_id.subscription_index),
1u);
}
int MapBuilderServer::SubscribeGlobalSlamOptimizations(
MapBuilderContextInterface::GlobalSlamOptimizationCallback callback) {
common::MutexLocker locker(&subscriptions_lock_);
global_slam_subscriptions_.emplace(current_subscription_index_, callback);
return current_subscription_index_++;
}
void MapBuilderServer::UnsubscribeGlobalSlamOptimizations(
int subscription_index) {
common::MutexLocker locker(&subscriptions_lock_);
CHECK_EQ(global_slam_subscriptions_.erase(subscription_index), 1u);
}
void MapBuilderServer::NotifyFinishTrajectory(int trajectory_id) {
common::MutexLocker locker(&local_slam_subscriptions_lock_);
common::MutexLocker locker(&subscriptions_lock_);
for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback =
entry.second;

View File

@ -49,10 +49,13 @@ class MapBuilderContext : public MapBuilderContextInterface {
mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
GetLocalSlamResultCallbackForSubscriptions() override;
void AddSensorDataToTrajectory(const Data& sensor_data) override;
MapBuilderContextInterface::SubscriptionId SubscribeLocalSlamResults(
MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback) override;
void UnsubscribeLocalSlamResults(
const SubscriptionId& subscription_id) override;
const LocalSlamSubscriptionId& subscription_id) override;
int SubscribeGlobalSlamOptimizations(
GlobalSlamOptimizationCallback callback) override;
void UnsubscribeGlobalSlamOptimizations(int subscription_index) override;
void NotifyFinishTrajectory(int trajectory_id) override;
LocalTrajectoryUploaderInterface* local_trajectory_uploader() override;
void EnqueueSensorData(int trajectory_id,
@ -107,11 +110,18 @@ class MapBuilderServer : public MapBuilderServerInterface {
std::unique_ptr<
const mapping::TrajectoryBuilderInterface::InsertionResult>
insertion_result);
MapBuilderContextInterface::SubscriptionId SubscribeLocalSlamResults(
void OnGlobalSlamOptimizations(
const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
const std::map<int, mapping::NodeId>& last_optimized_node_ids);
MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
int trajectory_id,
MapBuilderContextInterface::LocalSlamSubscriptionCallback callback);
void UnsubscribeLocalSlamResults(
const MapBuilderContextInterface::SubscriptionId& subscription_id);
const MapBuilderContextInterface::LocalSlamSubscriptionId&
subscription_id);
int SubscribeGlobalSlamOptimizations(
MapBuilderContextInterface::GlobalSlamOptimizationCallback callback);
void UnsubscribeGlobalSlamOptimizations(int subscription_index);
void NotifyFinishTrajectory(int trajectory_id);
bool shutting_down_ = false;
@ -120,10 +130,13 @@ class MapBuilderServer : public MapBuilderServerInterface {
std::unique_ptr<mapping::MapBuilderInterface> map_builder_;
common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>
incoming_data_queue_;
common::Mutex local_slam_subscriptions_lock_;
common::Mutex subscriptions_lock_;
int current_subscription_index_ = 0;
std::map<int /* trajectory ID */, LocalSlamResultHandlerSubscriptions>
local_slam_subscriptions_ GUARDED_BY(local_slam_subscriptions_lock_);
local_slam_subscriptions_ GUARDED_BY(subscriptions_lock_);
std::map<int /* subscription_index */,
MapBuilderContextInterface::GlobalSlamOptimizationCallback>
global_slam_subscriptions_ GUARDED_BY(subscriptions_lock_);
std::unique_ptr<LocalTrajectoryUploaderInterface> local_trajectory_uploader_;
int starting_submap_index_ = 0;
};

View File

@ -39,11 +39,15 @@ class MockMapBuilderContext : public MapBuilderContextInterface {
MOCK_METHOD1(AddSensorDataToTrajectory,
void(const MapBuilderContextInterface::Data &));
MOCK_METHOD2(SubscribeLocalSlamResults,
MapBuilderContextInterface::SubscriptionId(
MapBuilderContextInterface::LocalSlamSubscriptionId(
int,
MapBuilderContextInterface::LocalSlamSubscriptionCallback));
MOCK_METHOD1(UnsubscribeLocalSlamResults,
void(const MapBuilderContextInterface::SubscriptionId &));
MOCK_METHOD1(
UnsubscribeLocalSlamResults,
void(const MapBuilderContextInterface::LocalSlamSubscriptionId &));
MOCK_METHOD1(SubscribeGlobalSlamOptimizations,
int(GlobalSlamOptimizationCallback));
MOCK_METHOD1(UnsubscribeGlobalSlamOptimizations, void(int));
MOCK_METHOD1(NotifyFinishTrajectory, void(int));
MOCK_METHOD0(local_trajectory_uploader, LocalTrajectoryUploaderInterface *());

View File

@ -114,6 +114,13 @@ message ReceiveLocalSlamResultsResponse {
LocalSlamInsertionResult insertion_result = 5;
}
message ReceiveGlobalSlamOptimizationsResponse {
map<int32 /* trajectory_id */, cartographer.mapping.proto.NodeId>
last_optimized_node_ids = 1;
map<int32 /* trajectory_id */, cartographer.mapping.proto.SubmapId>
last_optimized_submap_ids = 2;
}
message GetSubmapRequest {
cartographer.mapping.proto.SubmapId submap_id = 1;
}
@ -230,6 +237,10 @@ service MapBuilderService {
rpc ReceiveLocalSlamResults(ReceiveLocalSlamResultsRequest)
returns (stream ReceiveLocalSlamResultsResponse);
// Requests the server to send a stream of global SLAM notifications.
rpc ReceiveGlobalSlamOptimizations(google.protobuf.Empty)
returns (stream ReceiveGlobalSlamOptimizationsResponse);
// Marks a trajectory corresponding to 'trajectory_id' as finished,
// i.e. no further sensor data is expected.
rpc FinishTrajectory(FinishTrajectoryRequest) returns (google.protobuf.Empty);