From c053fc7a2fda0ef83f6bbbcf31980b3563afc60d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Mon, 15 Jan 2018 15:31:33 +0100 Subject: [PATCH] Implement sensor data forwarding. (#818) --- .../add_fixed_frame_pose_data_handler.h | 17 +++++ .../handlers/add_imu_data_handler.h | 17 +++++ .../handlers/add_odometry_data_handler.h | 16 +++++ .../handlers/add_trajectory_handler.h | 4 +- .../handlers/finish_trajectory_handler.h | 4 +- ...loader.cc => local_trajectory_uploader.cc} | 14 ++-- ...uploader.h => local_trajectory_uploader.h} | 16 +++-- cartographer_grpc/map_builder_server.cc | 5 +- cartographer_grpc/map_builder_server.h | 8 +-- .../mapping/trajectory_builder_stub.cc | 31 ++++----- .../mapping/trajectory_builder_stub.h | 1 - cartographer_grpc/sensor/serialization.cc | 67 +++++++++++++++++++ cartographer_grpc/sensor/serialization.h | 53 +++++++++++++++ 13 files changed, 214 insertions(+), 39 deletions(-) rename cartographer_grpc/{data_uploader.cc => local_trajectory_uploader.cc} (82%) rename cartographer_grpc/{data_uploader.h => local_trajectory_uploader.h} (72%) create mode 100644 cartographer_grpc/sensor/serialization.cc create mode 100644 cartographer_grpc/sensor/serialization.h diff --git a/cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h b/cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h index 86b038a..9a0db03 100644 --- a/cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h +++ b/cartographer_grpc/handlers/add_fixed_frame_pose_data_handler.h @@ -21,6 +21,7 @@ #include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/map_builder_server.h" #include "cartographer_grpc/proto/map_builder_service.pb.h" +#include "cartographer_grpc/sensor/serialization.h" #include "google/protobuf/empty.pb.h" namespace cartographer_grpc { @@ -40,6 +41,22 @@ class AddFixedFramePoseDataHandler request.sensor_metadata().trajectory_id(), request.sensor_metadata().sensor_id(), cartographer::sensor::FromProto(request.fixed_frame_pose_data())); + + // The 'BlockingQueue' in 'LocalTrajectoryUploader' is thread-safe. + // Therefore it suffices to get an unsynchronized reference to the + // 'MapBuilderContext'. + if (GetUnsynchronizedContext() + ->local_trajectory_uploader()) { + auto data_request = cartographer::common::make_unique< + proto::AddFixedFramePoseDataRequest>(); + mapping::CreateAddFixedFramePoseDataRequest( + request.sensor_metadata().sensor_id(), + request.sensor_metadata().trajectory_id(), + request.fixed_frame_pose_data(), data_request.get()); + GetUnsynchronizedContext() + ->local_trajectory_uploader() + ->EnqueueDataRequest(std::move(data_request)); + } } void OnReadsDone() override { diff --git a/cartographer_grpc/handlers/add_imu_data_handler.h b/cartographer_grpc/handlers/add_imu_data_handler.h index edd1aa8..47b2be7 100644 --- a/cartographer_grpc/handlers/add_imu_data_handler.h +++ b/cartographer_grpc/handlers/add_imu_data_handler.h @@ -21,6 +21,7 @@ #include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/map_builder_server.h" #include "cartographer_grpc/proto/map_builder_service.pb.h" +#include "cartographer_grpc/sensor/serialization.h" #include "google/protobuf/empty.pb.h" namespace cartographer_grpc { @@ -39,6 +40,22 @@ class AddImuDataHandler request.sensor_metadata().trajectory_id(), request.sensor_metadata().sensor_id(), cartographer::sensor::FromProto(request.imu_data())); + + // The 'BlockingQueue' in 'LocalTrajectoryUploader' is thread-safe. + // Therefore it suffices to get an unsynchronized reference to the + // 'MapBuilderContext'. + if (GetUnsynchronizedContext() + ->local_trajectory_uploader()) { + auto data_request = + cartographer::common::make_unique(); + mapping::CreateAddImuDataRequest( + request.sensor_metadata().sensor_id(), + request.sensor_metadata().trajectory_id(), request.imu_data(), + data_request.get()); + GetUnsynchronizedContext() + ->local_trajectory_uploader() + ->EnqueueDataRequest(std::move(data_request)); + } } void OnReadsDone() override { diff --git a/cartographer_grpc/handlers/add_odometry_data_handler.h b/cartographer_grpc/handlers/add_odometry_data_handler.h index 320b2a0..f5523fc 100644 --- a/cartographer_grpc/handlers/add_odometry_data_handler.h +++ b/cartographer_grpc/handlers/add_odometry_data_handler.h @@ -40,6 +40,22 @@ class AddOdometryDataHandler request.sensor_metadata().trajectory_id(), request.sensor_metadata().sensor_id(), cartographer::sensor::FromProto(request.odometry_data())); + + // The 'BlockingQueue' in 'LocalTrajectoryUploader' is thread-safe. + // Therefore it suffices to get an unsynchronized reference to the + // 'MapBuilderContext'. + if (GetUnsynchronizedContext() + ->local_trajectory_uploader()) { + auto data_request = + cartographer::common::make_unique(); + mapping::CreateAddOdometryDataRequest( + request.sensor_metadata().sensor_id(), + request.sensor_metadata().trajectory_id(), request.odometry_data(), + data_request.get()); + GetUnsynchronizedContext() + ->local_trajectory_uploader() + ->EnqueueDataRequest(std::move(data_request)); + } } void OnReadsDone() override { diff --git a/cartographer_grpc/handlers/add_trajectory_handler.h b/cartographer_grpc/handlers/add_trajectory_handler.h index e651ff8..17598a5 100644 --- a/cartographer_grpc/handlers/add_trajectory_handler.h +++ b/cartographer_grpc/handlers/add_trajectory_handler.h @@ -43,7 +43,7 @@ class AddTrajectoryHandler request.trajectory_builder_options(), local_slam_result_callback); if (GetUnsynchronizedContext() - ->data_uploader()) { + ->local_trajectory_uploader()) { auto trajectory_builder_options = request.trajectory_builder_options(); // Clear the trajectory builder options to convey to the cloud @@ -53,7 +53,7 @@ class AddTrajectoryHandler trajectory_builder_options.clear_trajectory_builder_3d_options(); GetContext() - ->data_uploader() + ->local_trajectory_uploader() ->AddTrajectory(trajectory_id, expected_sensor_ids, trajectory_builder_options); } diff --git a/cartographer_grpc/handlers/finish_trajectory_handler.h b/cartographer_grpc/handlers/finish_trajectory_handler.h index c97d594..3e40238 100644 --- a/cartographer_grpc/handlers/finish_trajectory_handler.h +++ b/cartographer_grpc/handlers/finish_trajectory_handler.h @@ -37,9 +37,9 @@ class FinishTrajectoryHandler GetUnsynchronizedContext() ->NotifyFinishTrajectory(request.trajectory_id()); if (GetUnsynchronizedContext() - ->data_uploader()) { + ->local_trajectory_uploader()) { GetContext() - ->data_uploader() + ->local_trajectory_uploader() ->FinishTrajectory(request.trajectory_id()); } Send(cartographer::common::make_unique()); diff --git a/cartographer_grpc/data_uploader.cc b/cartographer_grpc/local_trajectory_uploader.cc similarity index 82% rename from cartographer_grpc/data_uploader.cc rename to cartographer_grpc/local_trajectory_uploader.cc index 02f82cc..f3e2746 100644 --- a/cartographer_grpc/data_uploader.cc +++ b/cartographer_grpc/local_trajectory_uploader.cc @@ -14,18 +14,19 @@ * limitations under the License. */ -#include "cartographer_grpc/data_uploader.h" +#include "cartographer_grpc/local_trajectory_uploader.h" #include "glog/logging.h" namespace cartographer_grpc { -DataUploader::DataUploader(const std::string& server_address) +LocalTrajectoryUploader::LocalTrajectoryUploader( + const std::string& server_address) : client_channel_(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())), service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {} -void DataUploader::AddTrajectory( +void LocalTrajectoryUploader::AddTrajectory( int local_trajectory_id, const std::unordered_set& expected_sensor_ids, const cartographer::mapping::proto::TrajectoryBuilderOptions& @@ -45,7 +46,7 @@ void DataUploader::AddTrajectory( result.trajectory_id(); } -void DataUploader::FinishTrajectory(int local_trajectory_id) { +void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) { CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1); int cloud_trajectory_id = local_to_cloud_trajectory_id_map_[local_trajectory_id]; @@ -58,4 +59,9 @@ void DataUploader::FinishTrajectory(int local_trajectory_id) { CHECK(status.ok()); } +void LocalTrajectoryUploader::EnqueueDataRequest( + std::unique_ptr data_request) { + send_queue_.Push(std::move(data_request)); +} + } // namespace cartographer_grpc \ No newline at end of file diff --git a/cartographer_grpc/data_uploader.h b/cartographer_grpc/local_trajectory_uploader.h similarity index 72% rename from cartographer_grpc/data_uploader.h rename to cartographer_grpc/local_trajectory_uploader.h index ca26d32..cb1623b 100644 --- a/cartographer_grpc/data_uploader.h +++ b/cartographer_grpc/local_trajectory_uploader.h @@ -14,35 +14,41 @@ * limitations under the License. */ -#ifndef CARTOGRAPHER_GRPC_DATA_UPLOADER_H -#define CARTOGRAPHER_GRPC_DATA_UPLOADER_H +#ifndef CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H +#define CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H #include #include #include +#include "cartographer/common/blocking_queue.h" #include "cartographer/mapping/proto/trajectory_builder_options.pb.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "grpc++/grpc++.h" namespace cartographer_grpc { -class DataUploader { +class LocalTrajectoryUploader { public: - DataUploader(const std::string& server_address); + LocalTrajectoryUploader(const std::string& server_address); void AddTrajectory( int local_trajectory_id, const std::unordered_set& expected_sensor_ids, const cartographer::mapping::proto::TrajectoryBuilderOptions& trajectory_options); void FinishTrajectory(int local_trajectory_id); + void EnqueueDataRequest( + std::unique_ptr data_request); private: std::shared_ptr client_channel_; std::unique_ptr service_stub_; std::map local_to_cloud_trajectory_id_map_; + cartographer::common::BlockingQueue< + std::unique_ptr> + send_queue_; }; } // namespace cartographer_grpc -#endif // CARTOGRAPHER_GRPC_DATA_UPLOADER_H +#endif // CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index 8cf1951..964befb 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -208,8 +208,9 @@ MapBuilderServer::MapBuilderServer( server_builder.SetNumEventThreads( map_builder_server_options.num_event_threads()); if (!map_builder_server_options.uplink_server_address().empty()) { - data_uploader_ = cartographer::common::make_unique( - map_builder_server_options.uplink_server_address()); + local_trajectory_uploader_ = + cartographer::common::make_unique( + map_builder_server_options.uplink_server_address()); } server_builder.RegisterHandler("AddTrajectory"); diff --git a/cartographer_grpc/map_builder_server.h b/cartographer_grpc/map_builder_server.h index 3562603..2fc85da 100644 --- a/cartographer_grpc/map_builder_server.h +++ b/cartographer_grpc/map_builder_server.h @@ -23,9 +23,9 @@ #include "cartographer/mapping/map_builder.h" #include "cartographer/mapping/trajectory_builder_interface.h" #include "cartographer/sensor/dispatchable.h" -#include "cartographer_grpc/data_uploader.h" #include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/server.h" +#include "cartographer_grpc/local_trajectory_uploader.h" #include "cartographer_grpc/proto/map_builder_server_options.pb.h" namespace cartographer_grpc { @@ -70,8 +70,8 @@ class MapBuilderServer { ProcessLocalSlamResultData( const std::string& sensor_id, cartographer::common::Time time, const cartographer::mapping::proto::LocalSlamResultData& proto); - DataUploader* data_uploader() { - return map_builder_server_->data_uploader_.get(); + LocalTrajectoryUploader* local_trajectory_uploader() { + return map_builder_server_->local_trajectory_uploader_.get(); } template @@ -154,7 +154,7 @@ class MapBuilderServer { int current_subscription_index_ = 0; std::map local_slam_subscriptions_ GUARDED_BY(local_slam_subscriptions_lock_); - std::unique_ptr data_uploader_; + std::unique_ptr local_trajectory_uploader_; }; } // namespace cartographer_grpc diff --git a/cartographer_grpc/mapping/trajectory_builder_stub.cc b/cartographer_grpc/mapping/trajectory_builder_stub.cc index e638996..df08771 100644 --- a/cartographer_grpc/mapping/trajectory_builder_stub.cc +++ b/cartographer_grpc/mapping/trajectory_builder_stub.cc @@ -18,6 +18,7 @@ #include "cartographer/mapping/local_slam_result_data.h" #include "cartographer_grpc/proto/map_builder_service.pb.h" +#include "cartographer_grpc/sensor/serialization.h" #include "glog/logging.h" namespace cartographer_grpc { @@ -75,9 +76,9 @@ void TrajectoryBuilderStub::AddSensorData( CHECK(rangefinder_writer_.client_writer); } proto::AddRangefinderDataRequest request; - *request.mutable_sensor_metadata() = CreateSensorMetadata(sensor_id); - *request.mutable_timed_point_cloud_data() = - cartographer::sensor::ToProto(timed_point_cloud_data); + CreateAddRangeFinderDataRequest( + sensor_id, trajectory_id_, + cartographer::sensor::ToProto(timed_point_cloud_data), &request); rangefinder_writer_.client_writer->Write(request); } @@ -90,8 +91,8 @@ void TrajectoryBuilderStub::AddSensorData( CHECK(imu_writer_.client_writer); } proto::AddImuDataRequest request; - *request.mutable_sensor_metadata() = CreateSensorMetadata(sensor_id); - *request.mutable_imu_data() = cartographer::sensor::ToProto(imu_data); + CreateAddImuDataRequest(sensor_id, trajectory_id_, + cartographer::sensor::ToProto(imu_data), &request); imu_writer_.client_writer->Write(request); } @@ -104,9 +105,9 @@ void TrajectoryBuilderStub::AddSensorData( CHECK(odometry_writer_.client_writer); } proto::AddOdometryDataRequest request; - *request.mutable_sensor_metadata() = CreateSensorMetadata(sensor_id); - *request.mutable_odometry_data() = - cartographer::sensor::ToProto(odometry_data); + CreateAddOdometryDataRequest(sensor_id, trajectory_id_, + cartographer::sensor::ToProto(odometry_data), + &request); odometry_writer_.client_writer->Write(request); } @@ -119,9 +120,9 @@ void TrajectoryBuilderStub::AddSensorData( CHECK(fixed_frame_writer_.client_writer); } proto::AddFixedFramePoseDataRequest request; - *request.mutable_sensor_metadata() = CreateSensorMetadata(sensor_id); - *request.mutable_fixed_frame_pose_data() = - cartographer::sensor::ToProto(fixed_frame_pose); + CreateAddFixedFramePoseDataRequest( + sensor_id, trajectory_id_, + cartographer::sensor::ToProto(fixed_frame_pose), &request); fixed_frame_writer_.client_writer->Write(request); } @@ -131,14 +132,6 @@ void TrajectoryBuilderStub::AddLocalSlamResultData( LOG(FATAL) << "Not implemented"; } -proto::SensorMetadata TrajectoryBuilderStub::CreateSensorMetadata( - const std::string& sensor_id) { - proto::SensorMetadata sensor_metadata; - sensor_metadata.set_sensor_id(sensor_id); - sensor_metadata.set_trajectory_id(trajectory_id_); - return sensor_metadata; -} - void TrajectoryBuilderStub::RunLocalSlamResultReader( grpc::ClientReader* client_reader, LocalSlamResultCallback local_slam_result_callback) { diff --git a/cartographer_grpc/mapping/trajectory_builder_stub.h b/cartographer_grpc/mapping/trajectory_builder_stub.h index 12f4a5a..a6f0c63 100644 --- a/cartographer_grpc/mapping/trajectory_builder_stub.h +++ b/cartographer_grpc/mapping/trajectory_builder_stub.h @@ -66,7 +66,6 @@ class TrajectoryBuilderStub std::unique_ptr thread; }; - proto::SensorMetadata CreateSensorMetadata(const std::string& sensor_id); static void RunLocalSlamResultReader( grpc::ClientReader* client_reader, LocalSlamResultCallback local_slam_result_callback); diff --git a/cartographer_grpc/sensor/serialization.cc b/cartographer_grpc/sensor/serialization.cc new file mode 100644 index 0000000..968b522 --- /dev/null +++ b/cartographer_grpc/sensor/serialization.cc @@ -0,0 +1,67 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "serialization.h" + +namespace cartographer_grpc { +namespace mapping { + +void CreateSensorMetadata(const std::string& sensor_id, const int trajectory_id, + proto::SensorMetadata* proto) { + proto->set_sensor_id(sensor_id); + proto->set_trajectory_id(trajectory_id); +} + +void CreateAddFixedFramePoseDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::FixedFramePoseData& + fixed_frame_pose_data, + proto::AddFixedFramePoseDataRequest* proto) { + CreateSensorMetadata(sensor_id, trajectory_id, + proto->mutable_sensor_metadata()); + *proto->mutable_fixed_frame_pose_data() = fixed_frame_pose_data; +} + +void CreateAddImuDataRequest( + const std::string& sensor_id, const int trajectory_id, + const cartographer::sensor::proto::ImuData& imu_data, + proto::AddImuDataRequest* proto) { + CreateSensorMetadata(sensor_id, trajectory_id, + proto->mutable_sensor_metadata()); + *proto->mutable_imu_data() = imu_data; +} + +void CreateAddOdometryDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::OdometryData& odometry_data, + proto::AddOdometryDataRequest* proto) { + CreateSensorMetadata(sensor_id, trajectory_id, + proto->mutable_sensor_metadata()); + *proto->mutable_odometry_data() = odometry_data; +} + +void CreateAddRangeFinderDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::TimedPointCloudData& + timed_point_cloud_data, + proto::AddRangefinderDataRequest* proto) { + CreateSensorMetadata(sensor_id, trajectory_id, + proto->mutable_sensor_metadata()); + *proto->mutable_timed_point_cloud_data() = timed_point_cloud_data; +} + +} // namespace mapping +} // namespace cartographer_grpc diff --git a/cartographer_grpc/sensor/serialization.h b/cartographer_grpc/sensor/serialization.h new file mode 100644 index 0000000..7f850b1 --- /dev/null +++ b/cartographer_grpc/sensor/serialization.h @@ -0,0 +1,53 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_SENSOR_SERIALIZATION_H +#define CARTOGRAPHER_GRPC_SENSOR_SERIALIZATION_H + +#include "cartographer/sensor/fixed_frame_pose_data.h" +#include "cartographer/sensor/imu_data.h" +#include "cartographer/sensor/odometry_data.h" +#include "cartographer/sensor/timed_point_cloud_data.h" +#include "cartographer_grpc/proto/map_builder_service.pb.h" + +namespace cartographer_grpc { +namespace mapping { + +void CreateSensorMetadata(const std::string& sensor_id, int trajectory_id, + proto::SensorMetadata* proto); +void CreateAddFixedFramePoseDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::FixedFramePoseData& + fixed_frame_pose_data, + proto::AddFixedFramePoseDataRequest* proto); +void CreateAddImuDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::ImuData& imu_data, + proto::AddImuDataRequest* proto); +void CreateAddOdometryDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::OdometryData& odometry_data, + proto::AddOdometryDataRequest* proto); +void CreateAddRangeFinderDataRequest( + const std::string& sensor_id, int trajectory_id, + const cartographer::sensor::proto::TimedPointCloudData& + timed_point_cloud_data, + proto::AddRangefinderDataRequest* proto); + +} // namespace mapping +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_SENSOR_SERIALIZATION_H