Graceful LocalTrajectoryUploader (#1381)

* Graceful LocalTrajectoryUploader

FIXES=#1370
master
gaschler 2018-08-12 13:46:13 +02:00 committed by Christoph Schütte
parent fe59278286
commit 811f2e8e8f
7 changed files with 136 additions and 49 deletions

View File

@ -60,14 +60,14 @@ void AddTrajectoryHandler::OnRequest(
// Ignore initial poses in trajectory_builder_options.
trajectory_builder_options.clear_initial_trajectory_pose();
if (!GetContext<MapBuilderContextInterface>()
auto status =
GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->AddTrajectory(request.client_id(), trajectory_id,
expected_sensor_ids, trajectory_builder_options)) {
LOG(ERROR) << "Failed to create trajectory in uplink: " << trajectory_id;
Finish(::grpc::Status(::grpc::INTERNAL,
"Failed to create trajectory in uplink"));
return;
expected_sensor_ids, trajectory_builder_options);
if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory_id " << trajectory_id
<< " in uplink. Error: " << status.error_message();
}
}

View File

@ -136,7 +136,7 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) {
AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request),
Truly(testing::BuildProtoPredicateEquals(
&upstream_trajectory_builder_options))))
.WillOnce(Return(13));
.WillOnce(Return(grpc::Status::OK));
test_server_->SendWrite(request);
EXPECT_EQ(test_server_->response().trajectory_id(), 13);
}

View File

@ -41,9 +41,14 @@ void FinishTrajectoryHandler::OnRequest(
->NotifyFinishTrajectory(request.trajectory_id());
if (GetUnsynchronizedContext<MapBuilderContextInterface>()
->local_trajectory_uploader()) {
auto status =
GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->FinishTrajectory(request.client_id(), request.trajectory_id());
if (!status.ok()) {
LOG(ERROR) << "Failed to finish trajectory in uplink: "
<< status.error_message();
}
}
Send(absl::make_unique<google::protobuf::Empty>());
}

View File

@ -82,11 +82,11 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
// complete.
void Shutdown() final;
bool AddTrajectory(
grpc::Status AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
void FinishTrajectory(const std::string& client_id,
grpc::Status FinishTrajectory(const std::string& client_id,
int local_trajectory_id) final;
void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
void TryRecovery();
@ -98,7 +98,8 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
private:
void ProcessSendQueue();
void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
// Returns 'false' for failure.
bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
std::shared_ptr<::grpc::Channel> client_channel_;
int batch_size_;
@ -124,12 +125,12 @@ LocalTrajectoryUploader::LocalTrajectoryUploader(
grpc::CompositeChannelCredentials(channel_creds, call_creds);
}
client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds);
std::chrono::system_clock::time_point deadline(
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(kConnectionTimeoutInSeconds));
std::chrono::seconds(kConnectionTimeoutInSeconds);
LOG(INFO) << "Connecting to uplink " << uplink_server_address;
if (!client_channel_->WaitForConnected(deadline)) {
LOG(FATAL) << "Failed to connect to " << uplink_server_address;
LOG(ERROR) << "Failed to connect to " << uplink_server_address;
}
}
@ -151,7 +152,11 @@ void LocalTrajectoryUploader::Shutdown() {
void LocalTrajectoryUploader::TryRecovery() {
// Wind the sensor_data_queue forward to the next new submap.
LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
while (true) {
if (shutting_down_) {
return;
}
proto::SensorData* sensor_data =
send_queue_.PeekWithTimeout<proto::SensorData>(kPopTimeout);
if (sensor_data) {
@ -173,13 +178,16 @@ void LocalTrajectoryUploader::TryRecovery() {
local_trajectory_id_to_trajectory_info_;
local_trajectory_id_to_trajectory_info_.clear();
for (const auto& entry : local_trajectory_id_to_trajectory_info) {
if (!AddTrajectory(entry.second.client_id, entry.first,
grpc::Status status = AddTrajectory(entry.second.client_id, entry.first,
entry.second.expected_sensor_ids,
entry.second.trajectory_options)) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt.";
entry.second.trajectory_options);
if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
<< status.error_message();
return;
}
}
LOG(INFO) << "LocalTrajectoryUploader recovered.";
}
void LocalTrajectoryUploader::ProcessSendQueue() {
@ -188,9 +196,13 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
while (!shutting_down_) {
auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
if (sensor_data) {
if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) {
batch_request.clear_sensor_data();
TryRecovery();
continue;
}
proto::SensorData* added_sensor_data = batch_request.add_sensor_data();
*added_sensor_data = *sensor_data;
TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata());
// A submap also holds a trajectory id that must be translated to uplink's
// trajectory id.
@ -205,7 +217,7 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
if (batch_request.sensor_data_size() == batch_size_) {
async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
client_channel_, common::FromSeconds(10),
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds),
async_grpc::CreateUnlimitedConstantDelayStrategy(
common::FromSeconds(1), kUnrecoverableStatusCodes));
if (client.Write(batch_request)) {
@ -222,15 +234,19 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
}
}
void LocalTrajectoryUploader::TranslateTrajectoryId(
bool LocalTrajectoryUploader::TranslateTrajectoryId(
proto::SensorMetadata* sensor_metadata) {
int cloud_trajectory_id = local_trajectory_id_to_trajectory_info_
.at(sensor_metadata->trajectory_id())
.uplink_trajectory_id;
auto it = local_trajectory_id_to_trajectory_info_.find(
sensor_metadata->trajectory_id());
if (it == local_trajectory_id_to_trajectory_info_.end()) {
return false;
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
sensor_metadata->set_trajectory_id(cloud_trajectory_id);
return true;
}
bool LocalTrajectoryUploader::AddTrajectory(
grpc::Status LocalTrajectoryUploader::AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
@ -245,11 +261,12 @@ bool LocalTrajectoryUploader::AddTrajectory(
}
*request.add_expected_sensor_ids() =
cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_);
async_grpc::Client<handlers::AddTrajectorySignature> client(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
::grpc::Status status;
if (!client.Write(request, &status)) {
LOG(ERROR) << status.error_message();
return false;
return status;
}
LOG(INFO) << "Created trajectory for client_id: " << client_id
<< " local trajectory_id: " << local_trajectory_id
@ -261,22 +278,26 @@ bool LocalTrajectoryUploader::AddTrajectory(
std::forward_as_tuple(client.response().trajectory_id(),
expected_sensor_ids, trajectory_options,
client_id));
return true;
return status;
}
void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id,
int local_trajectory_id) {
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
1);
int cloud_trajectory_id =
local_trajectory_id_to_trajectory_info_.at(local_trajectory_id)
.uplink_trajectory_id;
grpc::Status LocalTrajectoryUploader::FinishTrajectory(
const std::string& client_id, int local_trajectory_id) {
auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id);
if (it == local_trajectory_id_to_trajectory_info_.end()) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"local_trajectory_id has not been"
" registered with AddTrajectory.");
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
proto::FinishTrajectoryRequest request;
request.set_client_id(client_id);
request.set_trajectory_id(cloud_trajectory_id);
async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_);
CHECK(client.Write(request));
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
grpc::Status status;
client.Write(request, &status);
return status;
}
void LocalTrajectoryUploader::EnqueueSensorData(

View File

@ -24,10 +24,13 @@
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/mapping/proto/trajectory_builder_options.pb.h"
#include "cartographer/mapping/trajectory_builder_interface.h"
#include "grpc++/support/status.h"
namespace cartographer {
namespace cloud {
// Uploads sensor data batches to uplink server.
// Gracefully handles interruptions of the connection.
class LocalTrajectoryUploaderInterface {
public:
using SensorId = mapping::TrajectoryBuilderInterface::SensorId;
@ -46,13 +49,13 @@ class LocalTrajectoryUploaderInterface {
std::unique_ptr<proto::SensorData> sensor_data) = 0;
// Creates a new trajectory with the specified settings in the uplink. A
// return value of 'false' indicates that the creation failed.
virtual bool AddTrajectory(
// return 'value' with '!value.ok()' indicates that the creation failed.
virtual grpc::Status AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0;
virtual void FinishTrajectory(const std::string& client_id,
virtual grpc::Status FinishTrajectory(const std::string& client_id,
int local_trajectory_id) = 0;
virtual SensorId GetLocalSlamResultSensorId(
int local_trajectory_id) const = 0;

View File

@ -0,0 +1,57 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/local_trajectory_uploader.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
using SensorId = ::cartographer::mapping::TrajectoryBuilderInterface::SensorId;
namespace cartographer {
namespace cloud {
namespace {
constexpr char kClientId[] = "CLIENT_ID";
const SensorId kImuSensorId{SensorId::SensorType::IMU, "imu"};
const SensorId kRangeSensorId{SensorId::SensorType::RANGE, "range"};
const int kLocalTrajectoryId = 3;
TEST(LocalTrajectoryUploaderTest, HandlesInvalidUplink) {
auto uploader = CreateLocalTrajectoryUploader("invalid-uplink-address:50051",
/*batch_size=*/1, false, "");
uploader->Start();
mapping::proto::TrajectoryBuilderOptions options;
auto status = uploader->AddTrajectory(
kClientId, kLocalTrajectoryId, {kRangeSensorId, kImuSensorId}, options);
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
auto sensor_data = absl::make_unique<proto::SensorData>();
sensor_data->mutable_sensor_metadata()->set_client_id(kClientId);
sensor_data->mutable_sensor_metadata()->set_sensor_id(kImuSensorId.id);
sensor_data->mutable_sensor_metadata()->set_trajectory_id(kLocalTrajectoryId);
sensor_data->mutable_imu_data()->set_timestamp(1);
uploader->EnqueueSensorData(std::move(sensor_data));
auto sensor_id = uploader->GetLocalSlamResultSensorId(kLocalTrajectoryId);
EXPECT_THAT(sensor_id.id, ::testing::Not(::testing::IsEmpty()));
status = uploader->FinishTrajectory(kClientId, kLocalTrajectoryId);
EXPECT_FALSE(status.ok());
uploader->Shutdown();
}
} // namespace
} // namespace cloud
} // namespace cartographer

View File

@ -36,9 +36,10 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
MOCK_METHOD0(Start, void());
MOCK_METHOD0(Shutdown, void());
MOCK_METHOD4(AddTrajectory,
bool(const std::string &, int, const std::set<SensorId> &,
grpc::Status(const std::string &, int,
const std::set<SensorId> &,
const mapping::proto::TrajectoryBuilderOptions &));
MOCK_METHOD2(FinishTrajectory, void(const std::string &, int));
MOCK_METHOD2(FinishTrajectory, grpc::Status(const std::string &, int));
MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int));
};