diff --git a/cartographer_grpc/framework/server.h b/cartographer_grpc/framework/server.h index 0dac235..16fad41 100644 --- a/cartographer_grpc/framework/server.h +++ b/cartographer_grpc/framework/server.h @@ -105,6 +105,11 @@ class Server { return {execution_context_->lock(), execution_context_.get()}; } + template + T* GetUnsynchronizedContext() { + return dynamic_cast(execution_context_.get()); + } + protected: Server(const Options& options); void AddService( diff --git a/cartographer_grpc/local_trajectory_uploader.cc b/cartographer_grpc/local_trajectory_uploader.cc index 64585ba..0aca68e 100644 --- a/cartographer_grpc/local_trajectory_uploader.cc +++ b/cartographer_grpc/local_trajectory_uploader.cc @@ -69,18 +69,21 @@ void LocalTrajectoryUploader::ProcessSendQueue() { while (!shutting_down_) { auto data_message = send_queue_.PopWithTimeout(kPopTimeout); if (data_message) { - if (const auto *fixed_frame_pose_data = - dynamic_cast( + if (auto *fixed_frame_pose_data = + dynamic_cast( data_message.get())) { ProcessFixedFramePoseDataMessage(fixed_frame_pose_data); - } else if (const auto *imu_data = - dynamic_cast( - data_message.get())) { + } else if (auto *imu_data = dynamic_cast( + data_message.get())) { ProcessImuDataMessage(imu_data); - } else if (const auto *odometry_data = - dynamic_cast( + } else if (auto *odometry_data = + dynamic_cast( data_message.get())) { ProcessOdometryDataMessage(odometry_data); + } else if (auto *local_slam_result_data = + dynamic_cast( + data_message.get())) { + ProcessLocalSlamResultDataMessage(local_slam_result_data); } else { LOG(FATAL) << "Unknown message type: " << data_message->GetTypeName(); } @@ -88,8 +91,15 @@ void LocalTrajectoryUploader::ProcessSendQueue() { } } +void LocalTrajectoryUploader::TranslateTrajectoryId( + proto::SensorMetadata *sensor_metadata) { + int cloud_trajectory_id = + local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id()); + sensor_metadata->set_trajectory_id(cloud_trajectory_id); +} + void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage( - const proto::AddFixedFramePoseDataRequest *data_request) { + proto::AddFixedFramePoseDataRequest *data_request) { if (!fixed_frame_pose_writer_.client_writer) { fixed_frame_pose_writer_.client_writer = service_stub_->AddFixedFramePoseData( @@ -97,29 +107,52 @@ void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage( &fixed_frame_pose_writer_.response); CHECK(fixed_frame_pose_writer_.client_writer); } + TranslateTrajectoryId(data_request->mutable_sensor_metadata()); fixed_frame_pose_writer_.client_writer->Write(*data_request); } void LocalTrajectoryUploader::ProcessImuDataMessage( - const proto::AddImuDataRequest *data_request) { + proto::AddImuDataRequest *data_request) { if (!imu_writer_.client_writer) { imu_writer_.client_writer = service_stub_->AddImuData( &imu_writer_.client_context, &imu_writer_.response); CHECK(imu_writer_.client_writer); } + TranslateTrajectoryId(data_request->mutable_sensor_metadata()); imu_writer_.client_writer->Write(*data_request); } void LocalTrajectoryUploader::ProcessOdometryDataMessage( - const proto::AddOdometryDataRequest *data_request) { + proto::AddOdometryDataRequest *data_request) { if (!odometry_writer_.client_writer) { odometry_writer_.client_writer = service_stub_->AddOdometryData( &odometry_writer_.client_context, &odometry_writer_.response); CHECK(odometry_writer_.client_writer); } + TranslateTrajectoryId(data_request->mutable_sensor_metadata()); odometry_writer_.client_writer->Write(*data_request); } +void LocalTrajectoryUploader::ProcessLocalSlamResultDataMessage( + proto::AddLocalSlamResultDataRequest *data_request) { + if (!local_slam_result_writer_.client_writer) { + local_slam_result_writer_.client_writer = + service_stub_->AddLocalSlamResultData( + &local_slam_result_writer_.client_context, + &local_slam_result_writer_.response); + CHECK(local_slam_result_writer_.client_writer); + } + TranslateTrajectoryId(data_request->mutable_sensor_metadata()); + // A submap also holds a trajectory id that must be translated to uplink's + // trajectory id. + for (cartographer::mapping::proto::Submap &mutable_submap : + *data_request->mutable_local_slam_result_data()->mutable_submaps()) { + mutable_submap.mutable_submap_id()->set_trajectory_id( + data_request->sensor_metadata().trajectory_id()); + } + local_slam_result_writer_.client_writer->Write(*data_request); +} + void LocalTrajectoryUploader::AddTrajectory( int local_trajectory_id, const std::set &expected_sensor_ids, const cartographer::mapping::proto::TrajectoryBuilderOptions @@ -129,8 +162,13 @@ void LocalTrajectoryUploader::AddTrajectory( proto::AddTrajectoryResponse result; *request.mutable_trajectory_builder_options() = trajectory_options; for (const SensorId &sensor_id : expected_sensor_ids) { - *request.add_expected_sensor_ids() = sensor::ToProto(sensor_id); + // Range sensors are not forwarded, but combined into a LocalSlamResult. + if (sensor_id.type != SensorId::SensorType::RANGE) { + *request.add_expected_sensor_ids() = sensor::ToProto(sensor_id); + } } + *request.add_expected_sensor_ids() = + sensor::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); grpc::Status status = service_stub_->AddTrajectory(&client_context, request, &result); CHECK(status.ok()); diff --git a/cartographer_grpc/local_trajectory_uploader.h b/cartographer_grpc/local_trajectory_uploader.h index 5788f6b..32f92ca 100644 --- a/cartographer_grpc/local_trajectory_uploader.h +++ b/cartographer_grpc/local_trajectory_uploader.h @@ -51,16 +51,25 @@ class LocalTrajectoryUploader { const cartographer::mapping::proto::TrajectoryBuilderOptions& trajectory_options); void FinishTrajectory(int local_trajectory_id); + + // Enqueue an Add*DataRequest message to be uploaded. void EnqueueDataRequest( std::unique_ptr data_request); + SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const { + return SensorId{SensorId::SensorType::LOCAL_SLAM_RESULT, + "local_slam_result_" + std::to_string(local_trajectory_id)}; + } + private: void ProcessSendQueue(); + void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); void ProcessFixedFramePoseDataMessage( - const proto::AddFixedFramePoseDataRequest* data_request); - void ProcessImuDataMessage(const proto::AddImuDataRequest* data_request); - void ProcessOdometryDataMessage( - const proto::AddOdometryDataRequest* data_request); + proto::AddFixedFramePoseDataRequest* data_request); + void ProcessImuDataMessage(proto::AddImuDataRequest* data_request); + void ProcessLocalSlamResultDataMessage( + proto::AddLocalSlamResultDataRequest* data_request); + void ProcessOdometryDataMessage(proto::AddOdometryDataRequest* data_request); std::shared_ptr client_channel_; std::unique_ptr service_stub_; @@ -73,6 +82,8 @@ class LocalTrajectoryUploader { framework::ClientWriter fixed_frame_pose_writer_; framework::ClientWriter imu_writer_; + framework::ClientWriter + local_slam_result_writer_; framework::ClientWriter odometry_writer_; }; diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index d102dcc..e9db38a 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -33,6 +33,7 @@ #include "cartographer_grpc/handlers/receive_local_slam_results_handler.h" #include "cartographer_grpc/handlers/run_final_optimization.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" +#include "cartographer_grpc/sensor/serialization.h" #include "glog/logging.h" namespace cartographer_grpc { @@ -171,7 +172,7 @@ std::unique_ptr MapBuilderServer::MapBuilderContext::ProcessLocalSlamResultData( const std::string& sensor_id, cartographer::common::Time time, const cartographer::mapping::proto::LocalSlamResultData& proto) { - CHECK_GE(proto.submaps().size(), 0); + CHECK_GE(proto.submaps().size(), 1); CHECK(proto.submaps(0).has_submap_2d() || proto.submaps(0).has_submap_3d()); if (proto.submaps(0).has_submap_2d()) { std::vector> @@ -318,6 +319,29 @@ void MapBuilderServer::OnLocalSlamResult( insertion_result) { auto shared_range_data = std::make_shared(std::move(range_data)); + + // If there is an uplink server and a submap insertion happened, enqueue this + // local SLAM result for uploading. + if (insertion_result && + grpc_server_->GetUnsynchronizedContext() + ->local_trajectory_uploader()) { + auto data_request = cartographer::common::make_unique< + proto::AddLocalSlamResultDataRequest>(); + auto sensor_id = grpc_server_->GetUnsynchronizedContext() + ->local_trajectory_uploader() + ->GetLocalSlamResultSensorId(trajectory_id); + sensor::CreateAddLocalSlamResultDataRequest( + sensor_id.id, trajectory_id, time, starting_submap_index_, + *insertion_result, data_request.get()); + // TODO(cschuet): Make this more robust. + if (insertion_result->insertion_submaps.front()->finished()) { + ++starting_submap_index_; + } + grpc_server_->GetUnsynchronizedContext() + ->local_trajectory_uploader() + ->EnqueueDataRequest(std::move(data_request)); + } + cartographer::common::MutexLocker locker(&local_slam_subscriptions_lock_); for (auto& entry : local_slam_subscriptions_[trajectory_id]) { auto copy_of_insertion_result = diff --git a/cartographer_grpc/map_builder_server.h b/cartographer_grpc/map_builder_server.h index ce96d90..4a686dc 100644 --- a/cartographer_grpc/map_builder_server.h +++ b/cartographer_grpc/map_builder_server.h @@ -156,6 +156,7 @@ class MapBuilderServer { std::map local_slam_subscriptions_ GUARDED_BY(local_slam_subscriptions_lock_); std::unique_ptr local_trajectory_uploader_; + int starting_submap_index_ = 0; }; } // namespace cartographer_grpc diff --git a/cartographer_grpc/sensor/serialization.cc b/cartographer_grpc/sensor/serialization.cc index 67ffe2d..f3f473a 100644 --- a/cartographer_grpc/sensor/serialization.cc +++ b/cartographer_grpc/sensor/serialization.cc @@ -72,6 +72,28 @@ void CreateAddLandmarkDataRequest( *proto->mutable_landmark_data() = landmark_data; } +void CreateAddLocalSlamResultDataRequest( + const std::string& sensor_id, int trajectory_id, + cartographer::common::Time time, int starting_submap_index, + const cartographer::mapping::TrajectoryBuilderInterface::InsertionResult& + insertion_result, + proto::AddLocalSlamResultDataRequest* proto) { + sensor::CreateSensorMetadata(sensor_id, trajectory_id, + proto->mutable_sensor_metadata()); + proto->mutable_local_slam_result_data()->set_timestamp( + cartographer::common::ToUniversal(time)); + *proto->mutable_local_slam_result_data()->mutable_node_data() = + cartographer::mapping::ToProto(*insertion_result.constant_data); + for (const auto& insertion_submap : insertion_result.insertion_submaps) { + // We only send the probability grid up if the submap is finished. + auto* submap = proto->mutable_local_slam_result_data()->add_submaps(); + insertion_submap->ToProto(submap, insertion_submap->finished()); + submap->mutable_submap_id()->set_trajectory_id(trajectory_id); + submap->mutable_submap_id()->set_submap_index(starting_submap_index); + ++starting_submap_index; + } +} + proto::SensorId ToProto( const cartographer::mapping::TrajectoryBuilderInterface::SensorId& sensor_id) { diff --git a/cartographer_grpc/sensor/serialization.h b/cartographer_grpc/sensor/serialization.h index 8559ac4..a425a37 100644 --- a/cartographer_grpc/sensor/serialization.h +++ b/cartographer_grpc/sensor/serialization.h @@ -17,6 +17,7 @@ #ifndef CARTOGRAPHER_GRPC_SENSOR_SERIALIZATION_H #define CARTOGRAPHER_GRPC_SENSOR_SERIALIZATION_H +#include "cartographer/mapping/local_slam_result_data.h" #include "cartographer/mapping/trajectory_builder_interface.h" #include "cartographer/sensor/fixed_frame_pose_data.h" #include "cartographer/sensor/imu_data.h" @@ -53,11 +54,16 @@ void CreateAddLandmarkDataRequest( const std::string& sensor_id, int trajectory_id, const cartographer::sensor::proto::LandmarkData& landmark_data, proto::AddLandmarkDataRequest* proto); +void CreateAddLocalSlamResultDataRequest( + const std::string& sensor_id, int trajectory_id, + cartographer::common::Time time, int starting_submap_index, + const cartographer::mapping::TrajectoryBuilderInterface::InsertionResult& + insertion_result, + proto::AddLocalSlamResultDataRequest* proto); proto::SensorId ToProto( const cartographer::mapping::TrajectoryBuilderInterface::SensorId& sensor_id); - cartographer::mapping::TrajectoryBuilderInterface::SensorId FromProto( const proto::SensorId& proto);