Add token_file_path parameter and enable auth (#1235)

master
Christoph Schütte 2018-07-05 13:14:11 +02:00 committed by GitHub
parent cc9fc75757
commit bad8c96bc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 49 additions and 32 deletions

View File

@ -228,9 +228,9 @@ def cartographer_repositories():
_maybe(native.http_archive, _maybe(native.http_archive,
name = "com_github_googlecartographer_async_grpc", name = "com_github_googlecartographer_async_grpc",
strip_prefix = "async_grpc-c2c68f56904a595ab5ba24c1fb19b4b8e954fa15", strip_prefix = "async_grpc-a562634ecea49beac5a08436ca76c6b82efb439b",
urls = [ urls = [
"https://github.com/googlecartographer/async_grpc/archive/c2c68f56904a595ab5ba24c1fb19b4b8e954fa15.tar.gz", "https://github.com/googlecartographer/async_grpc/archive/a562634ecea49beac5a08436ca76c6b82efb439b.tar.gz",
], ],
) )

View File

@ -35,7 +35,7 @@ namespace cloud {
namespace { namespace {
using common::make_unique; using common::make_unique;
constexpr int kConnectionTimeoutInSecond = 10; constexpr int kConnectionTimeoutInSeconds = 10;
} // namespace } // namespace
@ -46,7 +46,7 @@ MapBuilderStub::MapBuilderStub(const std::string& server_address)
LOG(INFO) << "Connecting to SLAM process at " << server_address; LOG(INFO) << "Connecting to SLAM process at " << server_address;
std::chrono::system_clock::time_point deadline( std::chrono::system_clock::time_point deadline(
std::chrono::system_clock::now() + std::chrono::system_clock::now() +
std::chrono::seconds(kConnectionTimeoutInSecond)); std::chrono::seconds(kConnectionTimeoutInSeconds));
if (!client_channel_->WaitForConnected(deadline)) { if (!client_channel_->WaitForConnected(deadline)) {
LOG(FATAL) << "Failed to connect to " << server_address; LOG(FATAL) << "Failed to connect to " << server_address;
} }

View File

@ -20,6 +20,7 @@
#include <thread> #include <thread>
#include "async_grpc/client.h" #include "async_grpc/client.h"
#include "async_grpc/token_file_credentials.h"
#include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h" #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h"
#include "cartographer/cloud/internal/handlers/add_trajectory_handler.h" #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
#include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h" #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
@ -35,13 +36,15 @@ namespace {
using common::make_unique; using common::make_unique;
constexpr int kConnectionTimeoutInSecond = 10; constexpr int kConnectionTimeoutInSeconds = 10;
constexpr int kTokenRefreshIntervalInSeconds = 60;
const common::Duration kPopTimeout = common::FromMilliseconds(100); const common::Duration kPopTimeout = common::FromMilliseconds(100);
class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
public: public:
LocalTrajectoryUploader(const std::string &uplink_server_address, LocalTrajectoryUploader(const std::string& uplink_server_address,
int batch_size, bool enable_ssl_encryption); int batch_size, bool enable_ssl_encryption,
const std::string& token_file_path);
~LocalTrajectoryUploader(); ~LocalTrajectoryUploader();
// Starts the upload thread. // Starts the upload thread.
@ -52,8 +55,8 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
void Shutdown() final; void Shutdown() final;
void AddTrajectory( void AddTrajectory(
int local_trajectory_id, const std::set<SensorId> &expected_sensor_ids, int local_trajectory_id, const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions &trajectory_options) final; const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
void FinishTrajectory(int local_trajectory_id) final; void FinishTrajectory(int local_trajectory_id) final;
void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final; void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
@ -64,7 +67,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
private: private:
void ProcessSendQueue(); void ProcessSendQueue();
void TranslateTrajectoryId(proto::SensorMetadata *sensor_metadata); void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
std::shared_ptr<::grpc::Channel> client_channel_; std::shared_ptr<::grpc::Channel> client_channel_;
int batch_size_; int batch_size_;
@ -75,17 +78,24 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
}; };
LocalTrajectoryUploader::LocalTrajectoryUploader( LocalTrajectoryUploader::LocalTrajectoryUploader(
const std::string &uplink_server_address, int batch_size, const std::string& uplink_server_address, int batch_size,
bool enable_ssl_encryption) bool enable_ssl_encryption, const std::string& token_file_path)
: client_channel_(::grpc::CreateChannel( : batch_size_(batch_size) {
uplink_server_address, auto channel_creds =
enable_ssl_encryption enable_ssl_encryption
? ::grpc::SslCredentials(::grpc::SslCredentialsOptions()) ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions())
: ::grpc::InsecureChannelCredentials())), : ::grpc::InsecureChannelCredentials();
batch_size_(batch_size) {
if (!token_file_path.empty()) {
auto call_creds = async_grpc::TokenFileCredentials(
token_file_path, std::chrono::seconds(kTokenRefreshIntervalInSeconds));
channel_creds =
grpc::CompositeChannelCredentials(channel_creds, call_creds);
}
client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds);
std::chrono::system_clock::time_point deadline( std::chrono::system_clock::time_point deadline(
std::chrono::system_clock::now() + std::chrono::system_clock::now() +
std::chrono::seconds(kConnectionTimeoutInSecond)); std::chrono::seconds(kConnectionTimeoutInSeconds));
LOG(INFO) << "Connecting to uplink " << uplink_server_address; LOG(INFO) << "Connecting to uplink " << uplink_server_address;
if (!client_channel_->WaitForConnected(deadline)) { if (!client_channel_->WaitForConnected(deadline)) {
LOG(FATAL) << "Failed to connect to " << uplink_server_address; LOG(FATAL) << "Failed to connect to " << uplink_server_address;
@ -114,14 +124,14 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
while (!shutting_down_) { while (!shutting_down_) {
auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout); auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
if (sensor_data) { if (sensor_data) {
proto::SensorData *added_sensor_data = batch_request.add_sensor_data(); proto::SensorData* added_sensor_data = batch_request.add_sensor_data();
*added_sensor_data = *sensor_data; *added_sensor_data = *sensor_data;
TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata()); TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata());
// A submap also holds a trajectory id that must be translated to uplink's // A submap also holds a trajectory id that must be translated to uplink's
// trajectory id. // trajectory id.
if (added_sensor_data->has_local_slam_result_data()) { if (added_sensor_data->has_local_slam_result_data()) {
for (mapping::proto::Submap &mutable_submap : for (mapping::proto::Submap& mutable_submap :
*added_sensor_data->mutable_local_slam_result_data() *added_sensor_data->mutable_local_slam_result_data()
->mutable_submaps()) { ->mutable_submaps()) {
mutable_submap.mutable_submap_id()->set_trajectory_id( mutable_submap.mutable_submap_id()->set_trajectory_id(
@ -141,18 +151,18 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
} }
void LocalTrajectoryUploader::TranslateTrajectoryId( void LocalTrajectoryUploader::TranslateTrajectoryId(
proto::SensorMetadata *sensor_metadata) { proto::SensorMetadata* sensor_metadata) {
int cloud_trajectory_id = int cloud_trajectory_id =
local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id()); local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id());
sensor_metadata->set_trajectory_id(cloud_trajectory_id); sensor_metadata->set_trajectory_id(cloud_trajectory_id);
} }
void LocalTrajectoryUploader::AddTrajectory( void LocalTrajectoryUploader::AddTrajectory(
int local_trajectory_id, const std::set<SensorId> &expected_sensor_ids, int local_trajectory_id, const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions &trajectory_options) { const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
proto::AddTrajectoryRequest request; proto::AddTrajectoryRequest request;
*request.mutable_trajectory_builder_options() = trajectory_options; *request.mutable_trajectory_builder_options() = trajectory_options;
for (const SensorId &sensor_id : expected_sensor_ids) { for (const SensorId& sensor_id : expected_sensor_ids) {
// Range sensors are not forwarded, but combined into a LocalSlamResult. // Range sensors are not forwarded, but combined into a LocalSlamResult.
if (sensor_id.type != SensorId::SensorType::RANGE) { if (sensor_id.type != SensorId::SensorType::RANGE) {
*request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
@ -161,7 +171,8 @@ void LocalTrajectoryUploader::AddTrajectory(
*request.add_expected_sensor_ids() = *request.add_expected_sensor_ids() =
cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_); async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_);
CHECK(client.Write(request)); ::grpc::Status status;
CHECK(client.Write(request, &status)) << status.error_message();
CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0); CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0);
local_to_cloud_trajectory_id_map_[local_trajectory_id] = local_to_cloud_trajectory_id_map_[local_trajectory_id] =
client.response().trajectory_id(); client.response().trajectory_id();
@ -186,10 +197,11 @@ void LocalTrajectoryUploader::EnqueueSensorData(
} // namespace } // namespace
std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader( std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader(
const std::string &uplink_server_address, int batch_size, const std::string& uplink_server_address, int batch_size,
bool enable_ssl_encryption) { bool enable_ssl_encryption, const std::string& token_file_path) {
return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size, return make_unique<LocalTrajectoryUploader>(uplink_server_address, batch_size,
enable_ssl_encryption); enable_ssl_encryption,
token_file_path);
} }
} // namespace cloud } // namespace cloud

View File

@ -56,7 +56,7 @@ class LocalTrajectoryUploaderInterface {
// Returns LocalTrajectoryUploader with the actual implementation. // Returns LocalTrajectoryUploader with the actual implementation.
std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader( std::unique_ptr<LocalTrajectoryUploaderInterface> CreateLocalTrajectoryUploader(
const std::string& uplink_server_address, int batch_size, const std::string& uplink_server_address, int batch_size,
bool enable_ssl_encryption); bool enable_ssl_encryption, const std::string& token_file_path);
} // namespace cloud } // namespace cloud
} // namespace cartographer } // namespace cartographer

View File

@ -67,7 +67,8 @@ MapBuilderServer::MapBuilderServer(
local_trajectory_uploader_ = CreateLocalTrajectoryUploader( local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
map_builder_server_options.uplink_server_address(), map_builder_server_options.uplink_server_address(),
map_builder_server_options.upload_batch_size(), map_builder_server_options.upload_batch_size(),
map_builder_server_options.enable_ssl_encryption()); map_builder_server_options.enable_ssl_encryption(),
map_builder_server_options.token_file_path());
} }
server_builder.RegisterHandler<handlers::AddTrajectoryHandler>(); server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
server_builder.RegisterHandler<handlers::AddOdometryDataHandler>(); server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();

View File

@ -41,6 +41,8 @@ proto::MapBuilderServerOptions CreateMapBuilderServerOptions(
lua_parameter_dictionary->GetInt("upload_batch_size")); lua_parameter_dictionary->GetInt("upload_batch_size"));
map_builder_server_options.set_enable_ssl_encryption( map_builder_server_options.set_enable_ssl_encryption(
lua_parameter_dictionary->GetBool("enable_ssl_encryption")); lua_parameter_dictionary->GetBool("enable_ssl_encryption"));
map_builder_server_options.set_token_file_path(
lua_parameter_dictionary->GetString("token_file_path"));
return map_builder_server_options; return map_builder_server_options;
} }

View File

@ -26,4 +26,5 @@ message MapBuilderServerOptions {
string uplink_server_address = 5; string uplink_server_address = 5;
int32 upload_batch_size = 6; int32 upload_batch_size = 6;
bool enable_ssl_encryption = 7; bool enable_ssl_encryption = 7;
string token_file_path = 8;
} }

View File

@ -22,4 +22,5 @@ MAP_BUILDER_SERVER = {
uplink_server_address = "", uplink_server_address = "",
upload_batch_size = 100, upload_batch_size = 100,
enable_ssl_encryption = false, enable_ssl_encryption = false,
token_file_path = "",
} }

View File

@ -19,7 +19,7 @@ set -o verbose
git clone https://github.com/googlecartographer/async_grpc git clone https://github.com/googlecartographer/async_grpc
cd async_grpc cd async_grpc
git checkout c2c68f56904a595ab5ba24c1fb19b4b8e954fa15 git checkout a562634ecea49beac5a08436ca76c6b82efb439b
mkdir build mkdir build
cd build cd build
cmake -G Ninja \ cmake -G Ninja \