diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 4ae2a1d..c74e7d8 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -228,9 +228,9 @@ def cartographer_repositories(): _maybe(native.http_archive, name = "com_github_googlecartographer_async_grpc", - strip_prefix = "async_grpc-c2c68f56904a595ab5ba24c1fb19b4b8e954fa15", + strip_prefix = "async_grpc-a562634ecea49beac5a08436ca76c6b82efb439b", urls = [ - "https://github.com/googlecartographer/async_grpc/archive/c2c68f56904a595ab5ba24c1fb19b4b8e954fa15.tar.gz", + "https://github.com/googlecartographer/async_grpc/archive/a562634ecea49beac5a08436ca76c6b82efb439b.tar.gz", ], ) diff --git a/cartographer/cloud/client/map_builder_stub.cc b/cartographer/cloud/client/map_builder_stub.cc index 8ac4d8e..c30c47e 100644 --- a/cartographer/cloud/client/map_builder_stub.cc +++ b/cartographer/cloud/client/map_builder_stub.cc @@ -35,7 +35,7 @@ namespace cloud { namespace { using common::make_unique; -constexpr int kConnectionTimeoutInSecond = 10; +constexpr int kConnectionTimeoutInSeconds = 10; } // namespace @@ -46,7 +46,7 @@ MapBuilderStub::MapBuilderStub(const std::string& server_address) LOG(INFO) << "Connecting to SLAM process at " << server_address; std::chrono::system_clock::time_point deadline( std::chrono::system_clock::now() + - std::chrono::seconds(kConnectionTimeoutInSecond)); + std::chrono::seconds(kConnectionTimeoutInSeconds)); if (!client_channel_->WaitForConnected(deadline)) { LOG(FATAL) << "Failed to connect to " << server_address; } diff --git a/cartographer/cloud/internal/local_trajectory_uploader.cc b/cartographer/cloud/internal/local_trajectory_uploader.cc index c5add7d..c50588b 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.cc +++ b/cartographer/cloud/internal/local_trajectory_uploader.cc @@ -20,6 +20,7 @@ #include #include "async_grpc/client.h" +#include "async_grpc/token_file_credentials.h" #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h" #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h" #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h" @@ -35,13 +36,15 @@ namespace { using common::make_unique; -constexpr int kConnectionTimeoutInSecond = 10; +constexpr int kConnectionTimeoutInSeconds = 10; +constexpr int kTokenRefreshIntervalInSeconds = 60; const common::Duration kPopTimeout = common::FromMilliseconds(100); class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { public: - LocalTrajectoryUploader(const std::string &uplink_server_address, - int batch_size, bool enable_ssl_encryption); + LocalTrajectoryUploader(const std::string& uplink_server_address, + int batch_size, bool enable_ssl_encryption, + const std::string& token_file_path); ~LocalTrajectoryUploader(); // Starts the upload thread. @@ -52,8 +55,8 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { void Shutdown() final; void AddTrajectory( - int local_trajectory_id, const std::set &expected_sensor_ids, - const mapping::proto::TrajectoryBuilderOptions &trajectory_options) final; + int local_trajectory_id, const std::set& expected_sensor_ids, + const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final; void FinishTrajectory(int local_trajectory_id) final; void EnqueueSensorData(std::unique_ptr sensor_data) final; @@ -64,7 +67,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { private: void ProcessSendQueue(); - void TranslateTrajectoryId(proto::SensorMetadata *sensor_metadata); + void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); std::shared_ptr<::grpc::Channel> client_channel_; int batch_size_; @@ -75,17 +78,24 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { }; LocalTrajectoryUploader::LocalTrajectoryUploader( - const std::string &uplink_server_address, int batch_size, - bool enable_ssl_encryption) - : client_channel_(::grpc::CreateChannel( - uplink_server_address, - enable_ssl_encryption - ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions()) - : ::grpc::InsecureChannelCredentials())), - batch_size_(batch_size) { + const std::string& uplink_server_address, int batch_size, + bool enable_ssl_encryption, const std::string& token_file_path) + : batch_size_(batch_size) { + auto channel_creds = + enable_ssl_encryption + ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions()) + : ::grpc::InsecureChannelCredentials(); + + if (!token_file_path.empty()) { + auto call_creds = async_grpc::TokenFileCredentials( + token_file_path, std::chrono::seconds(kTokenRefreshIntervalInSeconds)); + channel_creds = + grpc::CompositeChannelCredentials(channel_creds, call_creds); + } + client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds); std::chrono::system_clock::time_point deadline( std::chrono::system_clock::now() + - std::chrono::seconds(kConnectionTimeoutInSecond)); + std::chrono::seconds(kConnectionTimeoutInSeconds)); LOG(INFO) << "Connecting to uplink " << uplink_server_address; if (!client_channel_->WaitForConnected(deadline)) { LOG(FATAL) << "Failed to connect to " << uplink_server_address; @@ -114,14 +124,14 @@ void LocalTrajectoryUploader::ProcessSendQueue() { while (!shutting_down_) { auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout); if (sensor_data) { - proto::SensorData *added_sensor_data = batch_request.add_sensor_data(); + proto::SensorData* added_sensor_data = batch_request.add_sensor_data(); *added_sensor_data = *sensor_data; TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata()); // A submap also holds a trajectory id that must be translated to uplink's // trajectory id. if (added_sensor_data->has_local_slam_result_data()) { - for (mapping::proto::Submap &mutable_submap : + for (mapping::proto::Submap& mutable_submap : *added_sensor_data->mutable_local_slam_result_data() ->mutable_submaps()) { mutable_submap.mutable_submap_id()->set_trajectory_id( @@ -141,18 +151,18 @@ void LocalTrajectoryUploader::ProcessSendQueue() { } void LocalTrajectoryUploader::TranslateTrajectoryId( - proto::SensorMetadata *sensor_metadata) { + proto::SensorMetadata* sensor_metadata) { int cloud_trajectory_id = local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id()); sensor_metadata->set_trajectory_id(cloud_trajectory_id); } void LocalTrajectoryUploader::AddTrajectory( - int local_trajectory_id, const std::set &expected_sensor_ids, - const mapping::proto::TrajectoryBuilderOptions &trajectory_options) { + int local_trajectory_id, const std::set& expected_sensor_ids, + const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { proto::AddTrajectoryRequest request; *request.mutable_trajectory_builder_options() = trajectory_options; - for (const SensorId &sensor_id : expected_sensor_ids) { + for (const SensorId& sensor_id : expected_sensor_ids) { // Range sensors are not forwarded, but combined into a LocalSlamResult. if (sensor_id.type != SensorId::SensorType::RANGE) { *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); @@ -161,7 +171,8 @@ void LocalTrajectoryUploader::AddTrajectory( *request.add_expected_sensor_ids() = cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); async_grpc::Client client(client_channel_); - CHECK(client.Write(request)); + ::grpc::Status status; + CHECK(client.Write(request, &status)) << status.error_message(); CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0); local_to_cloud_trajectory_id_map_[local_trajectory_id] = client.response().trajectory_id(); @@ -186,10 +197,11 @@ void LocalTrajectoryUploader::EnqueueSensorData( } // namespace std::unique_ptr CreateLocalTrajectoryUploader( - const std::string &uplink_server_address, int batch_size, - bool enable_ssl_encryption) { + const std::string& uplink_server_address, int batch_size, + bool enable_ssl_encryption, const std::string& token_file_path) { return make_unique(uplink_server_address, batch_size, - enable_ssl_encryption); + enable_ssl_encryption, + token_file_path); } } // namespace cloud diff --git a/cartographer/cloud/internal/local_trajectory_uploader.h b/cartographer/cloud/internal/local_trajectory_uploader.h index 4df3883..f7f4a07 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.h +++ b/cartographer/cloud/internal/local_trajectory_uploader.h @@ -56,7 +56,7 @@ class LocalTrajectoryUploaderInterface { // Returns LocalTrajectoryUploader with the actual implementation. std::unique_ptr CreateLocalTrajectoryUploader( const std::string& uplink_server_address, int batch_size, - bool enable_ssl_encryption); + bool enable_ssl_encryption, const std::string& token_file_path); } // namespace cloud } // namespace cartographer diff --git a/cartographer/cloud/internal/map_builder_server.cc b/cartographer/cloud/internal/map_builder_server.cc index 427489f..d298be3 100644 --- a/cartographer/cloud/internal/map_builder_server.cc +++ b/cartographer/cloud/internal/map_builder_server.cc @@ -67,7 +67,8 @@ MapBuilderServer::MapBuilderServer( local_trajectory_uploader_ = CreateLocalTrajectoryUploader( map_builder_server_options.uplink_server_address(), map_builder_server_options.upload_batch_size(), - map_builder_server_options.enable_ssl_encryption()); + map_builder_server_options.enable_ssl_encryption(), + map_builder_server_options.token_file_path()); } server_builder.RegisterHandler(); server_builder.RegisterHandler(); diff --git a/cartographer/cloud/map_builder_server_options.cc b/cartographer/cloud/map_builder_server_options.cc index f499a71..563d4d1 100644 --- a/cartographer/cloud/map_builder_server_options.cc +++ b/cartographer/cloud/map_builder_server_options.cc @@ -41,6 +41,8 @@ proto::MapBuilderServerOptions CreateMapBuilderServerOptions( lua_parameter_dictionary->GetInt("upload_batch_size")); map_builder_server_options.set_enable_ssl_encryption( lua_parameter_dictionary->GetBool("enable_ssl_encryption")); + map_builder_server_options.set_token_file_path( + lua_parameter_dictionary->GetString("token_file_path")); return map_builder_server_options; } diff --git a/cartographer/cloud/proto/map_builder_server_options.proto b/cartographer/cloud/proto/map_builder_server_options.proto index 92e5217..7f53ba9 100644 --- a/cartographer/cloud/proto/map_builder_server_options.proto +++ b/cartographer/cloud/proto/map_builder_server_options.proto @@ -26,4 +26,5 @@ message MapBuilderServerOptions { string uplink_server_address = 5; int32 upload_batch_size = 6; bool enable_ssl_encryption = 7; + string token_file_path = 8; } diff --git a/configuration_files/map_builder_server.lua b/configuration_files/map_builder_server.lua index e04b220..384fe79 100644 --- a/configuration_files/map_builder_server.lua +++ b/configuration_files/map_builder_server.lua @@ -22,4 +22,5 @@ MAP_BUILDER_SERVER = { uplink_server_address = "", upload_batch_size = 100, enable_ssl_encryption = false, + token_file_path = "", } diff --git a/scripts/install_async_grpc.sh b/scripts/install_async_grpc.sh index 09c79b0..4c09ce3 100755 --- a/scripts/install_async_grpc.sh +++ b/scripts/install_async_grpc.sh @@ -19,7 +19,7 @@ set -o verbose git clone https://github.com/googlecartographer/async_grpc cd async_grpc -git checkout c2c68f56904a595ab5ba24c1fb19b4b8e954fa15 +git checkout a562634ecea49beac5a08436ca76c6b82efb439b mkdir build cd build cmake -G Ninja \