Recover gRPC channel from connection dropouts. (#1406)
- Check and recover the channel connection in `TryRecovery()`.
- Fixes an infinite loop in the unlimited retry strategy by adding `grpc::UNAVAILABLE` and `grpc::DEADLINE_EXCEEDED` to the unrecoverable status codes:
  - a server restart leads to `UNAVAILABLE`
  - a connection loss leads to `DEADLINE_EXCEEDED`
- Fixes repeated recovery attempts.

master
parent
153952ddf0
commit
b1dfa30ee3
|
@ -37,13 +37,18 @@ namespace {
|
||||||
using absl::make_unique;
|
using absl::make_unique;
|
||||||
|
|
||||||
constexpr int kConnectionTimeoutInSeconds = 10;
|
constexpr int kConnectionTimeoutInSeconds = 10;
|
||||||
|
constexpr int kConnectionRecoveryTimeoutInSeconds = 60;
|
||||||
constexpr int kTokenRefreshIntervalInSeconds = 60;
|
constexpr int kTokenRefreshIntervalInSeconds = 60;
|
||||||
const common::Duration kPopTimeout = common::FromMilliseconds(100);
|
const common::Duration kPopTimeout = common::FromMilliseconds(100);
|
||||||
|
|
||||||
// This defines the '::grpc::StatusCode's that are considered unrecoverable
|
// This defines the '::grpc::StatusCode's that are considered unrecoverable
|
||||||
// errors and hence no retries will be attempted by the client.
|
// errors and hence no retries will be attempted by the client.
|
||||||
const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = {
|
const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = {
|
||||||
::grpc::NOT_FOUND};
|
::grpc::DEADLINE_EXCEEDED,
|
||||||
|
::grpc::NOT_FOUND,
|
||||||
|
::grpc::UNAVAILABLE,
|
||||||
|
::grpc::UNKNOWN,
|
||||||
|
};
|
||||||
|
|
||||||
bool IsNewSubmap(const mapping::proto::Submap& submap) {
|
bool IsNewSubmap(const mapping::proto::Submap& submap) {
|
||||||
return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) ||
|
return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) ||
|
||||||
|
@ -151,6 +156,19 @@ void LocalTrajectoryUploader::Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void LocalTrajectoryUploader::TryRecovery() {
|
void LocalTrajectoryUploader::TryRecovery() {
|
||||||
|
if (client_channel_->GetState(false /* try_to_connect */) !=
|
||||||
|
grpc_connectivity_state::GRPC_CHANNEL_READY) {
|
||||||
|
LOG(INFO) << "Trying to re-connect to uplink...";
|
||||||
|
std::chrono::system_clock::time_point deadline =
|
||||||
|
std::chrono::system_clock::now() +
|
||||||
|
std::chrono::seconds(kConnectionRecoveryTimeoutInSeconds);
|
||||||
|
if (!client_channel_->WaitForConnected(deadline)) {
|
||||||
|
LOG(ERROR) << "Failed to re-connect to uplink prior to recovery attempt.";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG(INFO) << "Uplink channel ready, trying recovery.";
|
||||||
|
|
||||||
// Wind the sensor_data_queue forward to the next new submap.
|
// Wind the sensor_data_queue forward to the next new submap.
|
||||||
LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
|
LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -184,6 +202,9 @@ void LocalTrajectoryUploader::TryRecovery() {
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
|
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
|
||||||
<< status.error_message();
|
<< status.error_message();
|
||||||
|
// Restore the previous state for the next recovery attempt.
|
||||||
|
local_trajectory_id_to_trajectory_info_ =
|
||||||
|
local_trajectory_id_to_trajectory_info;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue