Graceful LocalTrajectoryUploader::AddTrajectory (#1409)

FIXES=#1407
master
gaschler 2018-09-05 10:38:48 +02:00 committed by Wally B. Feed
parent 3b511aa1ba
commit e318751329
2 changed files with 72 additions and 37 deletions

View File

@ -213,7 +213,6 @@ class ClientServerTestBase : public T {
void WaitForLocalSlamResultUploads(size_t size) { void WaitForLocalSlamResultUploads(size_t size) {
while (stub_->pose_graph()->GetTrajectoryNodePoses().size() < size) { while (stub_->pose_graph()->GetTrajectoryNodePoses().size() < size) {
LOG(INFO) << stub_->pose_graph()->GetTrajectoryNodePoses().size();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
} }
@ -595,6 +594,40 @@ TEST_P(ClientServerTestByGridType, LocalSlam2DUplinkServerRestarting) {
server_->WaitForShutdown(); server_->WaitForShutdown();
} }
TEST_F(ClientServerTest, DelayedConnectionToUplinkServer) {
InitializeRealUploadingServer();
uploading_server_->Start();
InitializeStubForUploadingServer();
int trajectory_id = stub_for_uploading_server_->AddTrajectoryBuilder(
{kRangeSensorId}, trajectory_builder_options_,
local_slam_result_callback_);
TrajectoryBuilderInterface* trajectory_stub =
stub_for_uploading_server_->GetTrajectoryBuilder(trajectory_id);
const auto measurements = mapping::testing::GenerateFakeRangeMeasurements(
kTravelDistance, kDuration, kTimeStep);
// Insert the first measurement.
trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(0));
WaitForLocalSlamResults(1);
LOG(INFO) << "Delayed start of uplink server.";
InitializeRealServer();
server_->Start();
InitializeStub();
// Insert all other measurements.
for (unsigned int i = 1; i < measurements.size(); ++i) {
trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(i));
}
WaitForLocalSlamResults(measurements.size());
WaitForLocalSlamResultUploads(2);
stub_for_uploading_server_->FinishTrajectory(trajectory_id);
uploading_server_->Shutdown();
uploading_server_->WaitForShutdown();
server_->Shutdown();
server_->WaitForShutdown();
}
TEST_P(ClientServerTestByGridType, LoadStateAndDelete) { TEST_P(ClientServerTestByGridType, LoadStateAndDelete) {
if (GetParam() == ::cartographer::mapping::GridType::TSDF) { if (GetParam() == ::cartographer::mapping::GridType::TSDF) {
SetOptionsToTSDF2D(); SetOptionsToTSDF2D();

View File

@ -58,17 +58,8 @@ bool IsNewSubmap(const mapping::proto::Submap& submap) {
class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
public: public:
struct TrajectoryInfo { struct TrajectoryInfo {
TrajectoryInfo() = default; // nullopt if uplink has not yet responded to AddTrajectoryRequest.
TrajectoryInfo( absl::optional<int> uplink_trajectory_id;
const int uplink_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options,
const std::string& client_id)
: uplink_trajectory_id(uplink_trajectory_id),
expected_sensor_ids(expected_sensor_ids),
trajectory_options(trajectory_options),
client_id(client_id) {}
const int uplink_trajectory_id;
const std::set<SensorId> expected_sensor_ids; const std::set<SensorId> expected_sensor_ids;
const mapping::proto::TrajectoryBuilderOptions trajectory_options; const mapping::proto::TrajectoryBuilderOptions trajectory_options;
const std::string client_id; const std::string client_id;
@ -105,6 +96,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
void ProcessSendQueue(); void ProcessSendQueue();
// Returns 'false' for failure. // Returns 'false' for failure.
bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
grpc::Status RegisterTrajectory(int local_trajectory_id);
std::shared_ptr<::grpc::Channel> client_channel_; std::shared_ptr<::grpc::Channel> client_channel_;
int batch_size_; int batch_size_;
@ -192,19 +184,11 @@ void LocalTrajectoryUploader::TryRecovery() {
} }
// Attempt to recreate the trajectories. // Attempt to recreate the trajectories.
const auto local_trajectory_id_to_trajectory_info = for (const auto& entry : local_trajectory_id_to_trajectory_info_) {
local_trajectory_id_to_trajectory_info_; grpc::Status status = RegisterTrajectory(entry.first);
local_trajectory_id_to_trajectory_info_.clear();
for (const auto& entry : local_trajectory_id_to_trajectory_info) {
grpc::Status status = AddTrajectory(entry.second.client_id, entry.first,
entry.second.expected_sensor_ids,
entry.second.trajectory_options);
if (!status.ok()) { if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. " LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
<< status.error_message(); << status.error_message();
// Restore the previous state for the next recovery attempt.
local_trajectory_id_to_trajectory_info_ =
local_trajectory_id_to_trajectory_info;
return; return;
} }
} }
@ -262,7 +246,11 @@ bool LocalTrajectoryUploader::TranslateTrajectoryId(
if (it == local_trajectory_id_to_trajectory_info_.end()) { if (it == local_trajectory_id_to_trajectory_info_.end()) {
return false; return false;
} }
int cloud_trajectory_id = it->second.uplink_trajectory_id; if (!it->second.uplink_trajectory_id.has_value()) {
// Could not yet register trajectory with uplink server.
return false;
}
int cloud_trajectory_id = it->second.uplink_trajectory_id.value();
sensor_metadata->set_trajectory_id(cloud_trajectory_id); sensor_metadata->set_trajectory_id(cloud_trajectory_id);
return true; return true;
} }
@ -271,10 +259,23 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory(
const std::string& client_id, int local_trajectory_id, const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids, const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
0);
local_trajectory_id_to_trajectory_info_.emplace(
local_trajectory_id,
TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id});
return RegisterTrajectory(local_trajectory_id);
}
grpc::Status LocalTrajectoryUploader::RegisterTrajectory(
int local_trajectory_id) {
TrajectoryInfo& trajectory_info =
local_trajectory_id_to_trajectory_info_.at(local_trajectory_id);
proto::AddTrajectoryRequest request; proto::AddTrajectoryRequest request;
request.set_client_id(client_id); request.set_client_id(trajectory_info.client_id);
*request.mutable_trajectory_builder_options() = trajectory_options; *request.mutable_trajectory_builder_options() =
for (const SensorId& sensor_id : expected_sensor_ids) { trajectory_info.trajectory_options;
for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) {
// Range sensors are not forwarded, but combined into a LocalSlamResult. // Range sensors are not forwarded, but combined into a LocalSlamResult.
if (sensor_id.type != SensorId::SensorType::RANGE) { if (sensor_id.type != SensorId::SensorType::RANGE) {
*request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
@ -286,19 +287,15 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
::grpc::Status status; ::grpc::Status status;
if (!client.Write(request, &status)) { if (!client.Write(request, &status)) {
LOG(ERROR) << status.error_message(); LOG(ERROR) << "Failed to register local_trajectory_id "
<< local_trajectory_id << " with uplink server. "
<< status.error_message();
return status; return status;
} }
LOG(INFO) << "Created trajectory for client_id: " << client_id LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id
<< " local trajectory_id: " << local_trajectory_id << " local trajectory_id: " << local_trajectory_id
<< " uplink trajectory_id: " << client.response().trajectory_id(); << " uplink trajectory_id: " << client.response().trajectory_id();
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), trajectory_info.uplink_trajectory_id = client.response().trajectory_id();
0);
local_trajectory_id_to_trajectory_info_.emplace(
std::piecewise_construct, std::forward_as_tuple(local_trajectory_id),
std::forward_as_tuple(client.response().trajectory_id(),
expected_sensor_ids, trajectory_options,
client_id));
return status; return status;
} }
@ -310,10 +307,15 @@ grpc::Status LocalTrajectoryUploader::FinishTrajectory(
"local_trajectory_id has not been" "local_trajectory_id has not been"
" registered with AddTrajectory."); " registered with AddTrajectory.");
} }
int cloud_trajectory_id = it->second.uplink_trajectory_id; auto cloud_trajectory_id = it->second.uplink_trajectory_id;
if (!cloud_trajectory_id.has_value()) {
return grpc::Status(
grpc::StatusCode::UNAVAILABLE,
"trajectory_id has not been created in uplink, ignoring.");
}
proto::FinishTrajectoryRequest request; proto::FinishTrajectoryRequest request;
request.set_client_id(client_id); request.set_client_id(client_id);
request.set_trajectory_id(cloud_trajectory_id); request.set_trajectory_id(cloud_trajectory_id.value());
async_grpc::Client<handlers::FinishTrajectorySignature> client( async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
grpc::Status status; grpc::Status status;