From e3187513291d6e59f0241cfe9430019f657a7f34 Mon Sep 17 00:00:00 2001 From: gaschler Date: Wed, 5 Sep 2018 10:38:48 +0200 Subject: [PATCH] Graceful LocalTrajectoryUploader::AddTrajectory (#1409) FIXES=#1407 --- .../cloud/internal/client_server_test.cc | 35 ++++++++- .../internal/local_trajectory_uploader.cc | 74 ++++++++++--------- 2 files changed, 72 insertions(+), 37 deletions(-) diff --git a/cartographer/cloud/internal/client_server_test.cc b/cartographer/cloud/internal/client_server_test.cc index 33cbdd1..90c2dd7 100644 --- a/cartographer/cloud/internal/client_server_test.cc +++ b/cartographer/cloud/internal/client_server_test.cc @@ -213,7 +213,6 @@ class ClientServerTestBase : public T { void WaitForLocalSlamResultUploads(size_t size) { while (stub_->pose_graph()->GetTrajectoryNodePoses().size() < size) { - LOG(INFO) << stub_->pose_graph()->GetTrajectoryNodePoses().size(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -595,6 +594,40 @@ TEST_P(ClientServerTestByGridType, LocalSlam2DUplinkServerRestarting) { server_->WaitForShutdown(); } +TEST_F(ClientServerTest, DelayedConnectionToUplinkServer) { + InitializeRealUploadingServer(); + uploading_server_->Start(); + InitializeStubForUploadingServer(); + int trajectory_id = stub_for_uploading_server_->AddTrajectoryBuilder( + {kRangeSensorId}, trajectory_builder_options_, + local_slam_result_callback_); + TrajectoryBuilderInterface* trajectory_stub = + stub_for_uploading_server_->GetTrajectoryBuilder(trajectory_id); + const auto measurements = mapping::testing::GenerateFakeRangeMeasurements( + kTravelDistance, kDuration, kTimeStep); + + // Insert the first measurement. + trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(0)); + WaitForLocalSlamResults(1); + + LOG(INFO) << "Delayed start of uplink server."; + InitializeRealServer(); + server_->Start(); + InitializeStub(); + + // Insert all other measurements. + for (unsigned int i = 1; i < measurements.size(); ++i) { + trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(i)); + } + WaitForLocalSlamResults(measurements.size()); + WaitForLocalSlamResultUploads(2); + stub_for_uploading_server_->FinishTrajectory(trajectory_id); + uploading_server_->Shutdown(); + uploading_server_->WaitForShutdown(); + server_->Shutdown(); + server_->WaitForShutdown(); +} + TEST_P(ClientServerTestByGridType, LoadStateAndDelete) { if (GetParam() == ::cartographer::mapping::GridType::TSDF) { SetOptionsToTSDF2D(); diff --git a/cartographer/cloud/internal/local_trajectory_uploader.cc b/cartographer/cloud/internal/local_trajectory_uploader.cc index 20cbb1a..da7f85f 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.cc +++ b/cartographer/cloud/internal/local_trajectory_uploader.cc @@ -58,17 +58,8 @@ bool IsNewSubmap(const mapping::proto::Submap& submap) { class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { public: struct TrajectoryInfo { - TrajectoryInfo() = default; - TrajectoryInfo( - const int uplink_trajectory_id, - const std::set& expected_sensor_ids, - const mapping::proto::TrajectoryBuilderOptions& trajectory_options, - const std::string& client_id) - : uplink_trajectory_id(uplink_trajectory_id), - expected_sensor_ids(expected_sensor_ids), - trajectory_options(trajectory_options), - client_id(client_id) {} - const int uplink_trajectory_id; + // nullopt if uplink has not yet responded to AddTrajectoryRequest. + absl::optional uplink_trajectory_id; const std::set expected_sensor_ids; const mapping::proto::TrajectoryBuilderOptions trajectory_options; const std::string client_id; @@ -105,6 +96,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { void ProcessSendQueue(); // Returns 'false' for failure. bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); + grpc::Status RegisterTrajectory(int local_trajectory_id); std::shared_ptr<::grpc::Channel> client_channel_; int batch_size_; @@ -192,19 +184,11 @@ void LocalTrajectoryUploader::TryRecovery() { } // Attempt to recreate the trajectories. - const auto local_trajectory_id_to_trajectory_info = - local_trajectory_id_to_trajectory_info_; - local_trajectory_id_to_trajectory_info_.clear(); - for (const auto& entry : local_trajectory_id_to_trajectory_info) { - grpc::Status status = AddTrajectory(entry.second.client_id, entry.first, - entry.second.expected_sensor_ids, - entry.second.trajectory_options); + for (const auto& entry : local_trajectory_id_to_trajectory_info_) { + grpc::Status status = RegisterTrajectory(entry.first); if (!status.ok()) { LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. " << status.error_message(); - // Restore the previous state for the next recovery attempt. - local_trajectory_id_to_trajectory_info_ = - local_trajectory_id_to_trajectory_info; return; } } @@ -262,7 +246,11 @@ bool LocalTrajectoryUploader::TranslateTrajectoryId( if (it == local_trajectory_id_to_trajectory_info_.end()) { return false; } - int cloud_trajectory_id = it->second.uplink_trajectory_id; + if (!it->second.uplink_trajectory_id.has_value()) { + // Could not yet register trajectory with uplink server. + return false; + } + int cloud_trajectory_id = it->second.uplink_trajectory_id.value(); sensor_metadata->set_trajectory_id(cloud_trajectory_id); return true; } @@ -271,10 +259,23 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { + CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), + 0); + local_trajectory_id_to_trajectory_info_.emplace( + local_trajectory_id, + TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id}); + return RegisterTrajectory(local_trajectory_id); +} + +grpc::Status LocalTrajectoryUploader::RegisterTrajectory( + int local_trajectory_id) { + TrajectoryInfo& trajectory_info = + local_trajectory_id_to_trajectory_info_.at(local_trajectory_id); proto::AddTrajectoryRequest request; - request.set_client_id(client_id); - *request.mutable_trajectory_builder_options() = trajectory_options; - for (const SensorId& sensor_id : expected_sensor_ids) { + request.set_client_id(trajectory_info.client_id); + *request.mutable_trajectory_builder_options() = + trajectory_info.trajectory_options; + for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) { // Range sensors are not forwarded, but combined into a LocalSlamResult. if (sensor_id.type != SensorId::SensorType::RANGE) { *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); @@ -286,19 +287,15 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory( client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); ::grpc::Status status; if (!client.Write(request, &status)) { - LOG(ERROR) << status.error_message(); + LOG(ERROR) << "Failed to register local_trajectory_id " + << local_trajectory_id << " with uplink server. " + << status.error_message(); return status; } - LOG(INFO) << "Created trajectory for client_id: " << client_id + LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id << " local trajectory_id: " << local_trajectory_id << " uplink trajectory_id: " << client.response().trajectory_id(); - CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), - 0); - local_trajectory_id_to_trajectory_info_.emplace( - std::piecewise_construct, std::forward_as_tuple(local_trajectory_id), - std::forward_as_tuple(client.response().trajectory_id(), - expected_sensor_ids, trajectory_options, - client_id)); + trajectory_info.uplink_trajectory_id = client.response().trajectory_id(); return status; } @@ -310,10 +307,15 @@ grpc::Status LocalTrajectoryUploader::FinishTrajectory( "local_trajectory_id has not been" " registered with AddTrajectory."); } - int cloud_trajectory_id = it->second.uplink_trajectory_id; + auto cloud_trajectory_id = it->second.uplink_trajectory_id; + if (!cloud_trajectory_id.has_value()) { + return grpc::Status( + grpc::StatusCode::UNAVAILABLE, + "trajectory_id has not been created in uplink, ignoring."); + } proto::FinishTrajectoryRequest request; request.set_client_id(client_id); - request.set_trajectory_id(cloud_trajectory_id); + request.set_trajectory_id(cloud_trajectory_id.value()); async_grpc::Client client( client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); grpc::Status status;