Implement RetryStrategies and use for AddTrajectory (#880)

master
Christoph Schütte 2018-02-01 22:31:33 +01:00 committed by Wally B. Feed
parent 9eaf960936
commit 92fa1782f3
4 changed files with 207 additions and 34 deletions

View File

@ -17,6 +17,7 @@
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H #define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H
#include "cartographer_grpc/framework/retry.h"
#include "cartographer_grpc/framework/rpc_handler_interface.h" #include "cartographer_grpc/framework/rpc_handler_interface.h"
#include "cartographer_grpc/framework/type_traits.h" #include "cartographer_grpc/framework/type_traits.h"
#include "grpc++/grpc++.h" #include "grpc++/grpc++.h"
@ -29,8 +30,26 @@ namespace framework {
template <typename RpcHandlerType> template <typename RpcHandlerType>
class Client { class Client {
public: public:
Client(std::shared_ptr<grpc::Channel> channel, RetryStrategy retry_strategy)
: channel_(channel),
client_context_(
cartographer::common::make_unique<grpc::ClientContext>()),
rpc_method_name_(
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name()),
rpc_method_(rpc_method_name_.c_str(),
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value,
channel_),
retry_strategy_(retry_strategy) {
CHECK(!retry_strategy ||
rpc_method_.method_type() == ::grpc::internal::RpcMethod::NORMAL_RPC)
<< "Retry is currently only support for NORMAL_RPC.";
}
Client(std::shared_ptr<grpc::Channel> channel) Client(std::shared_ptr<grpc::Channel> channel)
: channel_(channel), : channel_(channel),
client_context_(
cartographer::common::make_unique<grpc::ClientContext>()),
rpc_method_name_( rpc_method_name_(
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name()), RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name()),
rpc_method_(rpc_method_name_.c_str(), rpc_method_(rpc_method_name_.c_str(),
@ -38,7 +57,7 @@ class Client {
typename RpcHandlerType::OutgoingType>::value, typename RpcHandlerType::OutgoingType>::value,
channel_) {} channel_) {}
bool Read(typename RpcHandlerType::ResponseType* response) { bool Read(typename RpcHandlerType::ResponseType *response) {
switch (rpc_method_.method_type()) { switch (rpc_method_.method_type()) {
case grpc::internal::RpcMethod::BIDI_STREAMING: case grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded(); InstantiateClientReaderWriterIfNeeded();
@ -51,21 +70,11 @@ class Client {
} }
} }
bool Write(const typename RpcHandlerType::RequestType& request) { bool Write(const typename RpcHandlerType::RequestType &request) {
switch (rpc_method_.method_type()) { return RetryWithStrategy(
case grpc::internal::RpcMethod::NORMAL_RPC: retry_strategy_,
return MakeBlockingUnaryCall(request, &response_).ok(); std::bind(&Client<RpcHandlerType>::WriteImpl, this, request),
case grpc::internal::RpcMethod::CLIENT_STREAMING: std::bind(&Client<RpcHandlerType>::Reset, this));
InstantiateClientWriterIfNeeded();
return client_writer_->Write(request);
case grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Write(request);
case grpc::internal::RpcMethod::SERVER_STREAMING:
InstantiateClientReader(request);
return true;
}
LOG(FATAL) << "Not reached.";
} }
bool WritesDone() { bool WritesDone() {
@ -97,7 +106,7 @@ class Client {
} }
} }
const typename RpcHandlerType::ResponseType& response() { const typename RpcHandlerType::ResponseType &response() {
CHECK(rpc_method_.method_type() == grpc::internal::RpcMethod::NORMAL_RPC || CHECK(rpc_method_.method_type() == grpc::internal::RpcMethod::NORMAL_RPC ||
rpc_method_.method_type() == rpc_method_.method_type() ==
grpc::internal::RpcMethod::CLIENT_STREAMING); grpc::internal::RpcMethod::CLIENT_STREAMING);
@ -105,15 +114,35 @@ class Client {
} }
private: private:
void Reset() {
client_context_ = cartographer::common::make_unique<grpc::ClientContext>();
}
bool WriteImpl(const typename RpcHandlerType::RequestType &request) {
switch (rpc_method_.method_type()) {
case grpc::internal::RpcMethod::NORMAL_RPC:
return MakeBlockingUnaryCall(request, &response_).ok();
case grpc::internal::RpcMethod::CLIENT_STREAMING:
InstantiateClientWriterIfNeeded();
return client_writer_->Write(request);
case grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Write(request);
case grpc::internal::RpcMethod::SERVER_STREAMING:
InstantiateClientReader(request);
return true;
}
LOG(FATAL) << "Not reached.";
}
void InstantiateClientWriterIfNeeded() { void InstantiateClientWriterIfNeeded() {
CHECK_EQ(rpc_method_.method_type(), CHECK_EQ(rpc_method_.method_type(),
grpc::internal::RpcMethod::CLIENT_STREAMING); grpc::internal::RpcMethod::CLIENT_STREAMING);
if (!client_writer_) { if (!client_writer_) {
client_writer_.reset( client_writer_.reset(
grpc::internal::ClientWriterFactory< grpc::internal::
typename RpcHandlerType::RequestType>::Create(channel_.get(), ClientWriterFactory<typename RpcHandlerType::RequestType>::Create(
rpc_method_, channel_.get(), rpc_method_, client_context_.get(),
&client_context_,
&response_)); &response_));
} }
} }
@ -127,32 +156,31 @@ class Client {
typename RpcHandlerType::RequestType, typename RpcHandlerType::RequestType,
typename RpcHandlerType::ResponseType>::Create(channel_.get(), typename RpcHandlerType::ResponseType>::Create(channel_.get(),
rpc_method_, rpc_method_,
&client_context_)); client_context_
.get()));
} }
} }
void InstantiateClientReader( void InstantiateClientReader(
const typename RpcHandlerType::RequestType& request) { const typename RpcHandlerType::RequestType &request) {
CHECK_EQ(rpc_method_.method_type(), CHECK_EQ(rpc_method_.method_type(),
grpc::internal::RpcMethod::SERVER_STREAMING); grpc::internal::RpcMethod::SERVER_STREAMING);
client_reader_.reset( client_reader_.reset(
grpc::internal::ClientReaderFactory< grpc::internal::
typename RpcHandlerType::ResponseType>::Create(channel_.get(), ClientReaderFactory<typename RpcHandlerType::ResponseType>::Create(
rpc_method_, channel_.get(), rpc_method_, client_context_.get(), request));
&client_context_,
request));
} }
grpc::Status MakeBlockingUnaryCall( grpc::Status MakeBlockingUnaryCall(
const typename RpcHandlerType::RequestType& request, const typename RpcHandlerType::RequestType &request,
typename RpcHandlerType::ResponseType* response) { typename RpcHandlerType::ResponseType *response) {
CHECK_EQ(rpc_method_.method_type(), grpc::internal::RpcMethod::NORMAL_RPC); CHECK_EQ(rpc_method_.method_type(), grpc::internal::RpcMethod::NORMAL_RPC);
return ::grpc::internal::BlockingUnaryCall( return ::grpc::internal::BlockingUnaryCall(
channel_.get(), rpc_method_, &client_context_, request, response); channel_.get(), rpc_method_, client_context_.get(), request, response);
} }
std::shared_ptr<grpc::Channel> channel_; std::shared_ptr<grpc::Channel> channel_;
grpc::ClientContext client_context_; std::unique_ptr<grpc::ClientContext> client_context_;
const std::string rpc_method_name_; const std::string rpc_method_name_;
const ::grpc::internal::RpcMethod rpc_method_; const ::grpc::internal::RpcMethod rpc_method_;
@ -165,6 +193,7 @@ class Client {
std::unique_ptr<grpc::ClientReader<typename RpcHandlerType::ResponseType>> std::unique_ptr<grpc::ClientReader<typename RpcHandlerType::ResponseType>>
client_reader_; client_reader_;
typename RpcHandlerType::ResponseType response_; typename RpcHandlerType::ResponseType response_;
RetryStrategy retry_strategy_;
}; };
} // namespace framework } // namespace framework

View File

@ -0,0 +1,90 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <thread>
#include "cartographer_grpc/framework/retry.h"
#include "glog/logging.h"
namespace cartographer_grpc {
namespace framework {
RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator,
RetryDelayCalculator retry_delay_calculator) {
return [retry_indicator, retry_delay_calculator](int failed_attempts) {
if (!retry_indicator(failed_attempts)) {
return optional<Duration>();
}
return optional<Duration>(retry_delay_calculator(failed_attempts));
};
}
RetryIndicator CreateLimitedRetryIndicator(int max_attempts) {
return [max_attempts](int failed_attempts) {
return failed_attempts < max_attempts;
};
}
RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay,
float backoff_factor) {
return [min_delay, backoff_factor](int failed_attempts) -> Duration {
CHECK_GE(failed_attempts, 0);
using cartographer::common::FromSeconds;
using cartographer::common::ToSeconds;
return FromSeconds(std::pow(backoff_factor, failed_attempts - 1) *
ToSeconds(min_delay));
};
}
RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay,
float backoff_factor,
int max_attempts) {
return CreateRetryStrategy(
CreateLimitedRetryIndicator(max_attempts),
CreateBackoffDelayCalculator(min_delay, backoff_factor));
}
bool RetryWithStrategy(RetryStrategy retry_strategy, std::function<bool()> op,
std::function<void()> reset) {
optional<Duration> delay;
int failed_attemps = 0;
for (;;) {
if (op()) {
return true;
}
if (!retry_strategy) {
return false;
}
delay = retry_strategy(++failed_attemps);
if (!delay.has_value()) {
break;
}
LOG(INFO) << "Retrying after "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
delay.value())
.count()
<< " milliseconds.";
std::this_thread::sleep_for(delay.value());
if (reset) {
reset();
}
}
return false;
}
} // namespace framework
} // namespace cartographer_grpc

View File

@ -0,0 +1,51 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H
#include "cartographer/common/optional.h"
#include "cartographer/common/time.h"
#include "grpc++/grpc++.h"
namespace cartographer_grpc {
namespace framework {
using cartographer::common::Duration;
using cartographer::common::optional;
using RetryStrategy =
std::function<optional<Duration>(int /* failed_attempts */)>;
using RetryIndicator = std::function<bool(int /* failed_attempts */)>;
using RetryDelayCalculator = std::function<Duration(int /* failed_attempts */)>;
RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator,
RetryDelayCalculator retry_delay_calculator);
RetryIndicator CreateLimitedRetryIndicator(int max_attempts);
RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay,
float backoff_factor);
RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay,
float backoff_factor,
int max_attempts);
bool RetryWithStrategy(RetryStrategy retry_strategy, std::function<bool()> op,
std::function<void()> reset = nullptr);
} // namespace framework
} // namespace cartographer_grpc
#endif // CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H

View File

@ -42,7 +42,10 @@ int MapBuilderStub::AddTrajectoryBuilder(
for (const auto& sensor_id : expected_sensor_ids) { for (const auto& sensor_id : expected_sensor_ids) {
*request.add_expected_sensor_ids() = sensor::ToProto(sensor_id); *request.add_expected_sensor_ids() = sensor::ToProto(sensor_id);
} }
framework::Client<handlers::AddTrajectoryHandler> client(client_channel_); framework::Client<handlers::AddTrajectoryHandler> client(
client_channel_,
framework::CreateLimitedBackoffStrategy(
cartographer::common::FromMilliseconds(100), 2.f, 5));
CHECK(client.Write(request)); CHECK(client.Write(request));
// Construct trajectory builder stub. // Construct trajectory builder stub.