From 35a9c3d63bdfa87cab6df283793e3de891ab73e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Wed, 17 Jan 2018 12:01:29 +0100 Subject: [PATCH] Implement sensor data uploading in LocalTrajectoryUploader. (#822) --- cartographer_grpc/framework/client_writer.h | 38 +++++++ .../local_trajectory_uploader.cc | 107 ++++++++++++++++-- cartographer_grpc/local_trajectory_uploader.h | 27 ++++- cartographer_grpc/map_builder_server.cc | 6 + cartographer_grpc/map_builder_server.h | 5 +- .../mapping/trajectory_builder_stub.h | 16 +-- 6 files changed, 179 insertions(+), 20 deletions(-) create mode 100644 cartographer_grpc/framework/client_writer.h diff --git a/cartographer_grpc/framework/client_writer.h b/cartographer_grpc/framework/client_writer.h new file mode 100644 index 0000000..bc4a10c --- /dev/null +++ b/cartographer_grpc/framework/client_writer.h @@ -0,0 +1,38 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ +#define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ + +#include + +#include "google/protobuf/empty.pb.h" +#include "grpc++/grpc++.h" + +namespace cartographer_grpc { +namespace framework { + +template +struct ClientWriter { + grpc::ClientContext client_context; + std::unique_ptr> client_writer; + google::protobuf::Empty response; +}; + +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_ diff --git a/cartographer_grpc/local_trajectory_uploader.cc b/cartographer_grpc/local_trajectory_uploader.cc index f3e2746..66ba0d0 100644 --- a/cartographer_grpc/local_trajectory_uploader.cc +++ b/cartographer_grpc/local_trajectory_uploader.cc @@ -16,26 +16,119 @@ #include "cartographer_grpc/local_trajectory_uploader.h" +#include "cartographer/common/make_unique.h" +#include "cartographer_grpc/proto/map_builder_service.pb.h" #include "glog/logging.h" namespace cartographer_grpc { +namespace { + +const cartographer::common::Duration kPopTimeout = + cartographer::common::FromMilliseconds(100); + +} // namespace LocalTrajectoryUploader::LocalTrajectoryUploader( - const std::string& server_address) - : client_channel_(grpc::CreateChannel(server_address, + const std::string &uplink_server_address) + : client_channel_(grpc::CreateChannel(uplink_server_address, grpc::InsecureChannelCredentials())), service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {} +LocalTrajectoryUploader::~LocalTrajectoryUploader() { + if (imu_writer_.client_writer) { + CHECK(imu_writer_.client_writer->WritesDone()); + CHECK(imu_writer_.client_writer->Finish().ok()); + } + if (odometry_writer_.client_writer) { + CHECK(odometry_writer_.client_writer->WritesDone()); + CHECK(odometry_writer_.client_writer->Finish().ok()); + } + if (fixed_frame_pose_writer_.client_writer) { + CHECK(fixed_frame_pose_writer_.client_writer->WritesDone()); + CHECK(fixed_frame_pose_writer_.client_writer->Finish().ok()); + } +} + +void LocalTrajectoryUploader::Start() { + CHECK(!shutting_down_); + CHECK(!upload_thread_); + upload_thread_ = cartographer::common::make_unique( + [this]() { this->ProcessSendQueue(); }); +} + +void LocalTrajectoryUploader::Shutdown() { + CHECK(!shutting_down_); + CHECK(upload_thread_); + shutting_down_ = true; + upload_thread_->join(); +} + +void LocalTrajectoryUploader::ProcessSendQueue() { + LOG(INFO) << "Starting uploader thread."; + while (!shutting_down_) { + auto data_message = send_queue_.PopWithTimeout(kPopTimeout); + if (data_message) { + if (const auto *fixed_frame_pose_data = + dynamic_cast( + data_message.get())) { + ProcessFixedFramePoseDataMessage(fixed_frame_pose_data); + } else if (const auto *imu_data = + dynamic_cast( + data_message.get())) { + ProcessImuDataMessage(imu_data); + } else if (const auto *odometry_data = + dynamic_cast( + data_message.get())) { + ProcessOdometryDataMessage(odometry_data); + } else { + LOG(FATAL) << "Unknown message type: " << data_message->GetTypeName(); + } + } + } +} + +void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage( + const proto::AddFixedFramePoseDataRequest *data_request) { + if (!fixed_frame_pose_writer_.client_writer) { + fixed_frame_pose_writer_.client_writer = + service_stub_->AddFixedFramePoseData( + &fixed_frame_pose_writer_.client_context, + &fixed_frame_pose_writer_.response); + CHECK(fixed_frame_pose_writer_.client_writer); + } + fixed_frame_pose_writer_.client_writer->Write(*data_request); +} + +void LocalTrajectoryUploader::ProcessImuDataMessage( + const proto::AddImuDataRequest *data_request) { + if (!imu_writer_.client_writer) { + imu_writer_.client_writer = service_stub_->AddImuData( + &imu_writer_.client_context, &imu_writer_.response); + CHECK(imu_writer_.client_writer); + } + imu_writer_.client_writer->Write(*data_request); +} + +void LocalTrajectoryUploader::ProcessOdometryDataMessage( + const proto::AddOdometryDataRequest *data_request) { + if (!odometry_writer_.client_writer) { + odometry_writer_.client_writer = service_stub_->AddOdometryData( + &odometry_writer_.client_context, &odometry_writer_.response); + CHECK(odometry_writer_.client_writer); + } + odometry_writer_.client_writer->Write(*data_request); +} + void LocalTrajectoryUploader::AddTrajectory( int local_trajectory_id, - const std::unordered_set& expected_sensor_ids, - const cartographer::mapping::proto::TrajectoryBuilderOptions& - trajectory_options) { + const std::unordered_set &expected_sensor_ids, + const cartographer::mapping::proto::TrajectoryBuilderOptions + &trajectory_options) { grpc::ClientContext client_context; proto::AddTrajectoryRequest request; proto::AddTrajectoryResponse result; *request.mutable_trajectory_builder_options() = trajectory_options; - for (const auto& sensor_id : expected_sensor_ids) { + for (const auto &sensor_id : expected_sensor_ids) { *request.add_expected_sensor_ids() = sensor_id; } grpc::Status status = @@ -64,4 +157,4 @@ void LocalTrajectoryUploader::EnqueueDataRequest( send_queue_.Push(std::move(data_request)); } -} // namespace cartographer_grpc \ No newline at end of file +} // namespace cartographer_grpc diff --git a/cartographer_grpc/local_trajectory_uploader.h b/cartographer_grpc/local_trajectory_uploader.h index cb1623b..0b7e8bc 100644 --- a/cartographer_grpc/local_trajectory_uploader.h +++ b/cartographer_grpc/local_trajectory_uploader.h @@ -18,11 +18,14 @@ #define CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H #include +#include #include +#include #include #include "cartographer/common/blocking_queue.h" #include "cartographer/mapping/proto/trajectory_builder_options.pb.h" +#include "cartographer_grpc/framework/client_writer.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "grpc++/grpc++.h" @@ -30,7 +33,16 @@ namespace cartographer_grpc { class LocalTrajectoryUploader { public: - LocalTrajectoryUploader(const std::string& server_address); + LocalTrajectoryUploader(const std::string& uplink_server_address); + ~LocalTrajectoryUploader(); + + // Starts the upload thread. + void Start(); + + // Shuts down the upload thread. This method blocks until the shutdown is + // complete. + void Shutdown(); + void AddTrajectory( int local_trajectory_id, const std::unordered_set& expected_sensor_ids, @@ -41,12 +53,25 @@ class LocalTrajectoryUploader { std::unique_ptr data_request); private: + void ProcessSendQueue(); + void ProcessFixedFramePoseDataMessage( + const proto::AddFixedFramePoseDataRequest* data_request); + void ProcessImuDataMessage(const proto::AddImuDataRequest* data_request); + void ProcessOdometryDataMessage( + const proto::AddOdometryDataRequest* data_request); + std::shared_ptr client_channel_; std::unique_ptr service_stub_; std::map local_to_cloud_trajectory_id_map_; cartographer::common::BlockingQueue< std::unique_ptr> send_queue_; + bool shutting_down_ = false; + std::unique_ptr upload_thread_; + framework::ClientWriter + fixed_frame_pose_writer_; + framework::ClientWriter imu_writer_; + framework::ClientWriter odometry_writer_; }; } // namespace cartographer_grpc diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index 964befb..47161b4 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -253,6 +253,9 @@ MapBuilderServer::MapBuilderServer( void MapBuilderServer::Start() { shutting_down_ = false; + if (local_trajectory_uploader_) { + local_trajectory_uploader_->Start(); + } StartSlamThread(); grpc_server_->Start(); } @@ -262,6 +265,9 @@ void MapBuilderServer::WaitForShutdown() { if (slam_thread_) { slam_thread_->join(); } + if (local_trajectory_uploader_) { + local_trajectory_uploader_->Shutdown(); + } } void MapBuilderServer::Shutdown() { diff --git a/cartographer_grpc/map_builder_server.h b/cartographer_grpc/map_builder_server.h index 2fc85da..ce96d90 100644 --- a/cartographer_grpc/map_builder_server.h +++ b/cartographer_grpc/map_builder_server.h @@ -112,7 +112,7 @@ class MapBuilderServer { const proto::MapBuilderServerOptions& map_builder_server_options, std::unique_ptr map_builder); - // Starts the gRPC server and the SLAM thread. + // Starts the gRPC server, the 'LocalTrajectoryUploader' and the SLAM thread. void Start(); // Waits for the 'MapBuilderServer' to shut down. Note: The server must be @@ -123,7 +123,8 @@ class MapBuilderServer { // Waits until all computation is finished (for testing). void WaitUntilIdle(); - // Shuts down the gRPC server and the SLAM thread. + // Shuts down the gRPC server, the 'LocalTrajectoryUploader' and the SLAM + // thread. void Shutdown(); private: diff --git a/cartographer_grpc/mapping/trajectory_builder_stub.h b/cartographer_grpc/mapping/trajectory_builder_stub.h index a6f0c63..ef816b4 100644 --- a/cartographer_grpc/mapping/trajectory_builder_stub.h +++ b/cartographer_grpc/mapping/trajectory_builder_stub.h @@ -21,6 +21,7 @@ #include "cartographer/mapping/local_slam_result_data.h" #include "cartographer/mapping/trajectory_builder_interface.h" +#include "cartographer_grpc/framework/client_writer.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "grpc++/grpc++.h" @@ -53,12 +54,6 @@ class TrajectoryBuilderStub local_slam_result_data) override; private: - template - struct SensorClientWriter { - grpc::ClientContext client_context; - std::unique_ptr> client_writer; - google::protobuf::Empty response; - }; struct LocalSlamResultReader { grpc::ClientContext client_context; std::unique_ptr> @@ -73,10 +68,11 @@ class TrajectoryBuilderStub std::shared_ptr client_channel_; const int trajectory_id_; std::unique_ptr stub_; - SensorClientWriter rangefinder_writer_; - SensorClientWriter imu_writer_; - SensorClientWriter odometry_writer_; - SensorClientWriter fixed_frame_writer_; + framework::ClientWriter rangefinder_writer_; + framework::ClientWriter imu_writer_; + framework::ClientWriter odometry_writer_; + framework::ClientWriter + fixed_frame_writer_; LocalSlamResultReader local_slam_result_reader_; };