diff --git a/cartographer_grpc/framework/client_writer.h b/cartographer_grpc/framework/client_writer.h deleted file mode 100644 index bc4a10c..0000000 --- a/cartographer_grpc/framework/client_writer.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ -#define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ - -#include - -#include "google/protobuf/empty.pb.h" -#include "grpc++/grpc++.h" - -namespace cartographer_grpc { -namespace framework { - -template -struct ClientWriter { - grpc::ClientContext client_context; - std::unique_ptr> client_writer; - google::protobuf::Empty response; -}; - -} // namespace framework -} // namespace cartographer_grpc - -#endif // CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ diff --git a/cartographer_grpc/framework/server_test.cc b/cartographer_grpc/framework/server_test.cc index 5a97b59..b5cd0a4 100644 --- a/cartographer_grpc/framework/server_test.cc +++ b/cartographer_grpc/framework/server_test.cc @@ -20,7 +20,6 @@ #include "cartographer_grpc/framework/client.h" #include "cartographer_grpc/framework/execution_context.h" -#include "cartographer_grpc/framework/proto/math_service.grpc.pb.h" #include "cartographer_grpc/framework/proto/math_service.pb.h" #include "cartographer_grpc/framework/rpc_handler.h" #include "glog/logging.h" diff --git a/cartographer_grpc/local_trajectory_uploader.cc b/cartographer_grpc/local_trajectory_uploader.cc index 0aca68e..af596a0 100644 --- a/cartographer_grpc/local_trajectory_uploader.cc +++ b/cartographer_grpc/local_trajectory_uploader.cc @@ -15,8 +15,9 @@ */ #include "cartographer_grpc/local_trajectory_uploader.h" - #include "cartographer/common/make_unique.h" +#include "cartographer_grpc/handlers/add_trajectory_handler.h" +#include "cartographer_grpc/handlers/finish_trajectory_handler.h" #include "cartographer_grpc/proto/map_builder_service.pb.h" #include "cartographer_grpc/sensor/serialization.h" #include "glog/logging.h" @@ -31,22 +32,29 @@ const cartographer::common::Duration kPopTimeout = LocalTrajectoryUploader::LocalTrajectoryUploader( const std::string &uplink_server_address) - : client_channel_(grpc::CreateChannel(uplink_server_address, - grpc::InsecureChannelCredentials())), - service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {} + : client_channel_(grpc::CreateChannel( + uplink_server_address, grpc::InsecureChannelCredentials())) {} LocalTrajectoryUploader::~LocalTrajectoryUploader() { - if (imu_writer_.client_writer) { - CHECK(imu_writer_.client_writer->WritesDone()); - CHECK(imu_writer_.client_writer->Finish().ok()); + if (add_imu_client_) { + CHECK(add_imu_client_->WritesDone()); + CHECK(add_imu_client_->Finish().ok()); } - if (odometry_writer_.client_writer) { - CHECK(odometry_writer_.client_writer->WritesDone()); - CHECK(odometry_writer_.client_writer->Finish().ok()); + if (add_odometry_client_) { + CHECK(add_odometry_client_->WritesDone()); + CHECK(add_odometry_client_->Finish().ok()); } - if (fixed_frame_pose_writer_.client_writer) { - CHECK(fixed_frame_pose_writer_.client_writer->WritesDone()); - CHECK(fixed_frame_pose_writer_.client_writer->Finish().ok()); + if (add_fixed_frame_pose_client_) { + CHECK(add_fixed_frame_pose_client_->WritesDone()); + CHECK(add_fixed_frame_pose_client_->Finish().ok()); + } + if (add_local_slam_result_client_) { + CHECK(add_local_slam_result_client_->WritesDone()); + CHECK(add_local_slam_result_client_->Finish().ok()); + } + if (add_landmark_client_) { + CHECK(add_landmark_client_->WritesDone()); + CHECK(add_landmark_client_->Finish().ok()); } } @@ -84,6 +92,10 @@ void LocalTrajectoryUploader::ProcessSendQueue() { dynamic_cast( data_message.get())) { ProcessLocalSlamResultDataMessage(local_slam_result_data); + } else if (auto *landmark_data = + dynamic_cast( + data_message.get())) { + ProcessLandmarkDataMessage(landmark_data); } else { LOG(FATAL) << "Unknown message type: " << data_message->GetTypeName(); } @@ -100,47 +112,51 @@ void LocalTrajectoryUploader::TranslateTrajectoryId( void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage( proto::AddFixedFramePoseDataRequest *data_request) { - if (!fixed_frame_pose_writer_.client_writer) { - fixed_frame_pose_writer_.client_writer = - service_stub_->AddFixedFramePoseData( - &fixed_frame_pose_writer_.client_context, - &fixed_frame_pose_writer_.response); - CHECK(fixed_frame_pose_writer_.client_writer); + if (!add_fixed_frame_pose_client_) { + add_fixed_frame_pose_client_ = cartographer::common::make_unique< + framework::Client>( + client_channel_); } TranslateTrajectoryId(data_request->mutable_sensor_metadata()); - fixed_frame_pose_writer_.client_writer->Write(*data_request); + CHECK(add_fixed_frame_pose_client_->Write(*data_request)); } void LocalTrajectoryUploader::ProcessImuDataMessage( proto::AddImuDataRequest *data_request) { - if (!imu_writer_.client_writer) { - imu_writer_.client_writer = service_stub_->AddImuData( - &imu_writer_.client_context, &imu_writer_.response); - CHECK(imu_writer_.client_writer); + if (!add_imu_client_) { + add_imu_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); } TranslateTrajectoryId(data_request->mutable_sensor_metadata()); - imu_writer_.client_writer->Write(*data_request); + CHECK(add_imu_client_->Write(*data_request)); } void LocalTrajectoryUploader::ProcessOdometryDataMessage( proto::AddOdometryDataRequest *data_request) { - if (!odometry_writer_.client_writer) { - odometry_writer_.client_writer = service_stub_->AddOdometryData( - &odometry_writer_.client_context, &odometry_writer_.response); - CHECK(odometry_writer_.client_writer); + if (!add_odometry_client_) { + add_odometry_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); } TranslateTrajectoryId(data_request->mutable_sensor_metadata()); - odometry_writer_.client_writer->Write(*data_request); + CHECK(add_odometry_client_->Write(*data_request)); +} + +void LocalTrajectoryUploader::ProcessLandmarkDataMessage( + proto::AddLandmarkDataRequest *data_request) { + if (!add_landmark_client_) { + add_landmark_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); + } + TranslateTrajectoryId(data_request->mutable_sensor_metadata()); + CHECK(add_landmark_client_->Write(*data_request)); } void LocalTrajectoryUploader::ProcessLocalSlamResultDataMessage( proto::AddLocalSlamResultDataRequest *data_request) { - if (!local_slam_result_writer_.client_writer) { - local_slam_result_writer_.client_writer = - service_stub_->AddLocalSlamResultData( - &local_slam_result_writer_.client_context, - &local_slam_result_writer_.response); - CHECK(local_slam_result_writer_.client_writer); + if (!add_local_slam_result_client_) { + add_local_slam_result_client_ = cartographer::common::make_unique< + framework::Client>( + client_channel_); } TranslateTrajectoryId(data_request->mutable_sensor_metadata()); // A submap also holds a trajectory id that must be translated to uplink's @@ -150,16 +166,14 @@ void LocalTrajectoryUploader::ProcessLocalSlamResultDataMessage( mutable_submap.mutable_submap_id()->set_trajectory_id( data_request->sensor_metadata().trajectory_id()); } - local_slam_result_writer_.client_writer->Write(*data_request); + CHECK(add_local_slam_result_client_->Write(*data_request)); } void LocalTrajectoryUploader::AddTrajectory( int local_trajectory_id, const std::set &expected_sensor_ids, const cartographer::mapping::proto::TrajectoryBuilderOptions &trajectory_options) { - grpc::ClientContext client_context; proto::AddTrajectoryRequest request; - proto::AddTrajectoryResponse result; *request.mutable_trajectory_builder_options() = trajectory_options; for (const SensorId &sensor_id : expected_sensor_ids) { // Range sensors are not forwarded, but combined into a LocalSlamResult. @@ -169,25 +183,21 @@ void LocalTrajectoryUploader::AddTrajectory( } *request.add_expected_sensor_ids() = sensor::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); - grpc::Status status = - service_stub_->AddTrajectory(&client_context, request, &result); - CHECK(status.ok()); + framework::Client client(client_channel_); + CHECK(client.Write(request)); CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0); local_to_cloud_trajectory_id_map_[local_trajectory_id] = - result.trajectory_id(); + client.response().trajectory_id(); } void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) { CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1); int cloud_trajectory_id = local_to_cloud_trajectory_id_map_[local_trajectory_id]; - grpc::ClientContext client_context; proto::FinishTrajectoryRequest request; - google::protobuf::Empty response; request.set_trajectory_id(cloud_trajectory_id); - grpc::Status status = - service_stub_->FinishTrajectory(&client_context, request, &response); - CHECK(status.ok()); + framework::Client client(client_channel_); + CHECK(client.Write(request)); } void LocalTrajectoryUploader::EnqueueDataRequest( diff --git a/cartographer_grpc/local_trajectory_uploader.h b/cartographer_grpc/local_trajectory_uploader.h index 1eb3cd5..6ca4763 100644 --- a/cartographer_grpc/local_trajectory_uploader.h +++ b/cartographer_grpc/local_trajectory_uploader.h @@ -26,8 +26,13 @@ #include "cartographer/common/blocking_queue.h" #include "cartographer/mapping/proto/trajectory_builder_options.pb.h" #include "cartographer/mapping/trajectory_builder_interface.h" -#include "cartographer_grpc/framework/client_writer.h" -#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" +#include "cartographer_grpc/framework/client.h" +#include "cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h" +#include "cartographer_grpc/handlers/add_imu_data_handler.h" +#include "cartographer_grpc/handlers/add_landmark_data_handler.h" +#include "cartographer_grpc/handlers/add_local_slam_result_data_handler.h" +#include "cartographer_grpc/handlers/add_odometry_data_handler.h" +#include "cartographer_grpc/proto/map_builder_service.pb.h" #include "grpc++/grpc++.h" namespace cartographer_grpc { @@ -85,21 +90,25 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { void ProcessLocalSlamResultDataMessage( proto::AddLocalSlamResultDataRequest* data_request); void ProcessOdometryDataMessage(proto::AddOdometryDataRequest* data_request); + void ProcessLandmarkDataMessage(proto::AddLandmarkDataRequest* data_request); std::shared_ptr client_channel_; - std::unique_ptr service_stub_; std::map local_to_cloud_trajectory_id_map_; cartographer::common::BlockingQueue< std::unique_ptr> send_queue_; bool shutting_down_ = false; std::unique_ptr upload_thread_; - framework::ClientWriter - fixed_frame_pose_writer_; - framework::ClientWriter imu_writer_; - framework::ClientWriter - local_slam_result_writer_; - framework::ClientWriter odometry_writer_; + std::unique_ptr> + add_fixed_frame_pose_client_; + std::unique_ptr> + add_imu_client_; + std::unique_ptr> + add_local_slam_result_client_; + std::unique_ptr> + add_odometry_client_; + std::unique_ptr> + add_landmark_client_; }; } // namespace cartographer_grpc diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index 9f53ad5..449d873 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -33,7 +33,6 @@ #include "cartographer_grpc/handlers/receive_local_slam_results_handler.h" #include "cartographer_grpc/handlers/run_final_optimization_handler.h" #include "cartographer_grpc/handlers/write_map_handler.h" -#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "cartographer_grpc/sensor/serialization.h" #include "glog/logging.h" diff --git a/cartographer_grpc/mapping/map_builder_stub.cc b/cartographer_grpc/mapping/map_builder_stub.cc index b498cc8..52af0ef 100644 --- a/cartographer_grpc/mapping/map_builder_stub.cc +++ b/cartographer_grpc/mapping/map_builder_stub.cc @@ -15,7 +15,11 @@ */ #include "cartographer_grpc/mapping/map_builder_stub.h" - +#include "cartographer_grpc/handlers/add_trajectory_handler.h" +#include "cartographer_grpc/handlers/finish_trajectory_handler.h" +#include "cartographer_grpc/handlers/get_submap_handler.h" +#include "cartographer_grpc/handlers/load_map_handler.h" +#include "cartographer_grpc/handlers/write_map_handler.h" #include "cartographer_grpc/proto/map_builder_service.pb.h" #include "cartographer_grpc/sensor/serialization.h" #include "glog/logging.h" @@ -26,33 +30,30 @@ namespace mapping { MapBuilderStub::MapBuilderStub(const std::string& server_address) : client_channel_(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())), - service_stub_(proto::MapBuilderService::NewStub(client_channel_)), - pose_graph_stub_(client_channel_, service_stub_.get()) {} + pose_graph_stub_(client_channel_) {} int MapBuilderStub::AddTrajectoryBuilder( const std::set& expected_sensor_ids, const cartographer::mapping::proto::TrajectoryBuilderOptions& trajectory_options, LocalSlamResultCallback local_slam_result_callback) { - grpc::ClientContext client_context; proto::AddTrajectoryRequest request; - proto::AddTrajectoryResponse result; *request.mutable_trajectory_builder_options() = trajectory_options; for (const auto& sensor_id : expected_sensor_ids) { *request.add_expected_sensor_ids() = sensor::ToProto(sensor_id); } - grpc::Status status = - service_stub_->AddTrajectory(&client_context, request, &result); - CHECK(status.ok()); + framework::Client client(client_channel_); + CHECK(client.Write(request)); // Construct trajectory builder stub. trajectory_builder_stubs_.emplace( - std::piecewise_construct, std::forward_as_tuple(result.trajectory_id()), + std::piecewise_construct, + std::forward_as_tuple(client.response().trajectory_id()), std::forward_as_tuple(cartographer::common::make_unique< cartographer_grpc::mapping::TrajectoryBuilderStub>( - client_channel_, result.trajectory_id(), + client_channel_, client.response().trajectory_id(), local_slam_result_callback))); - return result.trajectory_id(); + return client.response().trajectory_id(); } int MapBuilderStub::AddTrajectoryForDeserialization() { @@ -65,13 +66,10 @@ MapBuilderStub::GetTrajectoryBuilder(int trajectory_id) const { } void MapBuilderStub::FinishTrajectory(int trajectory_id) { - grpc::ClientContext client_context; proto::FinishTrajectoryRequest request; - google::protobuf::Empty response; request.set_trajectory_id(trajectory_id); - grpc::Status status = - service_stub_->FinishTrajectory(&client_context, request, &response); - CHECK(status.ok()); + framework::Client client(client_channel_); + CHECK(client.Write(request)); trajectory_builder_stubs_.erase(trajectory_id); } @@ -79,22 +77,21 @@ std::string MapBuilderStub::SubmapToProto( const cartographer::mapping::SubmapId& submap_id, cartographer::mapping::proto::SubmapQuery::Response* submap_query_response) { - grpc::ClientContext client_context; proto::GetSubmapRequest request; submap_id.ToProto(request.mutable_submap_id()); - proto::GetSubmapResponse response; - CHECK(service_stub_->GetSubmap(&client_context, request, &response).ok()); - submap_query_response->CopyFrom(response.submap_query_response()); - return response.error_msg(); + framework::Client client(client_channel_); + CHECK(client.Write(request)); + submap_query_response->CopyFrom(client.response().submap_query_response()); + return client.response().error_msg(); } void MapBuilderStub::SerializeState( cartographer::io::ProtoStreamWriterInterface* writer) { - grpc::ClientContext client_context; google::protobuf::Empty request; - auto reader = service_stub_->WriteMap(&client_context, request); + framework::Client client(client_channel_); + CHECK(client.Write(request)); proto::WriteMapResponse response; - while (reader->Read(&response)) { + while (client.Read(&response)) { writer->WriteProto(response); } CHECK(writer->Close()); @@ -102,26 +99,22 @@ void MapBuilderStub::SerializeState( void MapBuilderStub::LoadMap( cartographer::io::ProtoStreamReaderInterface* reader) { - framework::ClientWriter load_map_writer; - load_map_writer.client_writer = service_stub_->LoadMap( - &load_map_writer.client_context, &load_map_writer.response); - CHECK(load_map_writer.client_writer); - + framework::Client client(client_channel_); // Request with a PoseGraph proto is sent first. { proto::LoadMapRequest request; CHECK(reader->ReadProto(request.mutable_pose_graph())); - CHECK(load_map_writer.client_writer->Write(request)); + CHECK(client.Write(request)); } - // Multiple requests with SerializedData are sent after. proto::LoadMapRequest request; while (reader->ReadProto(request.mutable_serialized_data())) { - CHECK(load_map_writer.client_writer->Write(request)); + CHECK(client.Write(request)); } CHECK(reader->eof()); - CHECK(load_map_writer.client_writer->WritesDone()); + CHECK(client.WritesDone()); + CHECK(client.Finish().ok()); } int MapBuilderStub::num_trajectory_builders() const { diff --git a/cartographer_grpc/mapping/map_builder_stub.h b/cartographer_grpc/mapping/map_builder_stub.h index 7bb98d8..f9f7447 100644 --- a/cartographer_grpc/mapping/map_builder_stub.h +++ b/cartographer_grpc/mapping/map_builder_stub.h @@ -22,7 +22,6 @@ #include "cartographer/mapping/map_builder_interface.h" #include "cartographer_grpc/mapping/pose_graph_stub.h" #include "cartographer_grpc/mapping/trajectory_builder_stub.h" -#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "grpc++/grpc++.h" namespace cartographer_grpc { @@ -55,7 +54,6 @@ class MapBuilderStub : public cartographer::mapping::MapBuilderInterface { private: std::shared_ptr client_channel_; - std::unique_ptr service_stub_; PoseGraphStub pose_graph_stub_; std::map> trajectory_builder_stubs_; diff --git a/cartographer_grpc/mapping/pose_graph_stub.cc b/cartographer_grpc/mapping/pose_graph_stub.cc index fe8ca55..e4e5bbd 100644 --- a/cartographer_grpc/mapping/pose_graph_stub.cc +++ b/cartographer_grpc/mapping/pose_graph_stub.cc @@ -15,22 +15,26 @@ */ #include "cartographer_grpc/mapping/pose_graph_stub.h" - #include "cartographer/mapping/pose_graph.h" +#include "cartographer_grpc/framework/client.h" +#include "cartographer_grpc/handlers/get_all_submap_poses.h" +#include "cartographer_grpc/handlers/get_constraints_handler.h" +#include "cartographer_grpc/handlers/get_local_to_global_transform_handler.h" +#include "cartographer_grpc/handlers/get_trajectory_node_poses_handler.h" +#include "cartographer_grpc/handlers/run_final_optimization_handler.h" #include "glog/logging.h" namespace cartographer_grpc { namespace mapping { -PoseGraphStub::PoseGraphStub(std::shared_ptr client_channel, - proto::MapBuilderService::Stub* stub) - : client_channel_(client_channel), stub_(stub) {} +PoseGraphStub::PoseGraphStub(std::shared_ptr client_channel) + : client_channel_(client_channel) {} void PoseGraphStub::RunFinalOptimization() { - grpc::ClientContext client_context; google::protobuf::Empty request; - google::protobuf::Empty response; - CHECK(stub_->RunFinalOptimization(&client_context, request, &response).ok()); + framework::Client client( + client_channel_); + CHECK(client.Write(request)); } cartographer::mapping::MapById< @@ -44,15 +48,14 @@ cartographer::mapping::MapById< cartographer::mapping::SubmapId, cartographer::mapping::PoseGraphInterface::SubmapPose> PoseGraphStub::GetAllSubmapPoses() { - grpc::ClientContext client_context; google::protobuf::Empty request; - proto::GetAllSubmapPosesResponse response; - stub_->GetAllSubmapPoses(&client_context, request, &response); + framework::Client client(client_channel_); + CHECK(client.Write(request)); cartographer::mapping::MapById< cartographer::mapping::SubmapId, cartographer::mapping::PoseGraphInterface::SubmapPose> submap_poses; - for (const auto& submap_pose : response.submap_poses()) { + for (const auto& submap_pose : client.response().submap_poses()) { submap_poses.Insert( cartographer::mapping::SubmapId{submap_pose.submap_id().trajectory_id(), submap_pose.submap_id().submap_index()}, @@ -65,13 +68,12 @@ PoseGraphStub::GetAllSubmapPoses() { cartographer::transform::Rigid3d PoseGraphStub::GetLocalToGlobalTransform( int trajectory_id) { - grpc::ClientContext client_context; proto::GetLocalToGlobalTransformRequest request; request.set_trajectory_id(trajectory_id); - proto::GetLocalToGlobalTransformResponse response; - CHECK(stub_->GetLocalToGlobalTransform(&client_context, request, &response) - .ok()); - return cartographer::transform::ToRigid3(response.local_to_global()); + framework::Client client( + client_channel_); + CHECK(client.Write(request)); + return cartographer::transform::ToRigid3(client.response().local_to_global()); } cartographer::mapping::MapById PoseGraphStub::GetTrajectoryNodePoses() { - grpc::ClientContext client_context; google::protobuf::Empty request; - proto::GetTrajectoryNodePosesResponse response; - stub_->GetTrajectoryNodePoses(&client_context, request, &response); + framework::Client client( + client_channel_); + CHECK(client.Write(request)); cartographer::mapping::MapById node_poses; - for (const auto& node_pose : response.node_poses()) { + for (const auto& node_pose : client.response().node_poses()) { node_poses.Insert( cartographer::mapping::NodeId{node_pose.node_id().trajectory_id(), node_pose.node_id().node_index()}, @@ -107,11 +109,10 @@ bool PoseGraphStub::IsTrajectoryFinished(int trajectory_id) { std::vector PoseGraphStub::constraints() { - grpc::ClientContext client_context; google::protobuf::Empty request; - proto::GetConstraintsResponse response; - stub_->GetConstraints(&client_context, request, &response); - return cartographer::mapping::FromProto(response.constraints()); + framework::Client client(client_channel_); + CHECK(client.Write(request)); + return cartographer::mapping::FromProto(client.response().constraints()); } cartographer::mapping::proto::PoseGraph PoseGraphStub::ToProto() { diff --git a/cartographer_grpc/mapping/pose_graph_stub.h b/cartographer_grpc/mapping/pose_graph_stub.h index 92c1f22..9f5c740 100644 --- a/cartographer_grpc/mapping/pose_graph_stub.h +++ b/cartographer_grpc/mapping/pose_graph_stub.h @@ -18,7 +18,6 @@ #define CARTOGRAPHER_GRPC_MAPPING_POSE_GRAPH_STUB_H_ #include "cartographer/mapping/pose_graph_interface.h" -#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "grpc++/grpc++.h" namespace cartographer_grpc { @@ -26,8 +25,7 @@ namespace mapping { class PoseGraphStub : public cartographer::mapping::PoseGraphInterface { public: - PoseGraphStub(std::shared_ptr client_channel, - proto::MapBuilderService::Stub* stub); + PoseGraphStub(std::shared_ptr client_channel); PoseGraphStub(const PoseGraphStub&) = delete; PoseGraphStub& operator=(const PoseGraphStub&) = delete; @@ -51,7 +49,6 @@ class PoseGraphStub : public cartographer::mapping::PoseGraphInterface { private: std::shared_ptr client_channel_; - proto::MapBuilderService::Stub* stub_; }; } // namespace mapping diff --git a/cartographer_grpc/mapping/trajectory_builder_stub.cc b/cartographer_grpc/mapping/trajectory_builder_stub.cc index cd1376e..8a78afc 100644 --- a/cartographer_grpc/mapping/trajectory_builder_stub.cc +++ b/cartographer_grpc/mapping/trajectory_builder_stub.cc @@ -27,119 +27,121 @@ namespace mapping { TrajectoryBuilderStub::TrajectoryBuilderStub( std::shared_ptr client_channel, const int trajectory_id, LocalSlamResultCallback local_slam_result_callback) - : client_channel_(client_channel), trajectory_id_(trajectory_id) { - stub_ = proto::MapBuilderService::NewStub(client_channel_); - CHECK(stub_) << "Failed to create stub."; + : client_channel_(client_channel), + trajectory_id_(trajectory_id), + receive_local_slam_results_client_(client_channel_) { if (local_slam_result_callback) { proto::ReceiveLocalSlamResultsRequest request; request.set_trajectory_id(trajectory_id); - local_slam_result_reader_.client_reader = stub_->ReceiveLocalSlamResults( - &local_slam_result_reader_.client_context, request); - auto* client_reader_ptr = local_slam_result_reader_.client_reader.get(); - local_slam_result_reader_.thread = + receive_local_slam_results_client_.Write(request); + auto* receive_local_slam_results_client_ptr = + &receive_local_slam_results_client_; + receive_local_slam_results_thread_ = cartographer::common::make_unique( - [client_reader_ptr, local_slam_result_callback]() { - RunLocalSlamResultReader(client_reader_ptr, - local_slam_result_callback); + [receive_local_slam_results_client_ptr, + local_slam_result_callback]() { + RunLocalSlamResultsReader(receive_local_slam_results_client_ptr, + local_slam_result_callback); }); } } TrajectoryBuilderStub::~TrajectoryBuilderStub() { - if (local_slam_result_reader_.thread) { - local_slam_result_reader_.thread->join(); + if (receive_local_slam_results_thread_) { + receive_local_slam_results_thread_->join(); } - if (rangefinder_writer_.client_writer) { - CHECK(rangefinder_writer_.client_writer->WritesDone()); - CHECK(rangefinder_writer_.client_writer->Finish().ok()); + if (add_rangefinder_client_) { + CHECK(add_rangefinder_client_->WritesDone()); + CHECK(add_rangefinder_client_->Finish().ok()); } - if (imu_writer_.client_writer) { - CHECK(imu_writer_.client_writer->WritesDone()); - CHECK(imu_writer_.client_writer->Finish().ok()); + if (add_imu_client_) { + CHECK(add_imu_client_->WritesDone()); + CHECK(add_imu_client_->Finish().ok()); } - if (odometry_writer_.client_writer) { - CHECK(odometry_writer_.client_writer->WritesDone()); - CHECK(odometry_writer_.client_writer->Finish().ok()); + if (add_odometry_client_) { + CHECK(add_odometry_client_->WritesDone()); + CHECK(add_odometry_client_->Finish().ok()); } - if (fixed_frame_writer_.client_writer) { - CHECK(fixed_frame_writer_.client_writer->WritesDone()); - CHECK(fixed_frame_writer_.client_writer->Finish().ok()); + if (add_landmark_client_) { + CHECK(add_landmark_client_->WritesDone()); + CHECK(add_landmark_client_->Finish().ok()); + } + if (add_fixed_frame_pose_client_) { + CHECK(add_fixed_frame_pose_client_->WritesDone()); + CHECK(add_fixed_frame_pose_client_->Finish().ok()); } } void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const cartographer::sensor::TimedPointCloudData& timed_point_cloud_data) { - if (!rangefinder_writer_.client_writer) { - rangefinder_writer_.client_writer = stub_->AddRangefinderData( - &rangefinder_writer_.client_context, &rangefinder_writer_.response); - CHECK(rangefinder_writer_.client_writer); + if (!add_rangefinder_client_) { + add_rangefinder_client_ = cartographer::common::make_unique< + framework::Client>( + client_channel_); } proto::AddRangefinderDataRequest request; sensor::CreateAddRangeFinderDataRequest( sensor_id, trajectory_id_, cartographer::sensor::ToProto(timed_point_cloud_data), &request); - rangefinder_writer_.client_writer->Write(request); + add_rangefinder_client_->Write(request); } void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const cartographer::sensor::ImuData& imu_data) { - if (!imu_writer_.client_writer) { - imu_writer_.client_writer = - stub_->AddImuData(&imu_writer_.client_context, &imu_writer_.response); - CHECK(imu_writer_.client_writer); + if (!add_imu_client_) { + add_imu_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); } proto::AddImuDataRequest request; sensor::CreateAddImuDataRequest(sensor_id, trajectory_id_, cartographer::sensor::ToProto(imu_data), &request); - imu_writer_.client_writer->Write(request); + add_imu_client_->Write(request); } void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const cartographer::sensor::OdometryData& odometry_data) { - if (!odometry_writer_.client_writer) { - odometry_writer_.client_writer = stub_->AddOdometryData( - &odometry_writer_.client_context, &odometry_writer_.response); - CHECK(odometry_writer_.client_writer); + if (!add_odometry_client_) { + add_odometry_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); } proto::AddOdometryDataRequest request; sensor::CreateAddOdometryDataRequest( sensor_id, trajectory_id_, cartographer::sensor::ToProto(odometry_data), &request); - odometry_writer_.client_writer->Write(request); + add_odometry_client_->Write(request); } void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const cartographer::sensor::FixedFramePoseData& fixed_frame_pose) { - if (!fixed_frame_writer_.client_writer) { - fixed_frame_writer_.client_writer = stub_->AddFixedFramePoseData( - &fixed_frame_writer_.client_context, &fixed_frame_writer_.response); - CHECK(fixed_frame_writer_.client_writer); + if (!add_fixed_frame_pose_client_) { + add_fixed_frame_pose_client_ = cartographer::common::make_unique< + framework::Client>( + client_channel_); } proto::AddFixedFramePoseDataRequest request; sensor::CreateAddFixedFramePoseDataRequest( sensor_id, trajectory_id_, cartographer::sensor::ToProto(fixed_frame_pose), &request); - fixed_frame_writer_.client_writer->Write(request); + add_fixed_frame_pose_client_->Write(request); } void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const cartographer::sensor::LandmarkData& landmark_data) { - if (!landmark_writer_.client_writer) { - landmark_writer_.client_writer = stub_->AddLandmarkData( - &landmark_writer_.client_context, &landmark_writer_.response); - CHECK(landmark_writer_.client_writer); + if (!add_landmark_client_) { + add_landmark_client_ = cartographer::common::make_unique< + framework::Client>(client_channel_); } proto::AddLandmarkDataRequest request; sensor::CreateAddLandmarkDataRequest( sensor_id, trajectory_id_, cartographer::sensor::ToProto(landmark_data), &request); - landmark_writer_.client_writer->Write(request); + add_landmark_client_->Write(request); } void TrajectoryBuilderStub::AddLocalSlamResultData( @@ -148,11 +150,11 @@ void TrajectoryBuilderStub::AddLocalSlamResultData( LOG(FATAL) << "Not implemented"; } -void TrajectoryBuilderStub::RunLocalSlamResultReader( - grpc::ClientReader* client_reader, +void TrajectoryBuilderStub::RunLocalSlamResultsReader( + framework::Client* client, LocalSlamResultCallback local_slam_result_callback) { proto::ReceiveLocalSlamResultsResponse response; - while (client_reader->Read(&response)) { + while (client->Read(&response)) { int trajectory_id = response.trajectory_id(); cartographer::common::Time time = cartographer::common::FromUniversal(response.timestamp()); @@ -170,7 +172,7 @@ void TrajectoryBuilderStub::RunLocalSlamResultReader( local_slam_result_callback(trajectory_id, time, local_pose, range_data, std::move(insertion_result)); } - client_reader->Finish(); + client->Finish(); } } // namespace mapping diff --git a/cartographer_grpc/mapping/trajectory_builder_stub.h b/cartographer_grpc/mapping/trajectory_builder_stub.h index a5e324f..7095ae5 100644 --- a/cartographer_grpc/mapping/trajectory_builder_stub.h +++ b/cartographer_grpc/mapping/trajectory_builder_stub.h @@ -21,8 +21,14 @@ #include "cartographer/mapping/local_slam_result_data.h" #include "cartographer/mapping/trajectory_builder_interface.h" -#include "cartographer_grpc/framework/client_writer.h" -#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" +#include "cartographer_grpc/framework/client.h" +#include "cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h" +#include "cartographer_grpc/handlers/add_imu_data_handler.h" +#include "cartographer_grpc/handlers/add_landmark_data_handler.h" +#include "cartographer_grpc/handlers/add_local_slam_result_data_handler.h" +#include "cartographer_grpc/handlers/add_odometry_data_handler.h" +#include "cartographer_grpc/handlers/add_rangefinder_data_handler.h" +#include "cartographer_grpc/handlers/receive_local_slam_results_handler.h" #include "grpc++/grpc++.h" namespace cartographer_grpc { @@ -57,27 +63,26 @@ class TrajectoryBuilderStub local_slam_result_data) override; private: - struct LocalSlamResultReader { - grpc::ClientContext client_context; - std::unique_ptr> - client_reader; - std::unique_ptr thread; - }; - - static void RunLocalSlamResultReader( - grpc::ClientReader* client_reader, + static void RunLocalSlamResultsReader( + framework::Client* + client_reader, LocalSlamResultCallback local_slam_result_callback); std::shared_ptr client_channel_; const int trajectory_id_; - std::unique_ptr stub_; - framework::ClientWriter rangefinder_writer_; - framework::ClientWriter imu_writer_; - framework::ClientWriter odometry_writer_; - framework::ClientWriter - fixed_frame_writer_; - framework::ClientWriter landmark_writer_; - LocalSlamResultReader local_slam_result_reader_; + std::unique_ptr> + add_rangefinder_client_; + std::unique_ptr> + add_imu_client_; + std::unique_ptr> + add_odometry_client_; + std::unique_ptr> + add_fixed_frame_pose_client_; + std::unique_ptr> + add_landmark_client_; + framework::Client + receive_local_slam_results_client_; + std::unique_ptr receive_local_slam_results_thread_; }; } // namespace mapping