diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc index b0b9e04..d7c2e8a 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc @@ -60,14 +60,14 @@ void AddTrajectoryHandler::OnRequest( // Ignore initial poses in trajectory_builder_options. trajectory_builder_options.clear_initial_trajectory_pose(); - if (!GetContext() - ->local_trajectory_uploader() - ->AddTrajectory(request.client_id(), trajectory_id, - expected_sensor_ids, trajectory_builder_options)) { - LOG(ERROR) << "Failed to create trajectory in uplink: " << trajectory_id; - Finish(::grpc::Status(::grpc::INTERNAL, - "Failed to create trajectory in uplink")); - return; + auto status = + GetContext() + ->local_trajectory_uploader() + ->AddTrajectory(request.client_id(), trajectory_id, + expected_sensor_ids, trajectory_builder_options); + if (!status.ok()) { + LOG(ERROR) << "Failed to create trajectory_id " << trajectory_id + << " in uplink. Error: " << status.error_message(); } } diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc index 4612e49..d05ec5d 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc @@ -136,7 +136,7 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) { AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request), Truly(testing::BuildProtoPredicateEquals( &upstream_trajectory_builder_options)))) - .WillOnce(Return(13)); + .WillOnce(Return(grpc::Status::OK)); test_server_->SendWrite(request); EXPECT_EQ(test_server_->response().trajectory_id(), 13); } diff --git a/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc b/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc index d6b881d..3eaec9b 100644 --- a/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/finish_trajectory_handler.cc @@ -41,9 +41,14 @@ void FinishTrajectoryHandler::OnRequest( ->NotifyFinishTrajectory(request.trajectory_id()); if (GetUnsynchronizedContext() ->local_trajectory_uploader()) { - GetContext() - ->local_trajectory_uploader() - ->FinishTrajectory(request.client_id(), request.trajectory_id()); + auto status = + GetContext() + ->local_trajectory_uploader() + ->FinishTrajectory(request.client_id(), request.trajectory_id()); + if (!status.ok()) { + LOG(ERROR) << "Failed to finish trajectory in uplink: " + << status.error_message(); + } } Send(absl::make_unique()); } diff --git a/cartographer/cloud/internal/local_trajectory_uploader.cc b/cartographer/cloud/internal/local_trajectory_uploader.cc index bfffa9a..7cc1bf6 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.cc +++ b/cartographer/cloud/internal/local_trajectory_uploader.cc @@ -82,12 +82,12 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { // complete. void Shutdown() final; - bool AddTrajectory( + grpc::Status AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final; - void FinishTrajectory(const std::string& client_id, - int local_trajectory_id) final; + grpc::Status FinishTrajectory(const std::string& client_id, + int local_trajectory_id) final; void EnqueueSensorData(std::unique_ptr sensor_data) final; void TryRecovery(); @@ -98,7 +98,8 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { private: void ProcessSendQueue(); - void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); + // Returns 'false' for failure. + bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); std::shared_ptr<::grpc::Channel> client_channel_; int batch_size_; @@ -124,12 +125,12 @@ LocalTrajectoryUploader::LocalTrajectoryUploader( grpc::CompositeChannelCredentials(channel_creds, call_creds); } client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds); - std::chrono::system_clock::time_point deadline( + std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + - std::chrono::seconds(kConnectionTimeoutInSeconds)); + std::chrono::seconds(kConnectionTimeoutInSeconds); LOG(INFO) << "Connecting to uplink " << uplink_server_address; if (!client_channel_->WaitForConnected(deadline)) { - LOG(FATAL) << "Failed to connect to " << uplink_server_address; + LOG(ERROR) << "Failed to connect to " << uplink_server_address; } } @@ -151,7 +152,11 @@ void LocalTrajectoryUploader::Shutdown() { void LocalTrajectoryUploader::TryRecovery() { // Wind the sensor_data_queue forward to the next new submap. + LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap."; while (true) { + if (shutting_down_) { + return; + } proto::SensorData* sensor_data = send_queue_.PeekWithTimeout(kPopTimeout); if (sensor_data) { @@ -173,13 +178,16 @@ void LocalTrajectoryUploader::TryRecovery() { local_trajectory_id_to_trajectory_info_; local_trajectory_id_to_trajectory_info_.clear(); for (const auto& entry : local_trajectory_id_to_trajectory_info) { - if (!AddTrajectory(entry.second.client_id, entry.first, - entry.second.expected_sensor_ids, - entry.second.trajectory_options)) { - LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt."; + grpc::Status status = AddTrajectory(entry.second.client_id, entry.first, + entry.second.expected_sensor_ids, + entry.second.trajectory_options); + if (!status.ok()) { + LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. " + << status.error_message(); return; } } + LOG(INFO) << "LocalTrajectoryUploader recovered."; } void LocalTrajectoryUploader::ProcessSendQueue() { @@ -188,9 +196,13 @@ void LocalTrajectoryUploader::ProcessSendQueue() { while (!shutting_down_) { auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout); if (sensor_data) { + if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) { + batch_request.clear_sensor_data(); + TryRecovery(); + continue; + } proto::SensorData* added_sensor_data = batch_request.add_sensor_data(); *added_sensor_data = *sensor_data; - TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata()); // A submap also holds a trajectory id that must be translated to uplink's // trajectory id. @@ -205,7 +217,7 @@ void LocalTrajectoryUploader::ProcessSendQueue() { if (batch_request.sensor_data_size() == batch_size_) { async_grpc::Client client( - client_channel_, common::FromSeconds(10), + client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds), async_grpc::CreateUnlimitedConstantDelayStrategy( common::FromSeconds(1), kUnrecoverableStatusCodes)); if (client.Write(batch_request)) { @@ -222,15 +234,19 @@ void LocalTrajectoryUploader::ProcessSendQueue() { } } -void LocalTrajectoryUploader::TranslateTrajectoryId( +bool LocalTrajectoryUploader::TranslateTrajectoryId( proto::SensorMetadata* sensor_metadata) { - int cloud_trajectory_id = local_trajectory_id_to_trajectory_info_ - .at(sensor_metadata->trajectory_id()) - .uplink_trajectory_id; + auto it = local_trajectory_id_to_trajectory_info_.find( + sensor_metadata->trajectory_id()); + if (it == local_trajectory_id_to_trajectory_info_.end()) { + return false; + } + int cloud_trajectory_id = it->second.uplink_trajectory_id; sensor_metadata->set_trajectory_id(cloud_trajectory_id); + return true; } -bool LocalTrajectoryUploader::AddTrajectory( +grpc::Status LocalTrajectoryUploader::AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { @@ -245,11 +261,12 @@ bool LocalTrajectoryUploader::AddTrajectory( } *request.add_expected_sensor_ids() = cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); - async_grpc::Client client(client_channel_); + async_grpc::Client client( + client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); ::grpc::Status status; if (!client.Write(request, &status)) { LOG(ERROR) << status.error_message(); - return false; + return status; } LOG(INFO) << "Created trajectory for client_id: " << client_id << " local trajectory_id: " << local_trajectory_id @@ -261,22 +278,26 @@ bool LocalTrajectoryUploader::AddTrajectory( std::forward_as_tuple(client.response().trajectory_id(), expected_sensor_ids, trajectory_options, client_id)); - return true; + return status; } -void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id, - int local_trajectory_id) { - CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), - 1); - int cloud_trajectory_id = - local_trajectory_id_to_trajectory_info_.at(local_trajectory_id) - .uplink_trajectory_id; +grpc::Status LocalTrajectoryUploader::FinishTrajectory( + const std::string& client_id, int local_trajectory_id) { + auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id); + if (it == local_trajectory_id_to_trajectory_info_.end()) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "local_trajectory_id has not been" + " registered with AddTrajectory."); + } + int cloud_trajectory_id = it->second.uplink_trajectory_id; proto::FinishTrajectoryRequest request; request.set_client_id(client_id); request.set_trajectory_id(cloud_trajectory_id); async_grpc::Client client( - client_channel_); - CHECK(client.Write(request)); + client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); + grpc::Status status; + client.Write(request, &status); + return status; } void LocalTrajectoryUploader::EnqueueSensorData( diff --git a/cartographer/cloud/internal/local_trajectory_uploader.h b/cartographer/cloud/internal/local_trajectory_uploader.h index f0cb6e2..906a453 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.h +++ b/cartographer/cloud/internal/local_trajectory_uploader.h @@ -24,10 +24,13 @@ #include "cartographer/cloud/proto/map_builder_service.pb.h" #include "cartographer/mapping/proto/trajectory_builder_options.pb.h" #include "cartographer/mapping/trajectory_builder_interface.h" +#include "grpc++/support/status.h" namespace cartographer { namespace cloud { +// Uploads sensor data batches to uplink server. +// Gracefully handles interruptions of the connection. class LocalTrajectoryUploaderInterface { public: using SensorId = mapping::TrajectoryBuilderInterface::SensorId; @@ -46,14 +49,14 @@ class LocalTrajectoryUploaderInterface { std::unique_ptr sensor_data) = 0; // Creates a new trajectory with the specified settings in the uplink. A - // return value of 'false' indicates that the creation failed. - virtual bool AddTrajectory( + // return 'value' with '!value.ok()' indicates that the creation failed. + virtual grpc::Status AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0; - virtual void FinishTrajectory(const std::string& client_id, - int local_trajectory_id) = 0; + virtual grpc::Status FinishTrajectory(const std::string& client_id, + int local_trajectory_id) = 0; virtual SensorId GetLocalSlamResultSensorId( int local_trajectory_id) const = 0; }; diff --git a/cartographer/cloud/internal/local_trajectory_uploader_test.cc b/cartographer/cloud/internal/local_trajectory_uploader_test.cc new file mode 100644 index 0000000..571456d --- /dev/null +++ b/cartographer/cloud/internal/local_trajectory_uploader_test.cc @@ -0,0 +1,57 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cartographer/cloud/internal/local_trajectory_uploader.h" + +#include "glog/logging.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using SensorId = ::cartographer::mapping::TrajectoryBuilderInterface::SensorId; + +namespace cartographer { +namespace cloud { +namespace { + +constexpr char kClientId[] = "CLIENT_ID"; +const SensorId kImuSensorId{SensorId::SensorType::IMU, "imu"}; +const SensorId kRangeSensorId{SensorId::SensorType::RANGE, "range"}; +const int kLocalTrajectoryId = 3; + +TEST(LocalTrajectoryUploaderTest, HandlesInvalidUplink) { + auto uploader = CreateLocalTrajectoryUploader("invalid-uplink-address:50051", + /*batch_size=*/1, false, ""); + uploader->Start(); + mapping::proto::TrajectoryBuilderOptions options; + auto status = uploader->AddTrajectory( + kClientId, kLocalTrajectoryId, {kRangeSensorId, kImuSensorId}, options); + EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED); + auto sensor_data = absl::make_unique(); + sensor_data->mutable_sensor_metadata()->set_client_id(kClientId); + sensor_data->mutable_sensor_metadata()->set_sensor_id(kImuSensorId.id); + sensor_data->mutable_sensor_metadata()->set_trajectory_id(kLocalTrajectoryId); + sensor_data->mutable_imu_data()->set_timestamp(1); + uploader->EnqueueSensorData(std::move(sensor_data)); + auto sensor_id = uploader->GetLocalSlamResultSensorId(kLocalTrajectoryId); + EXPECT_THAT(sensor_id.id, ::testing::Not(::testing::IsEmpty())); + status = uploader->FinishTrajectory(kClientId, kLocalTrajectoryId); + EXPECT_FALSE(status.ok()); + uploader->Shutdown(); +} + +} // namespace +} // namespace cloud +} // namespace cartographer diff --git a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h index af8bc98..ba24cf0 100644 --- a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h +++ b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h @@ -36,9 +36,10 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { MOCK_METHOD0(Start, void()); MOCK_METHOD0(Shutdown, void()); MOCK_METHOD4(AddTrajectory, - bool(const std::string &, int, const std::set &, - const mapping::proto::TrajectoryBuilderOptions &)); - MOCK_METHOD2(FinishTrajectory, void(const std::string &, int)); + grpc::Status(const std::string &, int, + const std::set &, + const mapping::proto::TrajectoryBuilderOptions &)); + MOCK_METHOD2(FinishTrajectory, grpc::Status(const std::string &, int)); MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int)); };