Notify LocalSlamResults subscription ends. (#777)

[RFC=0002](https://github.com/googlecartographer/rfcs/blob/master/text/0002-cloud-based-mapping-1.md)
PAIR=cschuet
master
gaschler 2017-12-20 11:42:01 +01:00 committed by Wally B. Feed
parent 5b5b290e9f
commit f5e99089a9
6 changed files with 51 additions and 13 deletions

View File

@ -35,7 +35,28 @@ class RpcHandler : public RpcHandlerInterface {
using OutgoingType = Outgoing; using OutgoingType = Outgoing;
using RequestType = StripStream<Incoming>; using RequestType = StripStream<Incoming>;
using ResponseType = StripStream<Outgoing>; using ResponseType = StripStream<Outgoing>;
using Writer = std::function<bool(std::unique_ptr<ResponseType>)>;
class Writer {
public:
explicit Writer(std::weak_ptr<Rpc> rpc) : rpc_(std::move(rpc)) {}
bool Write(std::unique_ptr<ResponseType> message) const {
if (auto rpc = rpc_.lock()) {
rpc->Write(std::move(message));
return true;
}
return false;
}
bool WritesDone() const {
if (auto rpc = rpc_.lock()) {
rpc->Finish(::grpc::Status::OK);
return true;
}
return false;
}
private:
const std::weak_ptr<Rpc> rpc_;
};
void SetExecutionContext(ExecutionContext* execution_context) { void SetExecutionContext(ExecutionContext* execution_context) {
execution_context_ = execution_context; execution_context_ = execution_context;
@ -58,16 +79,7 @@ class RpcHandler : public RpcHandlerInterface {
T* GetUnsynchronizedContext() { T* GetUnsynchronizedContext() {
return dynamic_cast<T*>(execution_context_); return dynamic_cast<T*>(execution_context_);
} }
Writer GetWriter() { Writer GetWriter() { return Writer(rpc_->GetWeakPtr()); }
std::weak_ptr<Rpc> weak_ptr_rpc = rpc_->GetWeakPtr();
return [weak_ptr_rpc](std::unique_ptr<ResponseType> message) {
if (auto rpc = weak_ptr_rpc.lock()) {
rpc->Write(std::move(message));
return true;
}
return false;
};
}
private: private:
Rpc* rpc_; Rpc* rpc_;

View File

@ -96,7 +96,7 @@ class GetEchoHandler
auto response = auto response =
cartographer::common::make_unique<proto::GetEchoResponse>(); cartographer::common::make_unique<proto::GetEchoResponse>();
response->set_output(value); response->set_output(value);
return writer(std::move(response)); return writer.Write(std::move(response));
}); });
} }
}; };

View File

@ -34,6 +34,8 @@ class FinishTrajectoryHandler
GetContext<MapBuilderServer::MapBuilderContext>() GetContext<MapBuilderServer::MapBuilderContext>()
->map_builder() ->map_builder()
.FinishTrajectory(request.trajectory_id()); .FinishTrajectory(request.trajectory_id());
GetUnsynchronizedContext<MapBuilderServer::MapBuilderContext>()
->NotifyFinishTrajectory(request.trajectory_id());
Send(std::move( Send(std::move(
cartographer::common::make_unique<google::protobuf::Empty>())); cartographer::common::make_unique<google::protobuf::Empty>()));
} }

View File

@ -39,7 +39,14 @@ class ReceiveLocalSlamResultsHandler
request.trajectory_id(), request.trajectory_id(),
[writer](std::unique_ptr<MapBuilderServer::LocalSlamResult> [writer](std::unique_ptr<MapBuilderServer::LocalSlamResult>
local_slam_result) { local_slam_result) {
writer(GenerateResponse(std::move(local_slam_result))); if (local_slam_result) {
writer.Write(
GenerateResponse(std::move(local_slam_result)));
} else {
// Callback with 'nullptr' signals that the trajectory
// finished.
writer.WritesDone();
}
}); });
subscription_id_ = subscription_id_ =

View File

@ -83,6 +83,11 @@ void MapBuilderServer::MapBuilderContext::UnsubscribeLocalSlamResults(
map_builder_server_->UnsubscribeLocalSlamResults(subscription_id); map_builder_server_->UnsubscribeLocalSlamResults(subscription_id);
} }
void MapBuilderServer::MapBuilderContext::NotifyFinishTrajectory(
int trajectory_id) {
map_builder_server_->NotifyFinishTrajectory(trajectory_id);
}
MapBuilderServer::MapBuilderServer( MapBuilderServer::MapBuilderServer(
const proto::MapBuilderServerOptions& map_builder_server_options, const proto::MapBuilderServerOptions& map_builder_server_options,
std::unique_ptr<cartographer::mapping::MapBuilderInterface> map_builder) std::unique_ptr<cartographer::mapping::MapBuilderInterface> map_builder)
@ -193,4 +198,13 @@ void MapBuilderServer::UnsubscribeLocalSlamResults(
1u); 1u);
} }
void MapBuilderServer::NotifyFinishTrajectory(int trajectory_id) {
cartographer::common::MutexLocker locker(&local_slam_subscriptions_lock_);
for (auto& entry : local_slam_subscriptions_[trajectory_id]) {
LocalSlamSubscriptionCallback callback = entry.second;
// 'nullptr' signals subscribers that the trajectory finished.
callback(nullptr);
}
}
} // namespace cartographer_grpc } // namespace cartographer_grpc

View File

@ -37,6 +37,7 @@ class MapBuilderServer {
std::shared_ptr<const cartographer::sensor::RangeData> range_data; std::shared_ptr<const cartographer::sensor::RangeData> range_data;
std::unique_ptr<const cartographer::mapping::NodeId> node_id; std::unique_ptr<const cartographer::mapping::NodeId> node_id;
}; };
// Calling with 'nullptr' signals subscribers that the subscription has ended.
using LocalSlamSubscriptionCallback = using LocalSlamSubscriptionCallback =
std::function<void(std::unique_ptr<LocalSlamResult>)>; std::function<void(std::unique_ptr<LocalSlamResult>)>;
struct SensorData { struct SensorData {
@ -60,6 +61,7 @@ class MapBuilderServer {
SubscriptionId SubscribeLocalSlamResults( SubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback); int trajectory_id, LocalSlamSubscriptionCallback callback);
void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id); void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id);
void NotifyFinishTrajectory(int trajectory_id);
template <typename SensorDataType> template <typename SensorDataType>
void EnqueueSensorData(int trajectory_id, const std::string& sensor_id, void EnqueueSensorData(int trajectory_id, const std::string& sensor_id,
@ -105,6 +107,7 @@ class MapBuilderServer {
SubscriptionId SubscribeLocalSlamResults( SubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback); int trajectory_id, LocalSlamSubscriptionCallback callback);
void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id); void UnsubscribeLocalSlamResults(const SubscriptionId& subscription_id);
void NotifyFinishTrajectory(int trajectory_id);
bool shutting_down_ = false; bool shutting_down_ = false;
std::unique_ptr<std::thread> slam_thread_; std::unique_ptr<std::thread> slam_thread_;