From 35abfccc6b6d38cca3e55e6e34d825316ebfb473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Tue, 10 Jul 2018 20:41:00 +0200 Subject: [PATCH] Introduce client_id in cartographer grpc (#1241) --- cartographer/cloud/client/map_builder_stub.cc | 20 +++++++++++++++---- cartographer/cloud/client/map_builder_stub.h | 4 +++- .../cloud/internal/client/pose_graph_stub.cc | 14 ++++++++++--- .../cloud/internal/client/pose_graph_stub.h | 4 +++- .../cloud/internal/client_server_test.cc | 5 +++-- .../handlers/add_sensor_data_batch_handler.h | 2 +- .../handlers/add_trajectory_handler.cc | 4 +++- .../handlers/add_trajectory_handler_test.cc | 7 ++++++- .../handlers/delete_trajectory_handler.cc | 7 +++++++ .../handlers/finish_trajectory_handler.cc | 9 ++++++++- .../internal/local_trajectory_uploader.cc | 14 +++++++++---- .../internal/local_trajectory_uploader.h | 6 ++++-- .../cloud/internal/map_builder_context_impl.h | 16 +++++++++++++++ .../internal/map_builder_context_interface.h | 4 ++++ .../cloud/internal/map_builder_server.h | 5 +++++ .../testing/mock_local_trajectory_uploader.h | 6 +++--- .../testing/mock_map_builder_context.h | 2 ++ .../cloud/proto/map_builder_service.proto | 3 +++ 18 files changed, 108 insertions(+), 24 deletions(-) diff --git a/cartographer/cloud/client/map_builder_stub.cc b/cartographer/cloud/client/map_builder_stub.cc index c30c47e..88e7f9e 100644 --- a/cartographer/cloud/client/map_builder_stub.cc +++ b/cartographer/cloud/client/map_builder_stub.cc @@ -39,11 +39,14 @@ constexpr int kConnectionTimeoutInSeconds = 10; } // namespace -MapBuilderStub::MapBuilderStub(const std::string& server_address) +MapBuilderStub::MapBuilderStub(const std::string& server_address, + const std::string& client_id) : client_channel_(::grpc::CreateChannel( server_address, ::grpc::InsecureChannelCredentials())), - pose_graph_stub_(make_unique(client_channel_)) { - LOG(INFO) << "Connecting to SLAM process at " << server_address; + pose_graph_stub_(make_unique(client_channel_, client_id)), + client_id_(client_id) { + LOG(INFO) << "Connecting to SLAM process at " << server_address + << " with client_id " << client_id; std::chrono::system_clock::time_point deadline( std::chrono::system_clock::now() + std::chrono::seconds(kConnectionTimeoutInSeconds)); @@ -57,6 +60,7 @@ int MapBuilderStub::AddTrajectoryBuilder( const mapping::proto::TrajectoryBuilderOptions& trajectory_options, LocalSlamResultCallback local_slam_result_callback) { proto::AddTrajectoryRequest request; + request.set_client_id(client_id_); *request.mutable_trajectory_builder_options() = trajectory_options; for (const auto& sensor_id : expected_sensor_ids) { *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); @@ -93,10 +97,18 @@ mapping::TrajectoryBuilderInterface* MapBuilderStub::GetTrajectoryBuilder( void MapBuilderStub::FinishTrajectory(int trajectory_id) { proto::FinishTrajectoryRequest request; + request.set_client_id(client_id_); request.set_trajectory_id(trajectory_id); async_grpc::Client client( client_channel_); - CHECK(client.Write(request)); + ::grpc::Status status; + client.Write(request, &status); + if (!status.ok()) { + LOG(ERROR) << "Failed to finish trajectory " << trajectory_id + << " for client_id " << client_id_ << ": " + << status.error_message(); + return; + } trajectory_builder_stubs_.erase(trajectory_id); } diff --git a/cartographer/cloud/client/map_builder_stub.h b/cartographer/cloud/client/map_builder_stub.h index ba5a948..e63c59b 100644 --- a/cartographer/cloud/client/map_builder_stub.h +++ b/cartographer/cloud/client/map_builder_stub.h @@ -29,7 +29,8 @@ namespace cloud { class MapBuilderStub : public mapping::MapBuilderInterface { public: - MapBuilderStub(const std::string& server_address); + MapBuilderStub(const std::string& server_address, + const std::string& client_id); MapBuilderStub(const MapBuilderStub&) = delete; MapBuilderStub& operator=(const MapBuilderStub&) = delete; @@ -61,6 +62,7 @@ class MapBuilderStub : public mapping::MapBuilderInterface { std::unique_ptr pose_graph_stub_; std::map> trajectory_builder_stubs_; + const std::string client_id_; }; } // namespace cloud diff --git a/cartographer/cloud/internal/client/pose_graph_stub.cc b/cartographer/cloud/internal/client/pose_graph_stub.cc index c8bc622..8759550 100644 --- a/cartographer/cloud/internal/client/pose_graph_stub.cc +++ b/cartographer/cloud/internal/client/pose_graph_stub.cc @@ -35,8 +35,9 @@ namespace cartographer { namespace cloud { -PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel) - : client_channel_(client_channel) {} +PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel, + const std::string& client_id) + : client_channel_(client_channel), client_id_(client_id) {} void PoseGraphStub::RunFinalOptimization() { google::protobuf::Empty request; @@ -149,10 +150,17 @@ void PoseGraphStub::SetLandmarkPose(const std::string& landmark_id, void PoseGraphStub::DeleteTrajectory(int trajectory_id) { proto::DeleteTrajectoryRequest request; + request.set_client_id(client_id_); request.set_trajectory_id(trajectory_id); async_grpc::Client client( client_channel_); - CHECK(client.Write(request)); + ::grpc::Status status; + client.Write(request, &status); + if (!status.ok()) { + LOG(ERROR) << "Failed to delete trajectory " << trajectory_id + << " for client_id " << client_id_ << ": " + << status.error_message(); + } } bool PoseGraphStub::IsTrajectoryFinished(int trajectory_id) const { diff --git a/cartographer/cloud/internal/client/pose_graph_stub.h b/cartographer/cloud/internal/client/pose_graph_stub.h index e46e92d..625cbed 100644 --- a/cartographer/cloud/internal/client/pose_graph_stub.h +++ b/cartographer/cloud/internal/client/pose_graph_stub.h @@ -25,7 +25,8 @@ namespace cloud { class PoseGraphStub : public ::cartographer::mapping::PoseGraphInterface { public: - PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel); + PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel, + const std::string& client_id); PoseGraphStub(const PoseGraphStub&) = delete; PoseGraphStub& operator=(const PoseGraphStub&) = delete; @@ -57,6 +58,7 @@ class PoseGraphStub : public ::cartographer::mapping::PoseGraphInterface { private: std::shared_ptr<::grpc::Channel> client_channel_; + const std::string client_id_; }; } // namespace cloud diff --git a/cartographer/cloud/internal/client_server_test.cc b/cartographer/cloud/internal/client_server_test.cc index 1b43183..4caaa78 100644 --- a/cartographer/cloud/internal/client_server_test.cc +++ b/cartographer/cloud/internal/client_server_test.cc @@ -45,6 +45,7 @@ namespace cartographer { namespace cloud { namespace { +constexpr char kClientId[] = "CLIENT_ID"; const SensorId kImuSensorId{SensorId::SensorType::IMU, "imu"}; const SensorId kRangeSensorId{SensorId::SensorType::RANGE, "range"}; constexpr double kDuration = 4.; // Seconds. @@ -166,13 +167,13 @@ class ClientServerTest : public ::testing::Test { void InitializeStub() { stub_ = common::make_unique( - map_builder_server_options_.server_address()); + map_builder_server_options_.server_address(), kClientId); EXPECT_TRUE(stub_ != nullptr); } void InitializeStubForUploadingServer() { stub_for_uploading_server_ = common::make_unique( - uploading_map_builder_server_options_.server_address()); + uploading_map_builder_server_options_.server_address(), kClientId); EXPECT_TRUE(stub_for_uploading_server_ != nullptr); } diff --git a/cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h b/cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h index 679febf..4848441 100644 --- a/cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h +++ b/cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h @@ -40,4 +40,4 @@ class AddSensorDataBatchHandler } // namespace cloud } // namespace cartographer -#endif // CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_SENSOR_DATA_BATCH_HANDLER_H \ No newline at end of file +#endif // CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_SENSOR_DATA_BATCH_HANDLER_H diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc index 8133c91..a496c35 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc @@ -41,6 +41,8 @@ void AddTrajectoryHandler::OnRequest( .AddTrajectoryBuilder(expected_sensor_ids, request.trajectory_builder_options(), local_slam_result_callback); + GetContext()->RegisterClientIdForTrajectory( + request.client_id(), trajectory_id); if (GetUnsynchronizedContext() ->local_trajectory_uploader()) { auto trajectory_builder_options = request.trajectory_builder_options(); @@ -60,7 +62,7 @@ void AddTrajectoryHandler::OnRequest( GetContext() ->local_trajectory_uploader() - ->AddTrajectory(trajectory_id, expected_sensor_ids, + ->AddTrajectory(request.client_id(), trajectory_id, expected_sensor_ids, trajectory_builder_options); } diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc index c4f2339..efc5047 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc @@ -36,6 +36,7 @@ using ::testing::Test; using ::testing::Truly; const std::string kMessage = R"( + client_id: "CLIENT_ID" expected_sensor_ids { id: "range_sensor" type: RANGE @@ -105,6 +106,8 @@ TEST_F(AddTrajectoryHandlerTest, NoLocalSlamUploader) { &request.trajectory_builder_options())), _)) .WillOnce(Return(13)); + EXPECT_CALL(*mock_map_builder_context_, + RegisterClientIdForTrajectory(Eq("CLIENT_ID"), Eq(13))); test_server_->SendWrite(request); EXPECT_EQ(test_server_->response().trajectory_id(), 13); } @@ -120,6 +123,8 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) { &request.trajectory_builder_options())), _)) .WillOnce(Return(13)); + EXPECT_CALL(*mock_map_builder_context_, + RegisterClientIdForTrajectory(Eq("CLIENT_ID"), Eq(13))); auto upstream_trajectory_builder_options = request.trajectory_builder_options(); // Reproduce the changes to the request as done by the handler. @@ -128,7 +133,7 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) { upstream_trajectory_builder_options.clear_pure_localization_trimmer(); upstream_trajectory_builder_options.clear_initial_trajectory_pose(); EXPECT_CALL(*mock_local_trajectory_uploader_, - AddTrajectory(Eq(13), ParseSensorIds(request), + AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request), Truly(testing::BuildProtoPredicateEquals( &upstream_trajectory_builder_options)))); test_server_->SendWrite(request); diff --git a/cartographer/cloud/internal/handlers/delete_trajectory_handler.cc b/cartographer/cloud/internal/handlers/delete_trajectory_handler.cc index 110daea..fb6ff51 100644 --- a/cartographer/cloud/internal/handlers/delete_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/delete_trajectory_handler.cc @@ -28,6 +28,13 @@ namespace handlers { void DeleteTrajectoryHandler::OnRequest( const proto::DeleteTrajectoryRequest& request) { + if (!GetContext()->CheckClientIdForTrajectory( + request.client_id(), request.trajectory_id())) { + LOG(ERROR) << "Unknown trajectory with ID " << request.trajectory_id() + << " and client_id " << request.client_id(); + Finish(::grpc::Status(::grpc::NOT_FOUND, "Unknown trajectory")); + return; + } GetContext() ->map_builder() .pose_graph() diff --git a/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc b/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc index 011ebac..fab7cb7 100644 --- a/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc @@ -28,6 +28,13 @@ namespace handlers { void FinishTrajectoryHandler::OnRequest( const proto::FinishTrajectoryRequest& request) { + if (!GetContext()->CheckClientIdForTrajectory( + request.client_id(), request.trajectory_id())) { + LOG(ERROR) << "Unknown trajectory with ID " << request.trajectory_id() + << " and client_id " << request.client_id(); + Finish(::grpc::Status(::grpc::NOT_FOUND, "Unknown trajectory")); + return; + } GetContext()->map_builder().FinishTrajectory( request.trajectory_id()); GetUnsynchronizedContext() @@ -36,7 +43,7 @@ void FinishTrajectoryHandler::OnRequest( ->local_trajectory_uploader()) { GetContext() ->local_trajectory_uploader() - ->FinishTrajectory(request.trajectory_id()); + ->FinishTrajectory(request.client_id(), request.trajectory_id()); } Send(common::make_unique()); } diff --git a/cartographer/cloud/internal/local_trajectory_uploader.cc b/cartographer/cloud/internal/local_trajectory_uploader.cc index c50588b..26a010c 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.cc +++ b/cartographer/cloud/internal/local_trajectory_uploader.cc @@ -55,9 +55,11 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { void Shutdown() final; void AddTrajectory( - int local_trajectory_id, const std::set& expected_sensor_ids, + const std::string& client_id, int local_trajectory_id, + const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final; - void FinishTrajectory(int local_trajectory_id) final; + void FinishTrajectory(const std::string& client_id, + int local_trajectory_id) final; void EnqueueSensorData(std::unique_ptr sensor_data) final; SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final { @@ -158,9 +160,11 @@ void LocalTrajectoryUploader::TranslateTrajectoryId( } void LocalTrajectoryUploader::AddTrajectory( - int local_trajectory_id, const std::set& expected_sensor_ids, + const std::string& client_id, int local_trajectory_id, + const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { proto::AddTrajectoryRequest request; + request.set_client_id(client_id); *request.mutable_trajectory_builder_options() = trajectory_options; for (const SensorId& sensor_id : expected_sensor_ids) { // Range sensors are not forwarded, but combined into a LocalSlamResult. @@ -178,11 +182,13 @@ void LocalTrajectoryUploader::AddTrajectory( client.response().trajectory_id(); } -void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) { +void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id, + int local_trajectory_id) { CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1); int cloud_trajectory_id = local_to_cloud_trajectory_id_map_[local_trajectory_id]; proto::FinishTrajectoryRequest request; + request.set_client_id(client_id); request.set_trajectory_id(cloud_trajectory_id); async_grpc::Client client( client_channel_); diff --git a/cartographer/cloud/internal/local_trajectory_uploader.h b/cartographer/cloud/internal/local_trajectory_uploader.h index f7f4a07..4c4a34c 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.h +++ b/cartographer/cloud/internal/local_trajectory_uploader.h @@ -45,9 +45,11 @@ class LocalTrajectoryUploaderInterface { virtual void EnqueueSensorData( std::unique_ptr sensor_data) = 0; virtual void AddTrajectory( - int local_trajectory_id, const std::set& expected_sensor_ids, + const std::string& client_id, int local_trajectory_id, + const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0; - virtual void FinishTrajectory(int local_trajectory_id) = 0; + virtual void FinishTrajectory(const std::string& client_id, + int local_trajectory_id) = 0; virtual SensorId GetLocalSlamResultSensorId( int local_trajectory_id) const = 0; diff --git a/cartographer/cloud/internal/map_builder_context_impl.h b/cartographer/cloud/internal/map_builder_context_impl.h index 18883a5..983ded4 100644 --- a/cartographer/cloud/internal/map_builder_context_impl.h +++ b/cartographer/cloud/internal/map_builder_context_impl.h @@ -104,6 +104,22 @@ void MapBuilderContext::EnqueueSensorData( common::make_unique(Data{trajectory_id, std::move(data)})); } +template +void MapBuilderContext::RegisterClientIdForTrajectory( + const std::string& client_id, int trajectory_id) { + CHECK_EQ(client_ids_.count(trajectory_id), 0u); + LOG(INFO) << "Registering trajectory_id " << trajectory_id << " to client_id " + << client_id; + client_ids_[trajectory_id] = client_id; +} + +template +bool MapBuilderContext::CheckClientIdForTrajectory( + const std::string& client_id, int trajectory_id) { + return (client_ids_.count(trajectory_id) > 0 && + client_ids_[trajectory_id] == client_id); +} + template <> void MapBuilderContext::EnqueueLocalSlamResultData( int trajectory_id, const std::string& sensor_id, diff --git a/cartographer/cloud/internal/map_builder_context_interface.h b/cartographer/cloud/internal/map_builder_context_interface.h index 6a87a14..ee6fe2d 100644 --- a/cartographer/cloud/internal/map_builder_context_interface.h +++ b/cartographer/cloud/internal/map_builder_context_interface.h @@ -92,6 +92,10 @@ class MapBuilderContextInterface : public async_grpc::ExecutionContext { virtual void EnqueueLocalSlamResultData( int trajectory_id, const std::string& sensor_id, const mapping::proto::LocalSlamResultData& local_slam_result_data) = 0; + virtual void RegisterClientIdForTrajectory(const std::string& client_id, + int trajectory_id) = 0; + virtual bool CheckClientIdForTrajectory(const std::string& client_id, + int trajectory_id) = 0; }; } // namespace cloud diff --git a/cartographer/cloud/internal/map_builder_server.h b/cartographer/cloud/internal/map_builder_server.h index 761d9c5..af014fb 100644 --- a/cartographer/cloud/internal/map_builder_server.h +++ b/cartographer/cloud/internal/map_builder_server.h @@ -64,10 +64,15 @@ class MapBuilderContext : public MapBuilderContextInterface { const std::string& sensor_id, const mapping::proto::LocalSlamResultData& local_slam_result_data) override; + void RegisterClientIdForTrajectory(const std::string& client_id, + int trajectory_id) override; + bool CheckClientIdForTrajectory(const std::string& client_id, + int trajectory_id) override; private: MapBuilderServer* map_builder_server_; mapping::SubmapController submap_controller_; + std::map client_ids_; }; class MapBuilderServer : public MapBuilderServerInterface { diff --git a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h index c15bd5f..7754d8b 100644 --- a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h +++ b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h @@ -35,10 +35,10 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { } MOCK_METHOD0(Start, void()); MOCK_METHOD0(Shutdown, void()); - MOCK_METHOD3(AddTrajectory, - void(int, const std::set &, + MOCK_METHOD4(AddTrajectory, + void(const std::string &, int, const std::set &, const mapping::proto::TrajectoryBuilderOptions &)); - MOCK_METHOD1(FinishTrajectory, void(int)); + MOCK_METHOD2(FinishTrajectory, void(const std::string &, int)); MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int)); }; diff --git a/cartographer/cloud/internal/testing/mock_map_builder_context.h b/cartographer/cloud/internal/testing/mock_map_builder_context.h index f9ba0e8..9ec8788 100644 --- a/cartographer/cloud/internal/testing/mock_map_builder_context.h +++ b/cartographer/cloud/internal/testing/mock_map_builder_context.h @@ -59,6 +59,8 @@ class MockMapBuilderContext : public MapBuilderContextInterface { MOCK_METHOD3(EnqueueLocalSlamResultData, void(int, const std::string &, const mapping::proto::LocalSlamResultData &)); + MOCK_METHOD2(RegisterClientIdForTrajectory, void(const std::string &, int)); + MOCK_METHOD2(CheckClientIdForTrajectory, bool(const std::string &, int)); }; } // namespace testing diff --git a/cartographer/cloud/proto/map_builder_service.proto b/cartographer/cloud/proto/map_builder_service.proto index ac68114..ac764ff 100644 --- a/cartographer/cloud/proto/map_builder_service.proto +++ b/cartographer/cloud/proto/map_builder_service.proto @@ -49,6 +49,7 @@ message AddTrajectoryRequest { repeated SensorId expected_sensor_ids = 3; cartographer.mapping.proto.TrajectoryBuilderOptions trajectory_builder_options = 2; + string client_id = 4; } message SensorMetadata { @@ -103,10 +104,12 @@ message AddLandmarkDataRequest { message FinishTrajectoryRequest { int32 trajectory_id = 1; + string client_id = 2; } message DeleteTrajectoryRequest { int32 trajectory_id = 1; + string client_id = 2; } message ReceiveLocalSlamResultsRequest {