Implement sensor data uploading in LocalTrajectoryUploader. (#822)

master
Christoph Schütte 2018-01-17 12:01:29 +01:00 committed by Wally B. Feed
parent f64eef876a
commit 35a9c3d63b
6 changed files with 179 additions and 20 deletions

View File

@ -0,0 +1,38 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_
#define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_
#include <memory>
#include "google/protobuf/empty.pb.h"
#include "grpc++/grpc++.h"
namespace cartographer_grpc {
namespace framework {
template <typename RequestType>
struct ClientWriter {
grpc::ClientContext client_context;
std::unique_ptr<grpc::ClientWriter<RequestType>> client_writer;
google::protobuf::Empty response;
};
} // namespace framework
} // namespace cartographer_grpc
#endif // CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_WRITER_H_

View File

@ -16,21 +16,114 @@
#include "cartographer_grpc/local_trajectory_uploader.h" #include "cartographer_grpc/local_trajectory_uploader.h"
#include "cartographer/common/make_unique.h"
#include "cartographer_grpc/proto/map_builder_service.pb.h"
#include "glog/logging.h" #include "glog/logging.h"
namespace cartographer_grpc { namespace cartographer_grpc {
namespace {
const cartographer::common::Duration kPopTimeout =
cartographer::common::FromMilliseconds(100);
} // namespace
LocalTrajectoryUploader::LocalTrajectoryUploader( LocalTrajectoryUploader::LocalTrajectoryUploader(
const std::string& server_address) const std::string &uplink_server_address)
: client_channel_(grpc::CreateChannel(server_address, : client_channel_(grpc::CreateChannel(uplink_server_address,
grpc::InsecureChannelCredentials())), grpc::InsecureChannelCredentials())),
service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {} service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {}
LocalTrajectoryUploader::~LocalTrajectoryUploader() {
if (imu_writer_.client_writer) {
CHECK(imu_writer_.client_writer->WritesDone());
CHECK(imu_writer_.client_writer->Finish().ok());
}
if (odometry_writer_.client_writer) {
CHECK(odometry_writer_.client_writer->WritesDone());
CHECK(odometry_writer_.client_writer->Finish().ok());
}
if (fixed_frame_pose_writer_.client_writer) {
CHECK(fixed_frame_pose_writer_.client_writer->WritesDone());
CHECK(fixed_frame_pose_writer_.client_writer->Finish().ok());
}
}
void LocalTrajectoryUploader::Start() {
CHECK(!shutting_down_);
CHECK(!upload_thread_);
upload_thread_ = cartographer::common::make_unique<std::thread>(
[this]() { this->ProcessSendQueue(); });
}
void LocalTrajectoryUploader::Shutdown() {
CHECK(!shutting_down_);
CHECK(upload_thread_);
shutting_down_ = true;
upload_thread_->join();
}
void LocalTrajectoryUploader::ProcessSendQueue() {
LOG(INFO) << "Starting uploader thread.";
while (!shutting_down_) {
auto data_message = send_queue_.PopWithTimeout(kPopTimeout);
if (data_message) {
if (const auto *fixed_frame_pose_data =
dynamic_cast<const proto::AddFixedFramePoseDataRequest *>(
data_message.get())) {
ProcessFixedFramePoseDataMessage(fixed_frame_pose_data);
} else if (const auto *imu_data =
dynamic_cast<const proto::AddImuDataRequest *>(
data_message.get())) {
ProcessImuDataMessage(imu_data);
} else if (const auto *odometry_data =
dynamic_cast<const proto::AddOdometryDataRequest *>(
data_message.get())) {
ProcessOdometryDataMessage(odometry_data);
} else {
LOG(FATAL) << "Unknown message type: " << data_message->GetTypeName();
}
}
}
}
void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage(
const proto::AddFixedFramePoseDataRequest *data_request) {
if (!fixed_frame_pose_writer_.client_writer) {
fixed_frame_pose_writer_.client_writer =
service_stub_->AddFixedFramePoseData(
&fixed_frame_pose_writer_.client_context,
&fixed_frame_pose_writer_.response);
CHECK(fixed_frame_pose_writer_.client_writer);
}
fixed_frame_pose_writer_.client_writer->Write(*data_request);
}
void LocalTrajectoryUploader::ProcessImuDataMessage(
const proto::AddImuDataRequest *data_request) {
if (!imu_writer_.client_writer) {
imu_writer_.client_writer = service_stub_->AddImuData(
&imu_writer_.client_context, &imu_writer_.response);
CHECK(imu_writer_.client_writer);
}
imu_writer_.client_writer->Write(*data_request);
}
void LocalTrajectoryUploader::ProcessOdometryDataMessage(
const proto::AddOdometryDataRequest *data_request) {
if (!odometry_writer_.client_writer) {
odometry_writer_.client_writer = service_stub_->AddOdometryData(
&odometry_writer_.client_context, &odometry_writer_.response);
CHECK(odometry_writer_.client_writer);
}
odometry_writer_.client_writer->Write(*data_request);
}
void LocalTrajectoryUploader::AddTrajectory( void LocalTrajectoryUploader::AddTrajectory(
int local_trajectory_id, int local_trajectory_id,
const std::unordered_set<std::string> &expected_sensor_ids, const std::unordered_set<std::string> &expected_sensor_ids,
const cartographer::mapping::proto::TrajectoryBuilderOptions& const cartographer::mapping::proto::TrajectoryBuilderOptions
trajectory_options) { &trajectory_options) {
grpc::ClientContext client_context; grpc::ClientContext client_context;
proto::AddTrajectoryRequest request; proto::AddTrajectoryRequest request;
proto::AddTrajectoryResponse result; proto::AddTrajectoryResponse result;

View File

@ -18,11 +18,14 @@
#define CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H #define CARTOGRAPHER_GRPC_LOCAL_TRAJECTORY_UPLOADER_H
#include <map> #include <map>
#include <memory>
#include <string> #include <string>
#include <thread>
#include <unordered_set> #include <unordered_set>
#include "cartographer/common/blocking_queue.h" #include "cartographer/common/blocking_queue.h"
#include "cartographer/mapping/proto/trajectory_builder_options.pb.h" #include "cartographer/mapping/proto/trajectory_builder_options.pb.h"
#include "cartographer_grpc/framework/client_writer.h"
#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h"
#include "grpc++/grpc++.h" #include "grpc++/grpc++.h"
@ -30,7 +33,16 @@ namespace cartographer_grpc {
class LocalTrajectoryUploader { class LocalTrajectoryUploader {
public: public:
LocalTrajectoryUploader(const std::string& server_address); LocalTrajectoryUploader(const std::string& uplink_server_address);
~LocalTrajectoryUploader();
// Starts the upload thread.
void Start();
// Shuts down the upload thread. This method blocks until the shutdown is
// complete.
void Shutdown();
void AddTrajectory( void AddTrajectory(
int local_trajectory_id, int local_trajectory_id,
const std::unordered_set<std::string>& expected_sensor_ids, const std::unordered_set<std::string>& expected_sensor_ids,
@ -41,12 +53,25 @@ class LocalTrajectoryUploader {
std::unique_ptr<google::protobuf::Message> data_request); std::unique_ptr<google::protobuf::Message> data_request);
private: private:
void ProcessSendQueue();
void ProcessFixedFramePoseDataMessage(
const proto::AddFixedFramePoseDataRequest* data_request);
void ProcessImuDataMessage(const proto::AddImuDataRequest* data_request);
void ProcessOdometryDataMessage(
const proto::AddOdometryDataRequest* data_request);
std::shared_ptr<grpc::Channel> client_channel_; std::shared_ptr<grpc::Channel> client_channel_;
std::unique_ptr<proto::MapBuilderService::Stub> service_stub_; std::unique_ptr<proto::MapBuilderService::Stub> service_stub_;
std::map<int, int> local_to_cloud_trajectory_id_map_; std::map<int, int> local_to_cloud_trajectory_id_map_;
cartographer::common::BlockingQueue< cartographer::common::BlockingQueue<
std::unique_ptr<google::protobuf::Message>> std::unique_ptr<google::protobuf::Message>>
send_queue_; send_queue_;
bool shutting_down_ = false;
std::unique_ptr<std::thread> upload_thread_;
framework::ClientWriter<proto::AddFixedFramePoseDataRequest>
fixed_frame_pose_writer_;
framework::ClientWriter<proto::AddImuDataRequest> imu_writer_;
framework::ClientWriter<proto::AddOdometryDataRequest> odometry_writer_;
}; };
} // namespace cartographer_grpc } // namespace cartographer_grpc

View File

@ -253,6 +253,9 @@ MapBuilderServer::MapBuilderServer(
void MapBuilderServer::Start() { void MapBuilderServer::Start() {
shutting_down_ = false; shutting_down_ = false;
if (local_trajectory_uploader_) {
local_trajectory_uploader_->Start();
}
StartSlamThread(); StartSlamThread();
grpc_server_->Start(); grpc_server_->Start();
} }
@ -262,6 +265,9 @@ void MapBuilderServer::WaitForShutdown() {
if (slam_thread_) { if (slam_thread_) {
slam_thread_->join(); slam_thread_->join();
} }
if (local_trajectory_uploader_) {
local_trajectory_uploader_->Shutdown();
}
} }
void MapBuilderServer::Shutdown() { void MapBuilderServer::Shutdown() {

View File

@ -112,7 +112,7 @@ class MapBuilderServer {
const proto::MapBuilderServerOptions& map_builder_server_options, const proto::MapBuilderServerOptions& map_builder_server_options,
std::unique_ptr<cartographer::mapping::MapBuilderInterface> map_builder); std::unique_ptr<cartographer::mapping::MapBuilderInterface> map_builder);
// Starts the gRPC server and the SLAM thread. // Starts the gRPC server, the 'LocalTrajectoryUploader' and the SLAM thread.
void Start(); void Start();
// Waits for the 'MapBuilderServer' to shut down. Note: The server must be // Waits for the 'MapBuilderServer' to shut down. Note: The server must be
@ -123,7 +123,8 @@ class MapBuilderServer {
// Waits until all computation is finished (for testing). // Waits until all computation is finished (for testing).
void WaitUntilIdle(); void WaitUntilIdle();
// Shuts down the gRPC server and the SLAM thread. // Shuts down the gRPC server, the 'LocalTrajectoryUploader' and the SLAM
// thread.
void Shutdown(); void Shutdown();
private: private:

View File

@ -21,6 +21,7 @@
#include "cartographer/mapping/local_slam_result_data.h" #include "cartographer/mapping/local_slam_result_data.h"
#include "cartographer/mapping/trajectory_builder_interface.h" #include "cartographer/mapping/trajectory_builder_interface.h"
#include "cartographer_grpc/framework/client_writer.h"
#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h" #include "cartographer_grpc/proto/map_builder_service.grpc.pb.h"
#include "grpc++/grpc++.h" #include "grpc++/grpc++.h"
@ -53,12 +54,6 @@ class TrajectoryBuilderStub
local_slam_result_data) override; local_slam_result_data) override;
private: private:
template <typename RequestType>
struct SensorClientWriter {
grpc::ClientContext client_context;
std::unique_ptr<grpc::ClientWriter<RequestType>> client_writer;
google::protobuf::Empty response;
};
struct LocalSlamResultReader { struct LocalSlamResultReader {
grpc::ClientContext client_context; grpc::ClientContext client_context;
std::unique_ptr<grpc::ClientReader<proto::ReceiveLocalSlamResultsResponse>> std::unique_ptr<grpc::ClientReader<proto::ReceiveLocalSlamResultsResponse>>
@ -73,10 +68,11 @@ class TrajectoryBuilderStub
std::shared_ptr<grpc::Channel> client_channel_; std::shared_ptr<grpc::Channel> client_channel_;
const int trajectory_id_; const int trajectory_id_;
std::unique_ptr<proto::MapBuilderService::Stub> stub_; std::unique_ptr<proto::MapBuilderService::Stub> stub_;
SensorClientWriter<proto::AddRangefinderDataRequest> rangefinder_writer_; framework::ClientWriter<proto::AddRangefinderDataRequest> rangefinder_writer_;
SensorClientWriter<proto::AddImuDataRequest> imu_writer_; framework::ClientWriter<proto::AddImuDataRequest> imu_writer_;
SensorClientWriter<proto::AddOdometryDataRequest> odometry_writer_; framework::ClientWriter<proto::AddOdometryDataRequest> odometry_writer_;
SensorClientWriter<proto::AddFixedFramePoseDataRequest> fixed_frame_writer_; framework::ClientWriter<proto::AddFixedFramePoseDataRequest>
fixed_frame_writer_;
LocalSlamResultReader local_slam_result_reader_; LocalSlamResultReader local_slam_result_reader_;
}; };