Introduce client_id in cartographer grpc (#1241)
parent
120c216c47
commit
35abfccc6b
|
@ -39,11 +39,14 @@ constexpr int kConnectionTimeoutInSeconds = 10;
|
|||
|
||||
} // namespace
|
||||
|
||||
MapBuilderStub::MapBuilderStub(const std::string& server_address)
|
||||
MapBuilderStub::MapBuilderStub(const std::string& server_address,
|
||||
const std::string& client_id)
|
||||
: client_channel_(::grpc::CreateChannel(
|
||||
server_address, ::grpc::InsecureChannelCredentials())),
|
||||
pose_graph_stub_(make_unique<PoseGraphStub>(client_channel_)) {
|
||||
LOG(INFO) << "Connecting to SLAM process at " << server_address;
|
||||
pose_graph_stub_(make_unique<PoseGraphStub>(client_channel_, client_id)),
|
||||
client_id_(client_id) {
|
||||
LOG(INFO) << "Connecting to SLAM process at " << server_address
|
||||
<< " with client_id " << client_id;
|
||||
std::chrono::system_clock::time_point deadline(
|
||||
std::chrono::system_clock::now() +
|
||||
std::chrono::seconds(kConnectionTimeoutInSeconds));
|
||||
|
@ -57,6 +60,7 @@ int MapBuilderStub::AddTrajectoryBuilder(
|
|||
const mapping::proto::TrajectoryBuilderOptions& trajectory_options,
|
||||
LocalSlamResultCallback local_slam_result_callback) {
|
||||
proto::AddTrajectoryRequest request;
|
||||
request.set_client_id(client_id_);
|
||||
*request.mutable_trajectory_builder_options() = trajectory_options;
|
||||
for (const auto& sensor_id : expected_sensor_ids) {
|
||||
*request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
|
||||
|
@ -93,10 +97,18 @@ mapping::TrajectoryBuilderInterface* MapBuilderStub::GetTrajectoryBuilder(
|
|||
|
||||
void MapBuilderStub::FinishTrajectory(int trajectory_id) {
|
||||
proto::FinishTrajectoryRequest request;
|
||||
request.set_client_id(client_id_);
|
||||
request.set_trajectory_id(trajectory_id);
|
||||
async_grpc::Client<handlers::FinishTrajectorySignature> client(
|
||||
client_channel_);
|
||||
CHECK(client.Write(request));
|
||||
::grpc::Status status;
|
||||
client.Write(request, &status);
|
||||
if (!status.ok()) {
|
||||
LOG(ERROR) << "Failed to finish trajectory " << trajectory_id
|
||||
<< " for client_id " << client_id_ << ": "
|
||||
<< status.error_message();
|
||||
return;
|
||||
}
|
||||
trajectory_builder_stubs_.erase(trajectory_id);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,8 @@ namespace cloud {
|
|||
|
||||
class MapBuilderStub : public mapping::MapBuilderInterface {
|
||||
public:
|
||||
MapBuilderStub(const std::string& server_address);
|
||||
MapBuilderStub(const std::string& server_address,
|
||||
const std::string& client_id);
|
||||
|
||||
MapBuilderStub(const MapBuilderStub&) = delete;
|
||||
MapBuilderStub& operator=(const MapBuilderStub&) = delete;
|
||||
|
@ -61,6 +62,7 @@ class MapBuilderStub : public mapping::MapBuilderInterface {
|
|||
std::unique_ptr<mapping::PoseGraphInterface> pose_graph_stub_;
|
||||
std::map<int, std::unique_ptr<mapping::TrajectoryBuilderInterface>>
|
||||
trajectory_builder_stubs_;
|
||||
const std::string client_id_;
|
||||
};
|
||||
|
||||
} // namespace cloud
|
||||
|
|
|
@ -35,8 +35,9 @@
|
|||
namespace cartographer {
|
||||
namespace cloud {
|
||||
|
||||
PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel)
|
||||
: client_channel_(client_channel) {}
|
||||
PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel,
|
||||
const std::string& client_id)
|
||||
: client_channel_(client_channel), client_id_(client_id) {}
|
||||
|
||||
void PoseGraphStub::RunFinalOptimization() {
|
||||
google::protobuf::Empty request;
|
||||
|
@ -149,10 +150,17 @@ void PoseGraphStub::SetLandmarkPose(const std::string& landmark_id,
|
|||
|
||||
void PoseGraphStub::DeleteTrajectory(int trajectory_id) {
|
||||
proto::DeleteTrajectoryRequest request;
|
||||
request.set_client_id(client_id_);
|
||||
request.set_trajectory_id(trajectory_id);
|
||||
async_grpc::Client<handlers::DeleteTrajectorySignature> client(
|
||||
client_channel_);
|
||||
CHECK(client.Write(request));
|
||||
::grpc::Status status;
|
||||
client.Write(request, &status);
|
||||
if (!status.ok()) {
|
||||
LOG(ERROR) << "Failed to delete trajectory " << trajectory_id
|
||||
<< " for client_id " << client_id_ << ": "
|
||||
<< status.error_message();
|
||||
}
|
||||
}
|
||||
|
||||
bool PoseGraphStub::IsTrajectoryFinished(int trajectory_id) const {
|
||||
|
|
|
@ -25,7 +25,8 @@ namespace cloud {
|
|||
|
||||
class PoseGraphStub : public ::cartographer::mapping::PoseGraphInterface {
|
||||
public:
|
||||
PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel);
|
||||
PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel,
|
||||
const std::string& client_id);
|
||||
|
||||
PoseGraphStub(const PoseGraphStub&) = delete;
|
||||
PoseGraphStub& operator=(const PoseGraphStub&) = delete;
|
||||
|
@ -57,6 +58,7 @@ class PoseGraphStub : public ::cartographer::mapping::PoseGraphInterface {
|
|||
|
||||
private:
|
||||
std::shared_ptr<::grpc::Channel> client_channel_;
|
||||
const std::string client_id_;
|
||||
};
|
||||
|
||||
} // namespace cloud
|
||||
|
|
|
@ -45,6 +45,7 @@ namespace cartographer {
|
|||
namespace cloud {
|
||||
namespace {
|
||||
|
||||
constexpr char kClientId[] = "CLIENT_ID";
|
||||
const SensorId kImuSensorId{SensorId::SensorType::IMU, "imu"};
|
||||
const SensorId kRangeSensorId{SensorId::SensorType::RANGE, "range"};
|
||||
constexpr double kDuration = 4.; // Seconds.
|
||||
|
@ -166,13 +167,13 @@ class ClientServerTest : public ::testing::Test {
|
|||
|
||||
void InitializeStub() {
|
||||
stub_ = common::make_unique<MapBuilderStub>(
|
||||
map_builder_server_options_.server_address());
|
||||
map_builder_server_options_.server_address(), kClientId);
|
||||
EXPECT_TRUE(stub_ != nullptr);
|
||||
}
|
||||
|
||||
void InitializeStubForUploadingServer() {
|
||||
stub_for_uploading_server_ = common::make_unique<MapBuilderStub>(
|
||||
uploading_map_builder_server_options_.server_address());
|
||||
uploading_map_builder_server_options_.server_address(), kClientId);
|
||||
EXPECT_TRUE(stub_for_uploading_server_ != nullptr);
|
||||
}
|
||||
|
||||
|
|
|
@ -40,4 +40,4 @@ class AddSensorDataBatchHandler
|
|||
} // namespace cloud
|
||||
} // namespace cartographer
|
||||
|
||||
#endif // CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_SENSOR_DATA_BATCH_HANDLER_H
|
||||
#endif // CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_SENSOR_DATA_BATCH_HANDLER_H
|
||||
|
|
|
@ -41,6 +41,8 @@ void AddTrajectoryHandler::OnRequest(
|
|||
.AddTrajectoryBuilder(expected_sensor_ids,
|
||||
request.trajectory_builder_options(),
|
||||
local_slam_result_callback);
|
||||
GetContext<MapBuilderContextInterface>()->RegisterClientIdForTrajectory(
|
||||
request.client_id(), trajectory_id);
|
||||
if (GetUnsynchronizedContext<MapBuilderContextInterface>()
|
||||
->local_trajectory_uploader()) {
|
||||
auto trajectory_builder_options = request.trajectory_builder_options();
|
||||
|
@ -60,7 +62,7 @@ void AddTrajectoryHandler::OnRequest(
|
|||
|
||||
GetContext<MapBuilderContextInterface>()
|
||||
->local_trajectory_uploader()
|
||||
->AddTrajectory(trajectory_id, expected_sensor_ids,
|
||||
->AddTrajectory(request.client_id(), trajectory_id, expected_sensor_ids,
|
||||
trajectory_builder_options);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ using ::testing::Test;
|
|||
using ::testing::Truly;
|
||||
|
||||
const std::string kMessage = R"(
|
||||
client_id: "CLIENT_ID"
|
||||
expected_sensor_ids {
|
||||
id: "range_sensor"
|
||||
type: RANGE
|
||||
|
@ -105,6 +106,8 @@ TEST_F(AddTrajectoryHandlerTest, NoLocalSlamUploader) {
|
|||
&request.trajectory_builder_options())),
|
||||
_))
|
||||
.WillOnce(Return(13));
|
||||
EXPECT_CALL(*mock_map_builder_context_,
|
||||
RegisterClientIdForTrajectory(Eq("CLIENT_ID"), Eq(13)));
|
||||
test_server_->SendWrite(request);
|
||||
EXPECT_EQ(test_server_->response().trajectory_id(), 13);
|
||||
}
|
||||
|
@ -120,6 +123,8 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) {
|
|||
&request.trajectory_builder_options())),
|
||||
_))
|
||||
.WillOnce(Return(13));
|
||||
EXPECT_CALL(*mock_map_builder_context_,
|
||||
RegisterClientIdForTrajectory(Eq("CLIENT_ID"), Eq(13)));
|
||||
auto upstream_trajectory_builder_options =
|
||||
request.trajectory_builder_options();
|
||||
// Reproduce the changes to the request as done by the handler.
|
||||
|
@ -128,7 +133,7 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) {
|
|||
upstream_trajectory_builder_options.clear_pure_localization_trimmer();
|
||||
upstream_trajectory_builder_options.clear_initial_trajectory_pose();
|
||||
EXPECT_CALL(*mock_local_trajectory_uploader_,
|
||||
AddTrajectory(Eq(13), ParseSensorIds(request),
|
||||
AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request),
|
||||
Truly(testing::BuildProtoPredicateEquals(
|
||||
&upstream_trajectory_builder_options))));
|
||||
test_server_->SendWrite(request);
|
||||
|
|
|
@ -28,6 +28,13 @@ namespace handlers {
|
|||
|
||||
void DeleteTrajectoryHandler::OnRequest(
|
||||
const proto::DeleteTrajectoryRequest& request) {
|
||||
if (!GetContext<MapBuilderContextInterface>()->CheckClientIdForTrajectory(
|
||||
request.client_id(), request.trajectory_id())) {
|
||||
LOG(ERROR) << "Unknown trajectory with ID " << request.trajectory_id()
|
||||
<< " and client_id " << request.client_id();
|
||||
Finish(::grpc::Status(::grpc::NOT_FOUND, "Unknown trajectory"));
|
||||
return;
|
||||
}
|
||||
GetContext<MapBuilderContextInterface>()
|
||||
->map_builder()
|
||||
.pose_graph()
|
||||
|
|
|
@ -28,6 +28,13 @@ namespace handlers {
|
|||
|
||||
void FinishTrajectoryHandler::OnRequest(
|
||||
const proto::FinishTrajectoryRequest& request) {
|
||||
if (!GetContext<MapBuilderContextInterface>()->CheckClientIdForTrajectory(
|
||||
request.client_id(), request.trajectory_id())) {
|
||||
LOG(ERROR) << "Unknown trajectory with ID " << request.trajectory_id()
|
||||
<< " and client_id " << request.client_id();
|
||||
Finish(::grpc::Status(::grpc::NOT_FOUND, "Unknown trajectory"));
|
||||
return;
|
||||
}
|
||||
GetContext<MapBuilderContextInterface>()->map_builder().FinishTrajectory(
|
||||
request.trajectory_id());
|
||||
GetUnsynchronizedContext<MapBuilderContextInterface>()
|
||||
|
@ -36,7 +43,7 @@ void FinishTrajectoryHandler::OnRequest(
|
|||
->local_trajectory_uploader()) {
|
||||
GetContext<MapBuilderContextInterface>()
|
||||
->local_trajectory_uploader()
|
||||
->FinishTrajectory(request.trajectory_id());
|
||||
->FinishTrajectory(request.client_id(), request.trajectory_id());
|
||||
}
|
||||
Send(common::make_unique<google::protobuf::Empty>());
|
||||
}
|
||||
|
|
|
@ -55,9 +55,11 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
|
|||
void Shutdown() final;
|
||||
|
||||
void AddTrajectory(
|
||||
int local_trajectory_id, const std::set<SensorId>& expected_sensor_ids,
|
||||
const std::string& client_id, int local_trajectory_id,
|
||||
const std::set<SensorId>& expected_sensor_ids,
|
||||
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
|
||||
void FinishTrajectory(int local_trajectory_id) final;
|
||||
void FinishTrajectory(const std::string& client_id,
|
||||
int local_trajectory_id) final;
|
||||
void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
|
||||
|
||||
SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final {
|
||||
|
@ -158,9 +160,11 @@ void LocalTrajectoryUploader::TranslateTrajectoryId(
|
|||
}
|
||||
|
||||
void LocalTrajectoryUploader::AddTrajectory(
|
||||
int local_trajectory_id, const std::set<SensorId>& expected_sensor_ids,
|
||||
const std::string& client_id, int local_trajectory_id,
|
||||
const std::set<SensorId>& expected_sensor_ids,
|
||||
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
|
||||
proto::AddTrajectoryRequest request;
|
||||
request.set_client_id(client_id);
|
||||
*request.mutable_trajectory_builder_options() = trajectory_options;
|
||||
for (const SensorId& sensor_id : expected_sensor_ids) {
|
||||
// Range sensors are not forwarded, but combined into a LocalSlamResult.
|
||||
|
@ -178,11 +182,13 @@ void LocalTrajectoryUploader::AddTrajectory(
|
|||
client.response().trajectory_id();
|
||||
}
|
||||
|
||||
void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) {
|
||||
void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id,
|
||||
int local_trajectory_id) {
|
||||
CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1);
|
||||
int cloud_trajectory_id =
|
||||
local_to_cloud_trajectory_id_map_[local_trajectory_id];
|
||||
proto::FinishTrajectoryRequest request;
|
||||
request.set_client_id(client_id);
|
||||
request.set_trajectory_id(cloud_trajectory_id);
|
||||
async_grpc::Client<handlers::FinishTrajectorySignature> client(
|
||||
client_channel_);
|
||||
|
|
|
@ -45,9 +45,11 @@ class LocalTrajectoryUploaderInterface {
|
|||
virtual void EnqueueSensorData(
|
||||
std::unique_ptr<proto::SensorData> sensor_data) = 0;
|
||||
virtual void AddTrajectory(
|
||||
int local_trajectory_id, const std::set<SensorId>& expected_sensor_ids,
|
||||
const std::string& client_id, int local_trajectory_id,
|
||||
const std::set<SensorId>& expected_sensor_ids,
|
||||
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0;
|
||||
virtual void FinishTrajectory(int local_trajectory_id) = 0;
|
||||
virtual void FinishTrajectory(const std::string& client_id,
|
||||
int local_trajectory_id) = 0;
|
||||
|
||||
virtual SensorId GetLocalSlamResultSensorId(
|
||||
int local_trajectory_id) const = 0;
|
||||
|
|
|
@ -104,6 +104,22 @@ void MapBuilderContext<SubmapType>::EnqueueSensorData(
|
|||
common::make_unique<Data>(Data{trajectory_id, std::move(data)}));
|
||||
}
|
||||
|
||||
template <class SubmapType>
|
||||
void MapBuilderContext<SubmapType>::RegisterClientIdForTrajectory(
|
||||
const std::string& client_id, int trajectory_id) {
|
||||
CHECK_EQ(client_ids_.count(trajectory_id), 0u);
|
||||
LOG(INFO) << "Registering trajectory_id " << trajectory_id << " to client_id "
|
||||
<< client_id;
|
||||
client_ids_[trajectory_id] = client_id;
|
||||
}
|
||||
|
||||
template <class SubmapType>
|
||||
bool MapBuilderContext<SubmapType>::CheckClientIdForTrajectory(
|
||||
const std::string& client_id, int trajectory_id) {
|
||||
return (client_ids_.count(trajectory_id) > 0 &&
|
||||
client_ids_[trajectory_id] == client_id);
|
||||
}
|
||||
|
||||
template <>
|
||||
void MapBuilderContext<mapping::Submap2D>::EnqueueLocalSlamResultData(
|
||||
int trajectory_id, const std::string& sensor_id,
|
||||
|
|
|
@ -92,6 +92,10 @@ class MapBuilderContextInterface : public async_grpc::ExecutionContext {
|
|||
virtual void EnqueueLocalSlamResultData(
|
||||
int trajectory_id, const std::string& sensor_id,
|
||||
const mapping::proto::LocalSlamResultData& local_slam_result_data) = 0;
|
||||
virtual void RegisterClientIdForTrajectory(const std::string& client_id,
|
||||
int trajectory_id) = 0;
|
||||
virtual bool CheckClientIdForTrajectory(const std::string& client_id,
|
||||
int trajectory_id) = 0;
|
||||
};
|
||||
|
||||
} // namespace cloud
|
||||
|
|
|
@ -64,10 +64,15 @@ class MapBuilderContext : public MapBuilderContextInterface {
|
|||
const std::string& sensor_id,
|
||||
const mapping::proto::LocalSlamResultData&
|
||||
local_slam_result_data) override;
|
||||
void RegisterClientIdForTrajectory(const std::string& client_id,
|
||||
int trajectory_id) override;
|
||||
bool CheckClientIdForTrajectory(const std::string& client_id,
|
||||
int trajectory_id) override;
|
||||
|
||||
private:
|
||||
MapBuilderServer* map_builder_server_;
|
||||
mapping::SubmapController<SubmapType> submap_controller_;
|
||||
std::map</*trajectory_id=*/int, /*client_id=*/std::string> client_ids_;
|
||||
};
|
||||
|
||||
class MapBuilderServer : public MapBuilderServerInterface {
|
||||
|
|
|
@ -35,10 +35,10 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
|
|||
}
|
||||
MOCK_METHOD0(Start, void());
|
||||
MOCK_METHOD0(Shutdown, void());
|
||||
MOCK_METHOD3(AddTrajectory,
|
||||
void(int, const std::set<SensorId> &,
|
||||
MOCK_METHOD4(AddTrajectory,
|
||||
void(const std::string &, int, const std::set<SensorId> &,
|
||||
const mapping::proto::TrajectoryBuilderOptions &));
|
||||
MOCK_METHOD1(FinishTrajectory, void(int));
|
||||
MOCK_METHOD2(FinishTrajectory, void(const std::string &, int));
|
||||
MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int));
|
||||
};
|
||||
|
||||
|
|
|
@ -59,6 +59,8 @@ class MockMapBuilderContext : public MapBuilderContextInterface {
|
|||
MOCK_METHOD3(EnqueueLocalSlamResultData,
|
||||
void(int, const std::string &,
|
||||
const mapping::proto::LocalSlamResultData &));
|
||||
MOCK_METHOD2(RegisterClientIdForTrajectory, void(const std::string &, int));
|
||||
MOCK_METHOD2(CheckClientIdForTrajectory, bool(const std::string &, int));
|
||||
};
|
||||
|
||||
} // namespace testing
|
||||
|
|
|
@ -49,6 +49,7 @@ message AddTrajectoryRequest {
|
|||
repeated SensorId expected_sensor_ids = 3;
|
||||
cartographer.mapping.proto.TrajectoryBuilderOptions
|
||||
trajectory_builder_options = 2;
|
||||
string client_id = 4;
|
||||
}
|
||||
|
||||
message SensorMetadata {
|
||||
|
@ -103,10 +104,12 @@ message AddLandmarkDataRequest {
|
|||
|
||||
message FinishTrajectoryRequest {
|
||||
int32 trajectory_id = 1;
|
||||
string client_id = 2;
|
||||
}
|
||||
|
||||
message DeleteTrajectoryRequest {
|
||||
int32 trajectory_id = 1;
|
||||
string client_id = 2;
|
||||
}
|
||||
|
||||
message ReceiveLocalSlamResultsRequest {
|
||||
|
|
Loading…
Reference in New Issue