diff --git a/cartographer/mapping/map_builder_test.cc b/cartographer/mapping/map_builder_test.cc index 4ecf6f2..0b0a754 100644 --- a/cartographer/mapping/map_builder_test.cc +++ b/cartographer/mapping/map_builder_test.cc @@ -121,7 +121,7 @@ TEST_F(MapBuilderTest, TrajectoryAddFinish2D) { const std::unordered_set expected_sensor_ids = {kRangeSensorId}; int trajectory_id = map_builder_->AddTrajectoryBuilder( expected_sensor_ids, trajectory_builder_options_, - nullptr /* local_slam_result_callback */); + nullptr /* GetLocalSlamResultCallbackForSubscriptions */); EXPECT_EQ(1, map_builder_->num_trajectory_builders()); EXPECT_TRUE(map_builder_->GetTrajectoryBuilder(trajectory_id) != nullptr); EXPECT_TRUE(map_builder_->pose_graph() != nullptr); @@ -136,7 +136,7 @@ TEST_F(MapBuilderTest, TrajectoryAddFinish3D) { const std::unordered_set expected_sensor_ids = {kRangeSensorId}; int trajectory_id = map_builder_->AddTrajectoryBuilder( expected_sensor_ids, trajectory_builder_options_, - nullptr /* local_slam_result_callback */); + nullptr /* GetLocalSlamResultCallbackForSubscriptions */); EXPECT_EQ(1, map_builder_->num_trajectory_builders()); EXPECT_TRUE(map_builder_->GetTrajectoryBuilder(trajectory_id) != nullptr); EXPECT_TRUE(map_builder_->pose_graph() != nullptr); diff --git a/cartographer_grpc/handlers/add_trajectory_handler.h b/cartographer_grpc/handlers/add_trajectory_handler.h index fb25fd4..f90a388 100644 --- a/cartographer_grpc/handlers/add_trajectory_handler.h +++ b/cartographer_grpc/handlers/add_trajectory_handler.h @@ -30,6 +30,9 @@ class AddTrajectoryHandler proto::AddTrajectoryResponse> { public: void OnRequest(const proto::AddTrajectoryRequest& request) override { + auto local_slam_result_callback = + GetUnsynchronizedContext() + ->GetLocalSlamResultCallbackForSubscriptions(); const int trajectory_id = GetContext() ->map_builder() @@ -37,7 +40,7 @@ class AddTrajectoryHandler request.expected_sensor_ids().begin(), request.expected_sensor_ids().end()), request.trajectory_builder_options(), - nullptr /* local_slam_result_callback */); + local_slam_result_callback); auto response = cartographer::common::make_unique(); response->set_trajectory_id(trajectory_id); diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index 73a8c28..d30c962 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -34,26 +34,40 @@ const cartographer::common::Duration kPopTimeout = } // namespace MapBuilderServer::MapBuilderContext::MapBuilderContext( - cartographer::mapping::MapBuilder* map_builder, - cartographer::common::BlockingQueue>* - sensor_data_queue) - : map_builder_(map_builder), sensor_data_queue_(sensor_data_queue) {} + MapBuilderServer* map_builder_server) + : map_builder_server_(map_builder_server) {} cartographer::mapping::MapBuilder& MapBuilderServer::MapBuilderContext::map_builder() { - return *map_builder_; + return map_builder_server_->map_builder_; } cartographer::common::BlockingQueue< std::unique_ptr>& MapBuilderServer::MapBuilderContext::sensor_data_queue() { - return *sensor_data_queue_; + return map_builder_server_->sensor_data_queue_; +} + +cartographer::mapping::TrajectoryBuilderInterface::LocalSlamResultCallback +MapBuilderServer::MapBuilderContext:: + GetLocalSlamResultCallbackForSubscriptions() { + MapBuilderServer* map_builder_server = map_builder_server_; + return [map_builder_server]( + int trajectory_id, cartographer::common::Time time, + cartographer::transform::Rigid3d local_pose, + cartographer::sensor::RangeData range_data, + std::unique_ptr node_id) { + map_builder_server->OnLocalSlamResult(trajectory_id, time, local_pose, + std::move(range_data), + std::move(node_id)); + }; } void MapBuilderServer::MapBuilderContext::AddSensorDataToTrajectory( const SensorData& sensor_data) { sensor_data.sensor_data->AddToTrajectoryBuilder( - map_builder_->GetTrajectoryBuilder(sensor_data.trajectory_id)); + map_builder_server_->map_builder_.GetTrajectoryBuilder( + sensor_data.trajectory_id)); } MapBuilderServer::MapBuilderServer( @@ -82,8 +96,7 @@ MapBuilderServer::MapBuilderServer( proto::MapBuilderService>("FinishTrajectory"); grpc_server_ = server_builder.Build(); grpc_server_->SetExecutionContext( - cartographer::common::make_unique( - &map_builder_, &sensor_data_queue_)); + cartographer::common::make_unique(this)); } void MapBuilderServer::Start() { @@ -127,4 +140,39 @@ void MapBuilderServer::StartSlamThread() { [this]() { this->ProcessSensorDataQueue(); }); } +void MapBuilderServer::OnLocalSlamResult( + int trajectory_id, cartographer::common::Time time, + cartographer::transform::Rigid3d local_pose, + cartographer::sensor::RangeData range_data, + std::unique_ptr node_id) { + auto shared_range_data = + std::make_shared(std::move(range_data)); + cartographer::common::MutexLocker locker(&local_slam_subscriptions_lock_); + for (auto& entry : local_slam_subscriptions_[trajectory_id]) { + auto copy_of_node_id = + node_id ? cartographer::common::make_unique< + const cartographer::mapping::NodeId>(*node_id) + : nullptr; + LocalSlamSubscriptionCallback callback = entry.second; + callback(trajectory_id, time, local_pose, shared_range_data, + std::move(copy_of_node_id)); + } +} + +MapBuilderServer::SubscriptionId MapBuilderServer::SubscribeLocalSlamResults( + int trajectory_id, LocalSlamSubscriptionCallback callback) { + cartographer::common::MutexLocker locker(&local_slam_subscriptions_lock_); + local_slam_subscriptions_[trajectory_id].emplace(current_subscription_index_, + callback); + return SubscriptionId{trajectory_id, current_subscription_index_++}; +} + +void MapBuilderServer::UnsubscribeLocalSlamResults( + const SubscriptionId& subscription_id) { + cartographer::common::MutexLocker locker(&local_slam_subscriptions_lock_); + CHECK_EQ(local_slam_subscriptions_[subscription_id.trajectory_id].erase( + subscription_id.subscription_index), + 1u); +} + } // namespace cartographer_grpc diff --git a/cartographer_grpc/map_builder_server.h b/cartographer_grpc/map_builder_server.h index 573a3f3..2896525 100644 --- a/cartographer_grpc/map_builder_server.h +++ b/cartographer_grpc/map_builder_server.h @@ -18,7 +18,9 @@ #define CARTOGRAPHER_GRPC_MAP_BUILDER_SERVER_H #include "cartographer/common/blocking_queue.h" +#include "cartographer/common/time.h" #include "cartographer/mapping/map_builder.h" +#include "cartographer/mapping/trajectory_builder_interface.h" #include "cartographer/sensor/data.h" #include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/server.h" @@ -28,26 +30,38 @@ namespace cartographer_grpc { class MapBuilderServer { public: + using LocalSlamSubscriptionCallback = std::function /* in local frame */, + std::unique_ptr)>; struct SensorData { int trajectory_id; std::unique_ptr sensor_data; }; + struct SubscriptionId { + const int trajectory_id; + const int subscription_index; + }; class MapBuilderContext : public framework::ExecutionContext { public: - MapBuilderContext( - cartographer::mapping::MapBuilder* map_builder, - cartographer::common::BlockingQueue>* - sensor_data_queue); + MapBuilderContext(MapBuilderServer* map_builder_server); cartographer::mapping::MapBuilder& map_builder(); cartographer::common::BlockingQueue>& sensor_data_queue(); + cartographer::mapping::TrajectoryBuilderInterface::LocalSlamResultCallback + GetLocalSlamResultCallbackForSubscriptions(); void AddSensorDataToTrajectory(const SensorData& sensor_data); + SubscriptionId SubscribeLocalSlamResults( + int trajectory_id, LocalSlamSubscriptionCallback callback); + void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id); template void EnqueueSensorData(int trajectory_id, const std::string& sensor_id, const SensorDataType& sensor_data) { - sensor_data_queue_->Push( + map_builder_server_->sensor_data_queue_.Push( cartographer::common::make_unique( MapBuilderServer::SensorData{ trajectory_id, cartographer::sensor::MakeDispatchable( @@ -55,10 +69,9 @@ class MapBuilderServer { } private: - cartographer::mapping::MapBuilder* map_builder_; - cartographer::common::BlockingQueue>* - sensor_data_queue_; + MapBuilderServer* map_builder_server_; }; + friend MapBuilderContext; MapBuilderServer( const proto::MapBuilderServerOptions& map_builder_server_options); @@ -75,8 +88,19 @@ class MapBuilderServer { void Shutdown(); private: + using LocalSlamResultHandlerSubscriptions = + std::map; + void ProcessSensorDataQueue(); void StartSlamThread(); + void OnLocalSlamResult( + int trajectory_id, cartographer::common::Time time, + cartographer::transform::Rigid3d local_pose, + cartographer::sensor::RangeData range_data, + std::unique_ptr node_id); + SubscriptionId SubscribeLocalSlamResults( + int trajectory_id, LocalSlamSubscriptionCallback callback); + void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id); bool shutting_down_ = false; std::unique_ptr slam_thread_; @@ -84,6 +108,10 @@ class MapBuilderServer { cartographer::mapping::MapBuilder map_builder_; cartographer::common::BlockingQueue> sensor_data_queue_; + cartographer::common::Mutex local_slam_subscriptions_lock_; + int current_subscription_index_ = 0; + std::map + local_slam_subscriptions_ GUARDED_BY(local_slam_subscriptions_lock_); }; } // namespace cartographer_grpc